You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/17 01:56:49 UTC
samza git commit: SAMZA-1956: Update value only descriptor serde
Repository: samza
Updated Branches:
refs/heads/master 4d6ff989b -> 8b8526682
SAMZA-1956: Update value only descriptor serde
Changed the KVSerde to only value Serde for the Eventhubs input and output descriptors.
Since the key is always a `String`, the key serde should always be `NoOpSerde` and will lead to an error otherwise since the Samza `serializers.SerdeManager.scala` expectes a `byte[]`
Author: Daniel Chen <dc...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #733 from dxichen/eventhubs-example-cleanup
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8b852668
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8b852668
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8b852668
Branch: refs/heads/master
Commit: 8b85266829e65ef46ab15d6356e3fa87cd00684f
Parents: 4d6ff98
Author: Daniel Chen <dc...@linkedin.com>
Authored: Tue Oct 16 18:56:44 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Tue Oct 16 18:56:44 2018 -0700
----------------------------------------------------------------------
.../descriptors/EventHubsInputDescriptor.java | 8 ++++--
.../descriptors/EventHubsOutputDescriptor.java | 8 ++++--
.../descriptors/EventHubsSystemDescriptor.java | 29 ++++++++++----------
.../TestEventHubsInputDescriptor.java | 26 ++++++++++++++----
.../TestEventHubsOutputDescriptor.java | 26 ++++++++++++++----
.../TestEventHubsSystemDescriptor.java | 26 ++++++------------
6 files changed, 73 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
index 462cc05..df22269 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.ConfigException;
import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.eventhub.EventHubConfig;
@@ -52,12 +54,12 @@ public class EventHubsInputDescriptor<StreamMessageType>
* @param streamId id of the stream
* @param namespace namespace for the Event Hubs entity to consume from, not null
* @param entityPath entity path for the Event Hubs entity to consume from, not null
- * @param serde serde for messages in the stream
+ * @param valueSerde serde the values in the messages in the stream
* @param systemDescriptor system descriptor this stream descriptor was obtained from
*/
- EventHubsInputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
+ EventHubsInputDescriptor(String streamId, String namespace, String entityPath, Serde valueSerde,
SystemDescriptor systemDescriptor) {
- super(streamId, serde, systemDescriptor, null);
+ super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), systemDescriptor, null);
this.namespace = StringUtils.stripToNull(namespace);
this.entityPath = StringUtils.stripToNull(entityPath);
if (this.namespace == null || this.entityPath == null) {
http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
index ddbf79c..95f7e42 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.ConfigException;
import org.apache.samza.system.descriptors.OutputDescriptor;
import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.eventhub.EventHubConfig;
@@ -50,12 +52,12 @@ public class EventHubsOutputDescriptor<StreamMessageType>
* @param streamId id of the stream
* @param namespace namespace for the Event Hubs entity to produce to, not null
* @param entityPath entity path for the Event Hubs entity to produce to, not null
- * @param serde serde for messages in the stream
+ * @param valueSerde serde the values in the messages in the stream
* @param systemDescriptor system descriptor this stream descriptor was obtained from
*/
- EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
+ EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde valueSerde,
SystemDescriptor systemDescriptor) {
- super(streamId, serde, systemDescriptor);
+ super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), systemDescriptor);
this.namespace = StringUtils.stripToNull(namespace);
this.entityPath = StringUtils.stripToNull(entityPath);
if (this.namespace == null || this.entityPath == null) {
http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
index 80bdfae..feffd87 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import org.apache.samza.operators.KV;
import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.eventhub.EventHubConfig;
@@ -57,40 +58,40 @@ public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemD
/**
* Gets an {@link EventHubsInputDescriptor} for the input stream of this system. The stream has the provided
- * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
+ * namespace and entity name of the associated Event Hubs entity and the provided stream level value serde.
* <p>
- * The type of messages in the stream is the type of the provided stream level serde.
+ * The message in the stream will have {@link String} keys and {@code ValueType} values.
*
* @param streamId id of the input stream
* @param namespace namespace of the Event Hubs entity to consume from
* @param entityPath entity path of the Event Hubs entity to consume from
- * @param serde stream level serde for the input stream
- * @param <StreamMessageType> type of messages in this stream
+ * @param valueSerde stream level serde for the values in the messages in the input stream
+ * @param <ValueType> type of the value in the messages in this stream
* @return an {@link EventHubsInputDescriptor} for the Event Hubs input stream
*/
- public <StreamMessageType> EventHubsInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, String namespace,
- String entityPath, Serde<StreamMessageType> serde) {
+ public <ValueType> EventHubsInputDescriptor<KV<String, ValueType>> getInputDescriptor(String streamId, String namespace,
+ String entityPath, Serde<ValueType> valueSerde) {
streamIds.add(streamId);
- return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, serde, this);
+ return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, valueSerde, this);
}
/**
* Gets an {@link EventHubsOutputDescriptor} for the output stream of this system. The stream has the provided
- * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
+ * namespace and entity name of the associated Event Hubs entity and the provided stream level value serde.
* <p>
- * The type of messages in the stream is the type of the provided stream level serde.
+ * The message in the stream will have {@link String} keys and {@code ValueType} values.
*
* @param streamId id of the output stream
* @param namespace namespace of the Event Hubs entity to produce to
* @param entityPath entity path of the Event Hubs entity to produce to
- * @param serde stream level serde for the output stream
- * @param <StreamMessageType> type of the messages in this stream
+ * @param valueSerde stream level serde for the values in the messages to the output stream
+ * @param <ValueType> type of the value in the messages in this stream
* @return an {@link EventHubsOutputDescriptor} for the Event Hubs output stream
*/
- public <StreamMessageType> EventHubsOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, String namespace,
- String entityPath, Serde<StreamMessageType> serde) {
+ public <ValueType> EventHubsOutputDescriptor<KV<String, ValueType>> getOutputDescriptor(String streamId, String namespace,
+ String entityPath, Serde<ValueType> valueSerde) {
streamIds.add(streamId);
- return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, serde, this);
+ return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, valueSerde, this);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
index 1e6b368..c633ccc 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
@@ -21,14 +21,15 @@ package org.apache.samza.system.eventhub.descriptors;
import java.util.Map;
import org.apache.samza.config.ConfigException;
import org.apache.samza.operators.KV;
-import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.eventhub.EventHubConfig;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -40,8 +41,8 @@ public class TestEventHubsInputDescriptor {
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
- EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
- .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
+ EventHubsInputDescriptor<KV<String, String>> inputDescriptor = systemDescriptor
+ .getInputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde())
.withSasKeyName("secretkey")
.withSasKey("sasToken-123")
.withConsumerGroup("$notdefault");
@@ -62,8 +63,8 @@ public class TestEventHubsInputDescriptor {
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
- EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
- .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
+ EventHubsInputDescriptor<KV<String, String>> inputDescriptor = systemDescriptor
+ .getInputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde());
Map<String, String> generatedConfigs = inputDescriptor.toConfig();
assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
@@ -82,11 +83,24 @@ public class TestEventHubsInputDescriptor {
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
try {
- systemDescriptor.getInputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
+ systemDescriptor.getInputDescriptor(streamId, null, null, new StringSerde());
fail("Should have thrown Config Exception");
} catch (ConfigException exception) {
assertEquals(String.format("Missing namespace and entity path Event Hubs input descriptor in " //
+ "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
}
}
+
+ @Test
+ public void testStreamDescriptorContainsKVserde() {
+ String systemName = "eventHub";
+ String streamId = "input-stream";
+
+ EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+ EventHubsInputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor
+ .getInputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde());
+ assertTrue(outputDescriptor.getSerde() instanceof KVSerde);
+ assertTrue(((KVSerde) outputDescriptor.getSerde()).getKeySerde() instanceof NoOpSerde);
+ assertTrue(((KVSerde) outputDescriptor.getSerde()).getValueSerde() instanceof StringSerde);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
index fa8ae56..f214aa3 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
@@ -21,14 +21,15 @@ package org.apache.samza.system.eventhub.descriptors;
import java.util.Map;
import org.apache.samza.config.ConfigException;
import org.apache.samza.operators.KV;
-import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.eventhub.EventHubConfig;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestEventHubsOutputDescriptor {
@@ -39,8 +40,8 @@ public class TestEventHubsOutputDescriptor {
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
- EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
- .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
+ EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor
+ .getOutputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde())
.withSasKeyName("secretkey")
.withSasKey("sasToken-123");
@@ -59,8 +60,8 @@ public class TestEventHubsOutputDescriptor {
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
- EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
- .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
+ EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor
+ .getOutputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde());
Map<String, String> generatedConfigs = outputDescriptor.toConfig();
assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
@@ -79,11 +80,24 @@ public class TestEventHubsOutputDescriptor {
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
try {
- systemDescriptor.getOutputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
+ systemDescriptor.getOutputDescriptor(streamId, null, null, new StringSerde());
fail("Should have thrown Config Exception");
} catch (ConfigException exception) {
assertEquals(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
+ "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
}
}
+
+ @Test
+ public void testStreamDescriptorContainsKVserde() {
+ String systemName = "eventHub";
+ String streamId = "output-stream";
+
+ EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+ EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor
+ .getOutputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde());
+ assertTrue(outputDescriptor.getSerde() instanceof KVSerde);
+ assertTrue(((KVSerde) outputDescriptor.getSerde()).getKeySerde() instanceof NoOpSerde);
+ assertTrue(((KVSerde) outputDescriptor.getSerde()).getValueSerde() instanceof StringSerde);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
index 7f73bd9..84c8589 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
@@ -19,8 +19,6 @@
package org.apache.samza.system.eventhub.descriptors;
import java.util.Map;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.eventhub.EventHubConfig;
import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
@@ -47,14 +45,10 @@ public class TestEventHubsSystemDescriptor {
.withRuntimeInfoTimeout(60000)
.withSendKeys(false);
- systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
- systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
- systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
- systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
+ systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", new StringSerde());
+ systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", new StringSerde());
+ systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", new StringSerde());
+ systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", new StringSerde());
Map<String, String> generatedConfigs = systemDescriptor.toConfig();
assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
@@ -96,14 +90,10 @@ public class TestEventHubsSystemDescriptor {
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
- systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
- systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
- systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
- systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
- new IntegerSerde()));
+ systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", new StringSerde());
+ systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", new StringSerde());
+ systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", new StringSerde());
+ systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", new StringSerde());
Map<String, String> generatedConfigs = systemDescriptor.toConfig();
assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));