You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/11/29 17:46:17 UTC

samza git commit: SAMZA-1976: MetadataStore API cleanup.

Repository: samza
Updated Branches:
  refs/heads/master ff607cb6b -> b90ab50c3


SAMZA-1976: MetadataStore API cleanup.

This PR consists of the following changes:
* Switch the key type of all `MetadataStore` API methods from `byte[]` to `String`.
* Update the `CoordinatorStreamStore` and `ZkMetadataStore` tests to account for the key-type change.

A follow-up PR will unify the namespaces for the different metadata stored in the standalone and YARN deployment models.

Author: Shanthoosh Venkataraman <sp...@usc.edu>

Reviewers: Prateek <pr...@linkedin.com>

Closes #791 from shanthoosh/metadata_store_api_cleanup


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b90ab50c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b90ab50c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b90ab50c

Branch: refs/heads/master
Commit: b90ab50c32d97a31313efdfb26b48c2fcdeb61da
Parents: ff607cb
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Thu Nov 29 09:46:13 2018 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Thu Nov 29 09:46:13 2018 -0800

----------------------------------------------------------------------
 .../samza/metadatastore/MetadataStore.java      |  8 +++---
 .../apache/samza/container/LocalityManager.java |  6 ++---
 .../grouper/task/TaskAssignmentManager.java     |  9 +++----
 .../metadatastore/CoordinatorStreamStore.java   | 27 ++++++++++----------
 .../org/apache/samza/zk/ZkMetadataStore.java    | 17 ++++++------
 .../TestCoordinatorStreamStore.java             | 23 ++++++-----------
 .../apache/samza/zk/TestZkMetadataStore.java    | 18 ++++++-------
 7 files changed, 49 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
