You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/17 01:56:49 UTC

samza git commit: SAMZA-1956: Update value only descriptor serde

Repository: samza
Updated Branches:
  refs/heads/master 4d6ff989b -> 8b8526682


SAMZA-1956: Update value only descriptor serde

Changed the Event Hubs input and output descriptors to accept only a value Serde instead of a KVSerde.
Since the key is always a `String`, the key serde should always be `NoOpSerde`; any other key serde will lead to an error, since the Samza `serializers.SerdeManager.scala` expects a `byte[]`.

Author: Daniel Chen <dc...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>

Closes #733 from dxichen/eventhubs-example-cleanup


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8b852668
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8b852668
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8b852668

Branch: refs/heads/master
Commit: 8b85266829e65ef46ab15d6356e3fa87cd00684f
Parents: 4d6ff98
Author: Daniel Chen <dc...@linkedin.com>
Authored: Tue Oct 16 18:56:44 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Tue Oct 16 18:56:44 2018 -0700

----------------------------------------------------------------------
 .../descriptors/EventHubsInputDescriptor.java   |  8 ++++--
 .../descriptors/EventHubsOutputDescriptor.java  |  8 ++++--
 .../descriptors/EventHubsSystemDescriptor.java  | 29 ++++++++++----------
 .../TestEventHubsInputDescriptor.java           | 26 ++++++++++++++----
 .../TestEventHubsOutputDescriptor.java          | 26 ++++++++++++++----
 .../TestEventHubsSystemDescriptor.java          | 26 ++++++------------
 6 files changed, 73 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
index 462cc05..df22269 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.system.descriptors.InputDescriptor;
 import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.eventhub.EventHubConfig;
 
@@ -52,12 +54,12 @@ public class EventHubsInputDescriptor<StreamMessageType>
    * @param streamId id of the stream
    * @param namespace namespace for the Event Hubs entity to consume from, not null
    * @param entityPath entity path for the Event Hubs entity to consume from, not null
-   * @param serde serde for messages in the stream
+   * @param valueSerde serde for the values in the messages in the stream
    * @param systemDescriptor system descriptor this stream descriptor was obtained from
    */
-  EventHubsInputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
+  EventHubsInputDescriptor(String streamId, String namespace, String entityPath, Serde valueSerde,
       SystemDescriptor systemDescriptor) {
-    super(streamId, serde, systemDescriptor, null);
+    super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), systemDescriptor, null);
     this.namespace = StringUtils.stripToNull(namespace);
     this.entityPath = StringUtils.stripToNull(entityPath);
     if (this.namespace == null || this.entityPath == null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
index ddbf79c..95f7e42 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.system.descriptors.OutputDescriptor;
 import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.eventhub.EventHubConfig;
 
@@ -50,12 +52,12 @@ public class EventHubsOutputDescriptor<StreamMessageType>
    * @param streamId id of the stream
    * @param namespace namespace for the Event Hubs entity to produce to, not null
    * @param entityPath entity path for the Event Hubs entity to produce to, not null
-   * @param serde serde for messages in the stream
+   * @param valueSerde serde for the values in the messages in the stream
    * @param systemDescriptor system descriptor this stream descriptor was obtained from
    */
-  EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
+  EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde valueSerde,
       SystemDescriptor systemDescriptor) {
-    super(streamId, serde, systemDescriptor);
+    super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), systemDescriptor);
     this.namespace = StringUtils.stripToNull(namespace);
     this.entityPath = StringUtils.stripToNull(entityPath);
     if (this.namespace == null || this.entityPath == null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
index 80bdfae..feffd87 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.samza.operators.KV;
 import org.apache.samza.system.descriptors.SystemDescriptor;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.eventhub.EventHubConfig;
@@ -57,40 +58,40 @@ public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemD
 
   /**
    * Gets an {@link EventHubsInputDescriptor} for the input stream of this system. The stream has the provided
-   * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
+   * namespace and entity name of the associated Event Hubs entity and the provided stream level value serde.
    * <p>
-   * The type of messages in the stream is the type of the provided stream level serde.
+   * The messages in the stream will have {@link String} keys and {@code ValueType} values.
    *
    * @param streamId id of the input stream
    * @param namespace namespace of the Event Hubs entity to consume from
    * @param entityPath entity path of the Event Hubs entity to consume from
-   * @param serde stream level serde for the input stream
-   * @param <StreamMessageType> type of messages in this stream
+   * @param valueSerde stream level serde for the values in the messages in the input stream
+   * @param <ValueType> type of the value in the messages in this stream
    * @return an {@link EventHubsInputDescriptor} for the Event Hubs input stream
    */
-  public <StreamMessageType> EventHubsInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, String namespace,
-      String entityPath, Serde<StreamMessageType> serde) {
+  public <ValueType> EventHubsInputDescriptor<KV<String, ValueType>> getInputDescriptor(String streamId, String namespace,
+      String entityPath, Serde<ValueType> valueSerde) {
     streamIds.add(streamId);
-    return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, serde, this);
+    return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, valueSerde, this);
   }
 
   /**
    * Gets an {@link EventHubsOutputDescriptor} for the output stream of this system. The stream has the provided
-   * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
+   * namespace and entity name of the associated Event Hubs entity and the provided stream level value serde.
    * <p>
-   * The type of messages in the stream is the type of the provided stream level serde.
+   * The messages in the stream will have {@link String} keys and {@code ValueType} values.
    *
    * @param streamId id of the output stream
    * @param namespace namespace of the Event Hubs entity to produce to
    * @param entityPath entity path of the Event Hubs entity to produce to
-   * @param serde stream level serde for the output stream
-   * @param <StreamMessageType> type of the messages in this stream
+   * @param valueSerde stream level serde for the values in the messages to the output stream
+   * @param <ValueType> type of the value in the messages in this stream
    * @return an {@link EventHubsOutputDescriptor} for the Event Hubs output stream
    */
