You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/10/04 22:28:47 UTC
samza git commit: SAMZA-1109: Updated High Level API serde impl with Yi's feedback
Repository: samza
Updated Branches:
refs/heads/master 8b3fe5d26 -> ad80cf9f1
SAMZA-1109: Updated High Level API serde impl with Yi's feedback
Requesting review from nickpan47.
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: "Jagadish Venkatraman <jv...@linkedin.com>"
Closes #310 from prateekm/serde-updates
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ad80cf9f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ad80cf9f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ad80cf9f
Branch: refs/heads/master
Commit: ad80cf9f1be427878849a61d77cf1a76381e7642
Parents: 8b3fe5d
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Oct 4 15:28:37 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Oct 4 15:28:37 2017 -0700
----------------------------------------------------------------------
.../org/apache/samza/execution/JobNode.java | 2 +-
.../apache/samza/operators/StreamGraphImpl.java | 33 +++++++++++++++-----
2 files changed, 27 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/ad80cf9f/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 7ff43ed..0368829 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -141,7 +141,7 @@ public class JobNode {
String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
- // Disallow user specified job inputs/outputs. This info comes strictly from the pipeline.
+ // Disallow user specified job inputs/outputs. This info comes strictly from the user application.
Map<String, String> allowedConfigs = new HashMap<>(config);
if (allowedConfigs.containsKey(TaskConfig.INPUT_STREAMS())) {
log.warn("Specifying task inputs in configuration is not allowed with Fluent API. "
http://git-wip-us.apache.org/repos/asf/samza/blob/ad80cf9f/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 45378c7..a02ed3e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -80,15 +80,24 @@ public class StreamGraphImpl implements StreamGraph {
@Override
public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
+ StreamSpec streamSpec = runner.getStreamSpec(streamId);
Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
- Preconditions.checkState(!inputOperators.containsKey(runner.getStreamSpec(streamId)),
+ Preconditions.checkState(!inputOperators.containsKey(streamSpec),
"getInputStream must not be called multiple times with the same streamId: " + streamId);
- StreamSpec streamSpec = runner.getStreamSpec(streamId);
KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
- boolean isKeyedInput = serde instanceof KVSerde;
+ if (outputStreams.containsKey(streamSpec)) {
+ OutputStreamImpl outputStream = outputStreams.get(streamSpec);
+ Serde keySerde = outputStream.getKeySerde();
+ Serde valueSerde = outputStream.getValueSerde();
+ Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+ String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+ + "stream level, so the same key and message Serde must be used for both.", streamId));
+ }
+
+ boolean isKeyed = serde instanceof KVSerde;
inputOperators.put(streamSpec,
- new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyedInput, this.getNextOpId()));
+ new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed, this.getNextOpId()));
return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
}
@@ -99,14 +108,24 @@ public class StreamGraphImpl implements StreamGraph {
@Override
public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
+ StreamSpec streamSpec = runner.getStreamSpec(streamId);
Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
- Preconditions.checkState(!outputStreams.containsKey(runner.getStreamSpec(streamId)),
+ Preconditions.checkState(!outputStreams.containsKey(streamSpec),
"getOutputStream must not be called multiple times with the same streamId: " + streamId);
- StreamSpec streamSpec = runner.getStreamSpec(streamId);
KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+ if (inputOperators.containsKey(streamSpec)) {
+ InputOperatorSpec inputOperatorSpec = inputOperators.get(streamSpec);
+ Serde keySerde = inputOperatorSpec.getKeySerde();
+ Serde valueSerde = inputOperatorSpec.getValueSerde();
+ Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+ String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+ + "stream level, so the same key and message Serde must be used for both.", streamId));
+ }
+
+ boolean isKeyed = serde instanceof KVSerde;
outputStreams.put(streamSpec,
- new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), serde instanceof KVSerde));
+ new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
return outputStreams.get(streamSpec);
}