You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/10/04 22:28:47 UTC

samza git commit: SAMZA-1109: Updated High Level API serde impl with Yi's feedback

Repository: samza
Updated Branches:
  refs/heads/master 8b3fe5d26 -> ad80cf9f1


SAMZA-1109: Updated High Level API serde impl with Yi's feedback

Requesting review from nickpan47.

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: "Jagadish Venkatraman <jv...@linkedin.com>"

Closes #310 from prateekm/serde-updates


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ad80cf9f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ad80cf9f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ad80cf9f

Branch: refs/heads/master
Commit: ad80cf9f1be427878849a61d77cf1a76381e7642
Parents: 8b3fe5d
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Oct 4 15:28:37 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Oct 4 15:28:37 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/execution/JobNode.java     |  2 +-
 .../apache/samza/operators/StreamGraphImpl.java | 33 +++++++++++++++-----
 2 files changed, 27 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ad80cf9f/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 7ff43ed..0368829 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -141,7 +141,7 @@ public class JobNode {
 
     String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
 
-    // Disallow user specified job inputs/outputs. This info comes strictly from the pipeline.
+    // Disallow user specified job inputs/outputs. This info comes strictly from the user application.
     Map<String, String> allowedConfigs = new HashMap<>(config);
     if (allowedConfigs.containsKey(TaskConfig.INPUT_STREAMS())) {
       log.warn("Specifying task inputs in configuration is not allowed with Fluent API. "

http://git-wip-us.apache.org/repos/asf/samza/blob/ad80cf9f/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 45378c7..a02ed3e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -80,15 +80,24 @@ public class StreamGraphImpl implements StreamGraph {
 
   @Override
   public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
+    StreamSpec streamSpec = runner.getStreamSpec(streamId);
     Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
-    Preconditions.checkState(!inputOperators.containsKey(runner.getStreamSpec(streamId)),
+    Preconditions.checkState(!inputOperators.containsKey(streamSpec),
         "getInputStream must not be called multiple times with the same streamId: " + streamId);
 
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    boolean isKeyedInput = serde instanceof KVSerde;
+    if (outputStreams.containsKey(streamSpec)) {
+      OutputStreamImpl outputStream = outputStreams.get(streamSpec);
+      Serde keySerde = outputStream.getKeySerde();
+      Serde valueSerde = outputStream.getValueSerde();
+      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+              + "stream level, so the same key and message Serde must be used for both.", streamId));
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
     inputOperators.put(streamSpec,
-        new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyedInput, this.getNextOpId()));
+        new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed, this.getNextOpId()));
     return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
   }
 
@@ -99,14 +108,24 @@ public class StreamGraphImpl implements StreamGraph {
 
   @Override
   public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
+    StreamSpec streamSpec = runner.getStreamSpec(streamId);
     Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
-    Preconditions.checkState(!outputStreams.containsKey(runner.getStreamSpec(streamId)),
+    Preconditions.checkState(!outputStreams.containsKey(streamSpec),
         "getOutputStream must not be called multiple times with the same streamId: " + streamId);
 
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    if (inputOperators.containsKey(streamSpec)) {
+      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamSpec);
+      Serde keySerde = inputOperatorSpec.getKeySerde();
+      Serde valueSerde = inputOperatorSpec.getValueSerde();
+      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+              + "stream level, so the same key and message Serde must be used for both.", streamId));
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
     outputStreams.put(streamSpec,
-        new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), serde instanceof KVSerde));
+        new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
     return outputStreams.get(streamSpec);
   }