-  public <StreamMessageType> EventHubsOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, String namespace,
-      String entityPath, Serde<StreamMessageType> serde) {
+  public <ValueType> EventHubsOutputDescriptor<KV<String, ValueType>> getOutputDescriptor(String streamId, String namespace,
+      String entityPath, Serde<ValueType> valueSerde) {
     streamIds.add(streamId);
-    return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, serde, this);
+    return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, valueSerde, this);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
index 1e6b368..c633ccc 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
@@ -21,14 +21,15 @@ package org.apache.samza.system.eventhub.descriptors;
 import java.util.Map;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.operators.KV;
-import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.eventhub.EventHubConfig;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 
@@ -40,8 +41,8 @@ public class TestEventHubsInputDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
 
-    EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
-        .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
+    EventHubsInputDescriptor<KV<String, String>> inputDescriptor = systemDescriptor
+        .getInputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde())
         .withSasKeyName("secretkey")
         .withSasKey("sasToken-123")
         .withConsumerGroup("$notdefault");
@@ -62,8 +63,8 @@ public class TestEventHubsInputDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
 
-    EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
-        .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
+    EventHubsInputDescriptor<KV<String, String>> inputDescriptor = systemDescriptor
+        .getInputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde());
 
     Map<String, String> generatedConfigs = inputDescriptor.toConfig();
     assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
@@ -82,11 +83,24 @@ public class TestEventHubsInputDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
     try {
-      systemDescriptor.getInputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
+      systemDescriptor.getInputDescriptor(streamId, null, null, new StringSerde());
       fail("Should have thrown Config Exception");
     } catch (ConfigException exception) {
       assertEquals(String.format("Missing namespace and entity path Event Hubs input descriptor in " //
           + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
     }
   }
+
+  @Test
+  public void testStreamDescriptorContainsKVserde() {
+    String systemName = "eventHub";
+    String streamId = "input-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+    EventHubsInputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor
+        .getInputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde());
+    assertTrue(outputDescriptor.getSerde() instanceof KVSerde);
+    assertTrue(((KVSerde) outputDescriptor.getSerde()).getKeySerde() instanceof NoOpSerde);
+    assertTrue(((KVSerde) outputDescriptor.getSerde()).getValueSerde() instanceof StringSerde);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
index fa8ae56..f214aa3 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
@@ -21,14 +21,15 @@ package org.apache.samza.system.eventhub.descriptors;
 import java.util.Map;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.operators.KV;
-import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.eventhub.EventHubConfig;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class TestEventHubsOutputDescriptor {
@@ -39,8 +40,8 @@ public class TestEventHubsOutputDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
 
-    EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
-        .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
+    EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor
+        .getOutputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde())
         .withSasKeyName("secretkey")
         .withSasKey("sasToken-123");
 
@@ -59,8 +60,8 @@ public class TestEventHubsOutputDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
 
-    EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
-        .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
+    EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor
+        .getOutputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde());
 
     Map<String, String> generatedConfigs = outputDescriptor.toConfig();
     assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
@@ -79,11 +80,24 @@ public class TestEventHubsOutputDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
     try {
-      systemDescriptor.getOutputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
+      systemDescriptor.getOutputDescriptor(streamId, null, null, new StringSerde());
       fail("Should have thrown Config Exception");
     } catch (ConfigException exception) {
       assertEquals(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
           + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
     }
   }
+
+  @Test
+  public void testStreamDescriptorContainsKVserde() {
+    String systemName = "eventHub";
+    String streamId = "output-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+    EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor
+        .getOutputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde());
+    assertTrue(outputDescriptor.getSerde() instanceof KVSerde);
+    assertTrue(((KVSerde) outputDescriptor.getSerde()).getKeySerde() instanceof NoOpSerde);
+    assertTrue(((KVSerde) outputDescriptor.getSerde()).getValueSerde() instanceof StringSerde);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
index 7f73bd9..84c8589 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
@@ -19,8 +19,6 @@
 package org.apache.samza.system.eventhub.descriptors;
 
 import java.util.Map;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.eventhub.EventHubConfig;
 import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
@@ -47,14 +45,10 @@ public class TestEventHubsSystemDescriptor {
         .withRuntimeInfoTimeout(60000)
         .withSendKeys(false);
 
-    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
-            new IntegerSerde()));
-    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
-            new IntegerSerde()));
-    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
-            new IntegerSerde()));
+    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", new StringSerde());
+    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", new StringSerde());
+    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", new StringSerde());
+    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", new StringSerde());
 
     Map<String, String> generatedConfigs = systemDescriptor.toConfig();
     assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
@@ -96,14 +90,10 @@ public class TestEventHubsSystemDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
 
-    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
+    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", new StringSerde());
+    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", new StringSerde());
+    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", new StringSerde());
+    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", new StringSerde());
 
     Map<String, String> generatedConfigs = systemDescriptor.toConfig();
     assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));