You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/11/29 17:46:17 UTC
samza git commit: SAMZA-1976: MetadataStore API cleanup.
Repository: samza
Updated Branches:
refs/heads/master ff607cb6b -> b90ab50c3
SAMZA-1976: MetadataStore API cleanup.
This PR consists of the following changes:
* Switching all the API methods from using byte[] array as key type to string.
* Fixed `CoordinatorMetadataStore`, `ZkMetadataStore` tests due to the type change of key.
Shortly in a followup PR, namespace unification for different metadata stored in standalone and YARN model will be done.
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Reviewers: Prateek <pr...@linkedin.com>
Closes #791 from shanthoosh/metadata_store_api_cleanup
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b90ab50c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b90ab50c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b90ab50c
Branch: refs/heads/master
Commit: b90ab50c32d97a31313efdfb26b48c2fcdeb61da
Parents: ff607cb
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Thu Nov 29 09:46:13 2018 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Thu Nov 29 09:46:13 2018 -0800
----------------------------------------------------------------------
.../samza/metadatastore/MetadataStore.java | 8 +++---
.../apache/samza/container/LocalityManager.java | 6 ++---
.../grouper/task/TaskAssignmentManager.java | 9 +++----
.../metadatastore/CoordinatorStreamStore.java | 27 ++++++++++----------
.../org/apache/samza/zk/ZkMetadataStore.java | 17 ++++++------
.../TestCoordinatorStreamStore.java | 23 ++++++-----------
.../apache/samza/zk/TestZkMetadataStore.java | 18 ++++++-------
7 files changed, 49 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
index cd04794..9009a65 100644
--- a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
+++ b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
@@ -39,7 +39,7 @@ public interface MetadataStore {
* @param key the key with which the associated value is to be fetched.
* @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
*/
- byte[] get(byte[] key);
+ byte[] get(String key);
/**
* Updates the mapping of the specified key-value pair.
@@ -47,21 +47,21 @@ public interface MetadataStore {
* @param key the key with which the specified {@code value} is to be associated.
* @param value the value with which the specified {@code key} is to be associated.
*/
- void put(byte[] key, byte[] value);
+ void put(String key, byte[] value);
/**
* Deletes the mapping for the specified {@code key} from this metadata store (if such mapping exists).
*
* @param key the key for which the mapping is to be deleted.
*/
- void delete(byte[] key);
+ void delete(String key);
/**
* Returns all the entries in this metadata store.
*
* @return all entries in this metadata store.
*/
- Map<byte[], byte[]> all();
+ Map<String, byte[]> all();
/**
* Flushes the metadata store, if applicable.
http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 63483b7..fe076ee 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -89,10 +89,10 @@ public class LocalityManager {
*/
public Map<String, Map<String, String>> readContainerLocality() {
Map<String, Map<String, String>> allMappings = new HashMap<>();
- metadataStore.all().forEach((keyBytes, valueBytes) -> {
+ metadataStore.all().forEach((containerId, valueBytes) -> {
if (valueBytes != null) {
String locationId = valueSerde.fromBytes(valueBytes);
- allMappings.put(keySerde.fromBytes(keyBytes), ImmutableMap.of(SetContainerHostMapping.HOST_KEY, locationId));
+ allMappings.put(containerId, ImmutableMap.of(SetContainerHostMapping.HOST_KEY, locationId));
}
});
if (LOG.isDebugEnabled()) {
@@ -120,7 +120,7 @@ public class LocalityManager {
LOG.info("Container {} started at {}", containerId, hostName);
}
- metadataStore.put(keySerde.toBytes(containerId), valueSerde.toBytes(hostName));
+ metadataStore.put(containerId, valueSerde.toBytes(hostName));
}
public void close() {
http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 0ada91c..32bbf29 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -94,8 +94,7 @@ public class TaskAssignmentManager {
*/
public Map<String, String> readTaskAssignment() {
taskNameToContainerId.clear();
- metadataStore.all().forEach((keyBytes, valueBytes) -> {
- String taskName = keySerde.fromBytes(keyBytes);
+ metadataStore.all().forEach((taskName, valueBytes) -> {
String containerId = valueSerde.fromBytes(valueBytes);
if (containerId != null) {
taskNameToContainerId.put(taskName, containerId);
@@ -120,10 +119,10 @@ public class TaskAssignmentManager {
}
if (containerId == null) {
- metadataStore.delete(keySerde.toBytes(taskName));
+ metadataStore.delete(taskName);
taskNameToContainerId.remove(taskName);
} else {
- metadataStore.put(keySerde.toBytes(taskName), valueSerde.toBytes(containerId));
+ metadataStore.put(taskName, valueSerde.toBytes(containerId));
taskNameToContainerId.put(taskName, containerId);
}
}
@@ -135,7 +134,7 @@ public class TaskAssignmentManager {
*/
public void deleteTaskContainerMappings(Iterable<String> taskNames) {
for (String taskName : taskNames) {
- metadataStore.delete(keySerde.toBytes(taskName));
+ metadataStore.delete(taskName);
taskNameToContainerId.remove(taskName);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index af5e2f9..899c87c 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -20,16 +20,15 @@ package org.apache.samza.coordinator.metadatastore;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
-import com.google.common.primitives.UnsignedBytes;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
-import java.util.TreeMap;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistry;
@@ -67,10 +66,9 @@ public class CoordinatorStreamStore implements MetadataStore {
private final SystemConsumer systemConsumer;
private final SystemAdmin systemAdmin;
private final String type;
- private final Serde<List<?>> keySerde;
+ private final CoordinatorStreamKeySerde keySerde;
- // Using custom comparator since java default comparator offers object identity equality(not value equality) for byte arrays.
- private final Map<byte[], byte[]> bootstrappedMessages = new TreeMap<>(UnsignedBytes.lexicographicalComparator());
+ private final Map<String, byte[]> bootstrappedMessages = new HashMap<>();
private final Object bootstrapLock = new Object();
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
@@ -79,7 +77,7 @@ public class CoordinatorStreamStore implements MetadataStore {
public CoordinatorStreamStore(String namespace, Config config, MetricsRegistry metricsRegistry) {
this.config = config;
this.type = namespace;
- this.keySerde = new JsonSerde<>();
+ this.keySerde = new CoordinatorStreamKeySerde(type);
this.coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
SystemFactory systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
@@ -104,26 +102,26 @@ public class CoordinatorStreamStore implements MetadataStore {
}
@Override
- public byte[] get(byte[] key) {
+ public byte[] get(String key) {
bootstrapMessagesFromStream();
return bootstrappedMessages.get(key);
}
@Override
- public void put(byte[] key, byte[] value) {
- OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(coordinatorSystemStream, 0, key, value);
+ public void put(String key, byte[] value) {
+ OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(coordinatorSystemStream, 0, keySerde.toBytes(key), value);
systemProducer.send(SOURCE, envelope);
flush();
}
@Override
- public void delete(byte[] key) {
+ public void delete(String key) {
// Since kafka doesn't support individual message deletion, store value as null for a key to delete.
put(key, null);
}
@Override
- public Map<byte[], byte[]> all() {
+ public Map<String, byte[]> all() {
bootstrapMessagesFromStream();
return Collections.unmodifiableMap(bootstrappedMessages);
}
@@ -136,13 +134,14 @@ public class CoordinatorStreamStore implements MetadataStore {
while (iterator.hasNext()) {
IncomingMessageEnvelope envelope = iterator.next();
byte[] keyAsBytes = (byte[]) envelope.getKey();
- Object[] keyArray = keySerde.fromBytes(keyAsBytes).toArray();
+ Serde<List<?>> serde = new JsonSerde<>();
+ Object[] keyArray = serde.fromBytes(keyAsBytes).toArray();
CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, new HashMap<>());
if (Objects.equals(coordinatorStreamMessage.getType(), type)) {
if (envelope.getMessage() != null) {
- bootstrappedMessages.put(keyAsBytes, (byte[]) envelope.getMessage());
+ bootstrappedMessages.put(coordinatorStreamMessage.getKey(), (byte[]) envelope.getMessage());
} else {
- bootstrappedMessages.remove(keyAsBytes);
+ bootstrappedMessages.remove(coordinatorStreamMessage.getKey());
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
index 4cfdc8d..fa41df6 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
@@ -18,7 +18,6 @@
*/
package org.apache.samza.zk;
-import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.List;
@@ -64,7 +63,7 @@ public class ZkMetadataStore implements MetadataStore {
* {@inheritDoc}
*/
@Override
- public byte[] get(byte[] key) {
+ public byte[] get(String key) {
return zkClient.readData(getZkPathForKey(key), true);
}
@@ -72,7 +71,7 @@ public class ZkMetadataStore implements MetadataStore {
* {@inheritDoc}
*/
@Override
- public void put(byte[] key, byte[] value) {
+ public void put(String key, byte[] value) {
String zkPath = getZkPathForKey(key);
zkClient.createPersistent(zkPath, true);
zkClient.writeData(zkPath, value);
@@ -82,7 +81,7 @@ public class ZkMetadataStore implements MetadataStore {
* {@inheritDoc}
*/
@Override
- public void delete(byte[] key) {
+ public void delete(String key) {
zkClient.delete(getZkPathForKey(key));
}
@@ -91,15 +90,15 @@ public class ZkMetadataStore implements MetadataStore {
* @throws SamzaException if there're exceptions reading data from zookeeper.
*/
@Override
- public Map<byte[], byte[]> all() {
+ public Map<String, byte[]> all() {
try {
List<String> zkSubDirectories = zkClient.getChildren(zkBaseDir);
- Map<byte[], byte[]> result = new HashMap<>();
+ Map<String, byte[]> result = new HashMap<>();
for (String zkSubDir : zkSubDirectories) {
String completeZkPath = String.format("%s/%s", zkBaseDir, zkSubDir);
byte[] value = zkClient.readData(completeZkPath, true);
if (value != null) {
- result.put(completeZkPath.getBytes("UTF-8"), value);
+ result.put(zkSubDir, value);
}
}
return result;
@@ -126,7 +125,7 @@ public class ZkMetadataStore implements MetadataStore {
zkClient.close();
}
- private String getZkPathForKey(byte[] key) {
- return String.format("%s/%s", zkBaseDir, new String(key, Charset.forName("UTF-8")));
+ private String getZkPathForKey(String key) {
+ return String.format("%s/%s", zkBaseDir, key);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
index 0e48363..2f6f598 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
@@ -62,7 +62,7 @@ public class TestCoordinatorStreamStore {
@Test
public void testReadAfterWrite() {
- byte[] key = getKey("test-key1");
+ String key = "test-key1";
byte[] value = getValue("test-value1");
Assert.assertNull(coordinatorStreamStore.get(key));
coordinatorStreamStore.put(key, value);
@@ -72,7 +72,7 @@ public class TestCoordinatorStreamStore {
@Test
public void testReadAfterDelete() {
- byte[] key = getKey("test-key1");
+ String key = "test-key1";
byte[] value = getValue("test-value1");
Assert.assertNull(coordinatorStreamStore.get(key));
coordinatorStreamStore.put(key, value);
@@ -84,13 +84,13 @@ public class TestCoordinatorStreamStore {
@Test
public void testReadOfNonExistentKey() {
- Assert.assertNull(coordinatorStreamStore.get("randomKey".getBytes()));
+ Assert.assertNull(coordinatorStreamStore.get("randomKey"));
Assert.assertEquals(0, coordinatorStreamStore.all().size());
}
@Test
public void testMultipleUpdatesForSameKey() {
- byte[] key = getKey("test-key1");
+ String key = "test-key1";
byte[] value = getValue("test-value1");
byte[] value1 = getValue("test-value2");
coordinatorStreamStore.put(key, value);
@@ -101,16 +101,16 @@ public class TestCoordinatorStreamStore {
@Test
public void testAllEntries() {
- byte[] key = getKey("test-key1");
- byte[] key1 = getKey("test-key2");
- byte[] key2 = getKey("test-key3");
+ String key = "test-key1";
+ String key1 = "test-key2";
+ String key2 = "test-key3";
byte[] value = getValue("test-value1");
byte[] value1 = getValue("test-value2");
byte[] value2 = getValue("test-value3");
coordinatorStreamStore.put(key, value);
coordinatorStreamStore.put(key1, value1);
coordinatorStreamStore.put(key2, value2);
- ImmutableMap<byte[], byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2);
+ ImmutableMap<String, byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2);
Assert.assertEquals(expected, coordinatorStreamStore.all());
}
@@ -119,11 +119,4 @@ public class TestCoordinatorStreamStore {
SetTaskContainerMapping setTaskContainerMapping = new SetTaskContainerMapping("testSource", "testTask", value);
return messageSerde.toBytes(setTaskContainerMapping.getMessageMap());
}
-
- private byte[] getKey(String key) {
- JsonSerde<List<?>> keySerde = new JsonSerde<>();
- SetTaskContainerMapping setTaskContainerMapping = new SetTaskContainerMapping("testSource", key, "");
- return keySerde.toBytes(Arrays.asList(setTaskContainerMapping.getKeyArray()));
- }
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
index 5930c65..3d5f3b3 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
@@ -67,7 +67,7 @@ public class TestZkMetadataStore {
@Test
public void testReadAfterWrite() throws Exception {
- byte[] key = "test-key1".getBytes("UTF-8");
+ String key = "test-key1";
byte[] value = "test-value1".getBytes("UTF-8");
Assert.assertNull(zkMetadataStore.get(key));
zkMetadataStore.put(key, value);
@@ -77,7 +77,7 @@ public class TestZkMetadataStore {
@Test
public void testReadAfterDelete() throws Exception {
- byte[] key = "test-key1".getBytes("UTF-8");
+ String key = "test-key1";
byte[] value = "test-value1".getBytes("UTF-8");
Assert.assertNull(zkMetadataStore.get(key));
zkMetadataStore.put(key, value);
@@ -88,14 +88,14 @@ public class TestZkMetadataStore {
}
@Test
- public void testReadOfNonExistentKey() throws Exception {
- Assert.assertNull(zkMetadataStore.get("randomKey".getBytes("UTF-8")));
+ public void testReadOfNonExistentKey() {
+ Assert.assertNull(zkMetadataStore.get("randomKey"));
Assert.assertEquals(0, zkMetadataStore.all().size());
}
@Test
public void testMultipleUpdatesForSameKey() throws Exception {
- byte[] key = "test-key1".getBytes("UTF-8");
+ String key = "test-key1";
byte[] value = "test-value1".getBytes("UTF-8");
byte[] value1 = "test-value2".getBytes("UTF-8");
zkMetadataStore.put(key, value);
@@ -106,16 +106,16 @@ public class TestZkMetadataStore {
@Test
public void testAllEntries() throws Exception {
- byte[] key = "test-key1".getBytes("UTF-8");
- byte[] key1 = "test-key2".getBytes("UTF-8");
- byte[] key2 = "test-key3".getBytes("UTF-8");
+ String key = "test-key1";
+ String key1 = "test-key2";
+ String key2 = "test-key3";
byte[] value = "test-value1".getBytes("UTF-8");
byte[] value1 = "test-value2".getBytes("UTF-8");
byte[] value2 = "test-value3".getBytes("UTF-8");
zkMetadataStore.put(key, value);
zkMetadataStore.put(key1, value1);
zkMetadataStore.put(key2, value2);
- ImmutableMap<byte[], byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2);
+ ImmutableMap<String, byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2);
Assert.assertEquals(expected.size(), zkMetadataStore.all().size());
}
}