index cd04794..9009a65 100644
--- a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
+++ b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
@@ -39,7 +39,7 @@ public interface MetadataStore {
    * @param key the key with which the associated value is to be fetched.
    * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
    */
-  byte[] get(byte[] key);
+  byte[] get(String key);
 
   /**
    * Updates the mapping of the specified key-value pair.
@@ -47,21 +47,21 @@ public interface MetadataStore {
    * @param key the key with which the specified {@code value} is to be associated.
    * @param value the value with which the specified {@code key} is to be associated.
    */
-  void put(byte[] key, byte[] value);
+  void put(String key, byte[] value);
 
   /**
    * Deletes the mapping for the specified {@code key} from this metadata store (if such mapping exists).
    *
    * @param key the key for which the mapping is to be deleted.
    */
-  void delete(byte[] key);
+  void delete(String key);
 
   /**
    * Returns all the entries in this metadata store.
    *
    * @return all entries in this metadata store.
    */
-  Map<byte[], byte[]> all();
+  Map<String, byte[]> all();
 
   /**
    * Flushes the metadata store, if applicable.

http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 63483b7..fe076ee 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -89,10 +89,10 @@ public class LocalityManager {
    */
   public Map<String, Map<String, String>> readContainerLocality() {
     Map<String, Map<String, String>> allMappings = new HashMap<>();
-    metadataStore.all().forEach((keyBytes, valueBytes) -> {
+    metadataStore.all().forEach((containerId, valueBytes) -> {
         if (valueBytes != null) {
           String locationId = valueSerde.fromBytes(valueBytes);
-          allMappings.put(keySerde.fromBytes(keyBytes), ImmutableMap.of(SetContainerHostMapping.HOST_KEY, locationId));
+          allMappings.put(containerId, ImmutableMap.of(SetContainerHostMapping.HOST_KEY, locationId));
         }
       });
     if (LOG.isDebugEnabled()) {
@@ -120,7 +120,7 @@ public class LocalityManager {
       LOG.info("Container {} started at {}", containerId, hostName);
     }
 
-    metadataStore.put(keySerde.toBytes(containerId), valueSerde.toBytes(hostName));
+    metadataStore.put(containerId, valueSerde.toBytes(hostName));
   }
 
   public void close() {

http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 0ada91c..32bbf29 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -94,8 +94,7 @@ public class TaskAssignmentManager {
    */
   public Map<String, String> readTaskAssignment() {
     taskNameToContainerId.clear();
-    metadataStore.all().forEach((keyBytes, valueBytes) -> {
-        String taskName = keySerde.fromBytes(keyBytes);
+    metadataStore.all().forEach((taskName, valueBytes) -> {
         String containerId = valueSerde.fromBytes(valueBytes);
         if (containerId != null) {
           taskNameToContainerId.put(taskName, containerId);
@@ -120,10 +119,10 @@ public class TaskAssignmentManager {
     }
 
     if (containerId == null) {
-      metadataStore.delete(keySerde.toBytes(taskName));
+      metadataStore.delete(taskName);
       taskNameToContainerId.remove(taskName);
     } else {
-      metadataStore.put(keySerde.toBytes(taskName), valueSerde.toBytes(containerId));
+      metadataStore.put(taskName, valueSerde.toBytes(containerId));
       taskNameToContainerId.put(taskName, containerId);
     }
   }
@@ -135,7 +134,7 @@ public class TaskAssignmentManager {
    */
   public void deleteTaskContainerMappings(Iterable<String> taskNames) {
     for (String taskName : taskNames) {
-      metadataStore.delete(keySerde.toBytes(taskName));
+      metadataStore.delete(taskName);
       taskNameToContainerId.remove(taskName);
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index af5e2f9..899c87c 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -20,16 +20,15 @@ package org.apache.samza.coordinator.metadatastore;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
-import com.google.common.primitives.UnsignedBytes;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Objects;
-import java.util.TreeMap;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -67,10 +66,9 @@ public class CoordinatorStreamStore implements MetadataStore {
   private final SystemConsumer systemConsumer;
   private final SystemAdmin systemAdmin;
   private final String type;
-  private final Serde<List<?>> keySerde;
+  private final CoordinatorStreamKeySerde keySerde;
 
-  // Using custom comparator since java default comparator offers object identity equality(not value equality) for byte arrays.
-  private final Map<byte[], byte[]> bootstrappedMessages = new TreeMap<>(UnsignedBytes.lexicographicalComparator());
+  private final Map<String, byte[]> bootstrappedMessages = new HashMap<>();
   private final Object bootstrapLock = new Object();
   private final AtomicBoolean isInitialized = new AtomicBoolean(false);
 
@@ -79,7 +77,7 @@ public class CoordinatorStreamStore implements MetadataStore {
   public CoordinatorStreamStore(String namespace, Config config, MetricsRegistry metricsRegistry) {
     this.config = config;
     this.type = namespace;
-    this.keySerde = new JsonSerde<>();
+    this.keySerde = new CoordinatorStreamKeySerde(type);
     this.coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
     this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
     SystemFactory systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
@@ -104,26 +102,26 @@ public class CoordinatorStreamStore implements MetadataStore {
   }
 
   @Override
-  public byte[] get(byte[] key) {
+  public byte[] get(String key) {
     bootstrapMessagesFromStream();
     return bootstrappedMessages.get(key);
   }
 
   @Override
-  public void put(byte[] key, byte[] value) {
-    OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(coordinatorSystemStream, 0, key, value);
+  public void put(String key, byte[] value) {
+    OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(coordinatorSystemStream, 0, keySerde.toBytes(key), value);
     systemProducer.send(SOURCE, envelope);
     flush();
   }
 
   @Override
-  public void delete(byte[] key) {
+  public void delete(String key) {
     // Since kafka doesn't support individual message deletion, store value as null for a key to delete.
     put(key, null);
   }
 
   @Override
-  public Map<byte[], byte[]> all() {
+  public Map<String, byte[]> all() {
     bootstrapMessagesFromStream();
     return Collections.unmodifiableMap(bootstrappedMessages);
   }
@@ -136,13 +134,14 @@ public class CoordinatorStreamStore implements MetadataStore {
       while (iterator.hasNext()) {
         IncomingMessageEnvelope envelope = iterator.next();
         byte[] keyAsBytes = (byte[]) envelope.getKey();
-        Object[] keyArray = keySerde.fromBytes(keyAsBytes).toArray();
+        Serde<List<?>> serde = new JsonSerde<>();
+        Object[] keyArray = serde.fromBytes(keyAsBytes).toArray();
         CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, new HashMap<>());
         if (Objects.equals(coordinatorStreamMessage.getType(), type)) {
           if (envelope.getMessage() != null) {
-            bootstrappedMessages.put(keyAsBytes, (byte[]) envelope.getMessage());
+            bootstrappedMessages.put(coordinatorStreamMessage.getKey(), (byte[]) envelope.getMessage());
           } else {
-            bootstrappedMessages.remove(keyAsBytes);
+            bootstrappedMessages.remove(coordinatorStreamMessage.getKey());
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
index 4cfdc8d..fa41df6 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.zk;
 
-import java.nio.charset.Charset;
 import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
 import java.util.List;
@@ -64,7 +63,7 @@ public class ZkMetadataStore implements MetadataStore {
    * {@inheritDoc}
    */
   @Override
-  public byte[] get(byte[] key) {
+  public byte[] get(String key) {
     return zkClient.readData(getZkPathForKey(key), true);
   }
 
@@ -72,7 +71,7 @@ public class ZkMetadataStore implements MetadataStore {
    * {@inheritDoc}
    */
   @Override
-  public void put(byte[] key, byte[] value) {
+  public void put(String key, byte[] value) {
     String zkPath = getZkPathForKey(key);
     zkClient.createPersistent(zkPath, true);
     zkClient.writeData(zkPath, value);
@@ -82,7 +81,7 @@ public class ZkMetadataStore implements MetadataStore {
    * {@inheritDoc}
    */
   @Override
-  public void delete(byte[] key) {
+  public void delete(String key) {
     zkClient.delete(getZkPathForKey(key));
   }
 
@@ -91,15 +90,15 @@ public class ZkMetadataStore implements MetadataStore {
    * @throws SamzaException if there're exceptions reading data from zookeeper.
    */
   @Override
-  public Map<byte[], byte[]> all() {
+  public Map<String, byte[]> all() {
     try {
       List<String> zkSubDirectories = zkClient.getChildren(zkBaseDir);
-      Map<byte[], byte[]> result = new HashMap<>();
+      Map<String, byte[]> result = new HashMap<>();
       for (String zkSubDir : zkSubDirectories) {
         String completeZkPath = String.format("%s/%s", zkBaseDir, zkSubDir);
         byte[] value = zkClient.readData(completeZkPath, true);
         if (value != null) {
-          result.put(completeZkPath.getBytes("UTF-8"), value);
+          result.put(zkSubDir, value);
         }
       }
       return result;
@@ -126,7 +125,7 @@ public class ZkMetadataStore implements MetadataStore {
     zkClient.close();
   }
 
-  private String getZkPathForKey(byte[] key) {
-    return String.format("%s/%s", zkBaseDir, new String(key, Charset.forName("UTF-8")));
+  private String getZkPathForKey(String key) {
+    return String.format("%s/%s", zkBaseDir, key);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
index 0e48363..2f6f598 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
@@ -62,7 +62,7 @@ public class TestCoordinatorStreamStore {
 
   @Test
   public void testReadAfterWrite() {
-    byte[] key = getKey("test-key1");
+    String key = "test-key1";
     byte[] value = getValue("test-value1");
     Assert.assertNull(coordinatorStreamStore.get(key));
     coordinatorStreamStore.put(key, value);
@@ -72,7 +72,7 @@ public class TestCoordinatorStreamStore {
 
   @Test
   public void testReadAfterDelete() {
-    byte[] key = getKey("test-key1");
+    String key = "test-key1";
     byte[] value = getValue("test-value1");
     Assert.assertNull(coordinatorStreamStore.get(key));
     coordinatorStreamStore.put(key, value);
@@ -84,13 +84,13 @@ public class TestCoordinatorStreamStore {
 
   @Test
   public void testReadOfNonExistentKey() {
-    Assert.assertNull(coordinatorStreamStore.get("randomKey".getBytes()));
+    Assert.assertNull(coordinatorStreamStore.get("randomKey"));
     Assert.assertEquals(0, coordinatorStreamStore.all().size());
   }
 
   @Test
   public void testMultipleUpdatesForSameKey() {
-    byte[] key = getKey("test-key1");
+    String key = "test-key1";
     byte[] value = getValue("test-value1");
     byte[] value1 = getValue("test-value2");
     coordinatorStreamStore.put(key, value);
@@ -101,16 +101,16 @@ public class TestCoordinatorStreamStore {
 
   @Test
   public void testAllEntries() {
-    byte[] key = getKey("test-key1");
-    byte[] key1 = getKey("test-key2");
-    byte[] key2 = getKey("test-key3");
+    String key = "test-key1";
+    String key1 = "test-key2";
+    String key2 = "test-key3";
     byte[] value = getValue("test-value1");
     byte[] value1 = getValue("test-value2");
     byte[] value2 = getValue("test-value3");
     coordinatorStreamStore.put(key, value);
     coordinatorStreamStore.put(key1, value1);
     coordinatorStreamStore.put(key2, value2);
-    ImmutableMap<byte[], byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2);
+    ImmutableMap<String, byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2);
     Assert.assertEquals(expected, coordinatorStreamStore.all());
   }
 
@@ -119,11 +119,4 @@ public class TestCoordinatorStreamStore {
     SetTaskContainerMapping setTaskContainerMapping = new SetTaskContainerMapping("testSource", "testTask", value);
     return messageSerde.toBytes(setTaskContainerMapping.getMessageMap());
   }
-
-  private byte[] getKey(String key) {
-    JsonSerde<List<?>> keySerde = new JsonSerde<>();
-    SetTaskContainerMapping setTaskContainerMapping = new SetTaskContainerMapping("testSource", key, "");
-    return keySerde.toBytes(Arrays.asList(setTaskContainerMapping.getKeyArray()));
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
index 5930c65..3d5f3b3 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
@@ -67,7 +67,7 @@ public class TestZkMetadataStore {
 
   @Test
   public void testReadAfterWrite() throws Exception {
-    byte[] key = "test-key1".getBytes("UTF-8");
+    String key = "test-key1";
     byte[] value = "test-value1".getBytes("UTF-8");
     Assert.assertNull(zkMetadataStore.get(key));
     zkMetadataStore.put(key, value);
@@ -77,7 +77,7 @@ public class TestZkMetadataStore {
 
   @Test
   public void testReadAfterDelete() throws Exception {
-    byte[] key = "test-key1".getBytes("UTF-8");
+    String key = "test-key1";
     byte[] value = "test-value1".getBytes("UTF-8");
     Assert.assertNull(zkMetadataStore.get(key));
     zkMetadataStore.put(key, value);
@@ -88,14 +88,14 @@ public class TestZkMetadataStore {
   }
 
   @Test
-  public void testReadOfNonExistentKey() throws Exception {
-    Assert.assertNull(zkMetadataStore.get("randomKey".getBytes("UTF-8")));
+  public void testReadOfNonExistentKey() {
+    Assert.assertNull(zkMetadataStore.get("randomKey"));
     Assert.assertEquals(0, zkMetadataStore.all().size());
   }
 
   @Test
   public void testMultipleUpdatesForSameKey() throws Exception {
-    byte[] key = "test-key1".getBytes("UTF-8");
+    String key = "test-key1";
     byte[] value = "test-value1".getBytes("UTF-8");
     byte[] value1 = "test-value2".getBytes("UTF-8");
     zkMetadataStore.put(key, value);
@@ -106,16 +106,16 @@ public class TestZkMetadataStore {
 
   @Test
   public void testAllEntries() throws Exception {
-    byte[] key = "test-key1".getBytes("UTF-8");
-    byte[] key1 = "test-key2".getBytes("UTF-8");
-    byte[] key2 = "test-key3".getBytes("UTF-8");
+    String key = "test-key1";
+    String key1 = "test-key2";
+    String key2 = "test-key3";
     byte[] value = "test-value1".getBytes("UTF-8");
     byte[] value1 = "test-value2".getBytes("UTF-8");
     byte[] value2 = "test-value3".getBytes("UTF-8");
     zkMetadataStore.put(key, value);
     zkMetadataStore.put(key1, value1);
     zkMetadataStore.put(key2, value2);
-    ImmutableMap<byte[], byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2);
+    ImmutableMap<String, byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2);
     Assert.assertEquals(expected.size(), zkMetadataStore.all().size());
   }
 }