You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/10/26 00:18:41 UTC
[2/2] samza git commit: SAMZA-1454: Globally unique and user settable
IDs for stateful operators
SAMZA-1454: Globally unique and user settable IDs for stateful operators
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jagadish Venkatraman <ja...@apache.org>, Yi Pan <ni...@gmail.com>
Closes #324 from prateekm/operator-id-uniqueness
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1296c7ff
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1296c7ff
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1296c7ff
Branch: refs/heads/master
Commit: 1296c7ff91daff4abeac392cec53600f7c5cb427
Parents: 711dd8d
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Oct 25 17:18:36 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Oct 25 17:18:36 2017 -0700
----------------------------------------------------------------------
.../apache/samza/operators/MessageStream.java | 41 +++++++----
.../samza/execution/JobGraphJsonGenerator.java | 4 +-
.../samza/operators/MessageStreamImpl.java | 44 +++++++-----
.../apache/samza/operators/StreamGraphImpl.java | 74 +++++++++++++++-----
.../samza/operators/impl/OperatorImpl.java | 32 ++++-----
.../samza/operators/impl/OperatorImplGraph.java | 20 +++---
.../operators/impl/PartialJoinOperatorImpl.java | 32 ++++++---
.../operators/impl/WindowOperatorImpl.java | 3 +-
.../samza/operators/spec/InputOperatorSpec.java | 2 +-
.../samza/operators/spec/JoinOperatorSpec.java | 14 ++--
.../samza/operators/spec/OperatorSpec.java | 14 +---
.../samza/operators/spec/OperatorSpecs.java | 36 +++++++---
.../operators/spec/OutputOperatorSpec.java | 2 +-
.../operators/spec/PartitionByOperatorSpec.java | 2 +-
.../samza/operators/spec/SinkOperatorSpec.java | 2 +-
.../operators/spec/StreamOperatorSpec.java | 2 +-
.../operators/spec/WindowOperatorSpec.java | 4 +-
.../samza/example/KeyValueStoreExample.java | 2 +-
.../samza/example/OrderShipmentJoinExample.java | 2 +-
.../samza/example/PageViewCounterExample.java | 2 +-
.../samza/example/RepartitionExample.java | 5 +-
.../org/apache/samza/example/WindowExample.java | 5 +-
.../samza/execution/TestExecutionPlanner.java | 44 ++++++------
.../execution/TestJobGraphJsonGenerator.java | 16 ++---
.../org/apache/samza/execution/TestJobNode.java | 34 +++++----
.../samza/operators/TestJoinOperator.java | 33 +++++++--
.../samza/operators/TestMessageStreamImpl.java | 26 +++----
.../samza/operators/TestStreamGraphImpl.java | 61 ++++++++++------
.../samza/operators/TestWindowOperator.java | 46 ++++++++----
.../samza/operators/impl/TestOperatorImpl.java | 2 +-
.../operators/impl/TestOperatorImplGraph.java | 47 ++++++++-----
.../operators/spec/TestWindowOperatorSpec.java | 4 +-
.../EndOfStreamIntegrationTest.java | 2 +-
.../WatermarkIntegrationTest.java | 2 +-
.../test/operator/RepartitionJoinWindowApp.java | 13 ++--
.../samza/test/operator/SessionWindowApp.java | 4 +-
.../samza/test/operator/TumblingWindowApp.java | 3 +-
37 files changed, 425 insertions(+), 256 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index c36fe1f..dcce7c8 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -73,7 +73,7 @@ public interface MessageStream<M> {
* should be retained in the filtered {@link MessageStream}.
*
* @param filterFn the predicate to filter messages from this {@link MessageStream}.
- * @return the transformed {@link MessageStream}
+ * @return the filtered {@link MessageStream}
*/
MessageStream<M> filter(FilterFunction<? super M> filterFn);
@@ -105,15 +105,19 @@ public interface MessageStream<M> {
* <p>
* Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
* <p>
- * <b>Warning:</b> As of version 0.13.0, messages in windows are kept in memory and will be lost during restarts.
+ * The {@code id} must be unique for each operator in this application. It is used as part of the unique ID
+ * for any state stores and streams created by this operator (the full ID also contains the job name, job id and
+ * operator type). If the application logic is changed, this ID must be reused in the new operator to retain
+ * state from the previous version, and changed for the new operator to discard the state from the previous version.
*
* @param window the window to group and process messages from this {@link MessageStream}
+ * @param id the unique id of this operator in this application
* @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified,
* panes are emitted per-key.
* @param <WV> the type of value in the {@link WindowPane} in the transformed {@link MessageStream}
- * @return the transformed {@link MessageStream}
+ * @return the windowed {@link MessageStream}
*/
- <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window);
+ <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String id);
/**
* Joins this {@link MessageStream} with another {@link MessageStream} using the provided
@@ -124,14 +128,18 @@ public interface MessageStream<M> {
* <p>
* Both inputs being joined must have the same number of partitions, and should be partitioned by the join key.
* <p>
- * <b>Warning:</b> As of version 0.13.0, messages in joins are kept in memory and will be lost during restarts.
+ * The {@code id} must be unique for each operator in this application. It is used as part of the unique ID
+ * for any state stores and streams created by this operator (the full ID also contains the job name, job id and
+ * operator type). If the application logic is changed, this ID must be reused in the new operator to retain
+ * state from the previous version, and changed for the new operator to discard the state from the previous version.
*
* @param otherStream the other {@link MessageStream} to be joined with
* @param joinFn the function to join messages from this and the other {@link MessageStream}
- * @param ttl the ttl for messages in each stream
* @param keySerde the serde for the join key
* @param messageSerde the serde for messages in this stream
* @param otherMessageSerde the serde for messages in the other stream
+ * @param ttl the ttl for messages in each stream
+ * @param id the unique id of this operator in this application
* @param <K> the type of join key
* @param <OM> the type of messages in the other stream
* @param <JM> the type of messages resulting from the {@code joinFn}
@@ -139,7 +147,8 @@ public interface MessageStream<M> {
*/
<K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
- Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl);
+ Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
+ Duration ttl, String id);
/**
* Merges all {@code otherStreams} with this {@link MessageStream}.
@@ -186,26 +195,34 @@ public interface MessageStream<M> {
* configuration, if present.
* Else, the number of partitions is set to to the max of number of partitions for all input and output streams
* (excluding intermediate streams).
+ * <p>
+ * The {@code id} must be unique for each operator in this application. It is used as part of the unique ID
+ * for any state stores and streams created by this operator (the full ID also contains the job name, job id and
+ * operator type). If the application logic is changed, this ID must be reused in the new operator to retain
+ * state from the previous version, and changed for the new operator to discard the state from the previous version.
*
- * @param <K> the type of output key
- * @param <V> the type of output value
* @param keyExtractor the {@link Function} to extract the message and partition key from the input message
* @param valueExtractor the {@link Function} to extract the value from the input message
* @param serde the {@link KVSerde} to use for (de)serializing the key and value.
+ * @param id the unique id of this operator in this application
+ * @param <K> the type of output key
+ * @param <V> the type of output value
* @return the repartitioned {@link MessageStream}
*/
<K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
- Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde);
+ Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String id);
+
/**
- * Same as calling {@link #partitionBy(Function, Function, KVSerde)} with a null KVSerde.
+ * Same as calling {@link #partitionBy(Function, Function, KVSerde, String)} with a null KVSerde.
*
* @param keyExtractor the {@link Function} to extract the message and partition key from the input message
* @param valueExtractor the {@link Function} to extract the value from the input message
+ * @param id the unique id of this operator in this application
* @param <K> the type of output key
* @param <V> the type of output value
* @return the repartitioned {@link MessageStream}
*/
<K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
- Function<? super M, ? extends V> valueExtractor);
+ Function<? super M, ? extends V> valueExtractor, String id);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 23c9d89..03845e3 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -69,14 +69,14 @@ import org.codehaus.jackson.map.ObjectMapper;
@JsonProperty("outputStreams")
List<StreamJson> outputStreams;
@JsonProperty("operators")
- Map<Integer, Map<String, Object>> operators = new HashMap<>();
+ Map<String, Map<String, Object>> operators = new HashMap<>();
}
static final class StreamJson {
@JsonProperty("streamId")
String streamId;
@JsonProperty("nextOperatorIds")
- Set<Integer> nextOperatorIds = new HashSet<>();
+ Set<String> nextOperatorIds = new HashSet<>();
}
static final class JobNodeJson {
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index dc91d19..e6e711c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators;
+import org.apache.samza.SamzaException;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
@@ -26,6 +27,7 @@ import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
@@ -72,42 +74,47 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) {
- OperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, this.graph.getNextOpId());
+ String opId = this.graph.getNextOpId(OpCode.MAP);
+ OperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, opId);
this.operatorSpec.registerNextOperatorSpec(op);
return new MessageStreamImpl<>(this.graph, op);
}
@Override
public MessageStream<M> filter(FilterFunction<? super M> filterFn) {
- OperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, this.graph.getNextOpId());
+ String opId = this.graph.getNextOpId(OpCode.FILTER);
+ OperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, opId);
this.operatorSpec.registerNextOperatorSpec(op);
return new MessageStreamImpl<>(this.graph, op);
}
@Override
public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) {
- OperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, this.graph.getNextOpId());
+ String opId = this.graph.getNextOpId(OpCode.FLAT_MAP);
+ OperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, opId);
this.operatorSpec.registerNextOperatorSpec(op);
return new MessageStreamImpl<>(this.graph, op);
}
@Override
public void sink(SinkFunction<? super M> sinkFn) {
- SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph.getNextOpId());
+ String opId = this.graph.getNextOpId(OpCode.SINK);
+ SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, opId);
this.operatorSpec.registerNextOperatorSpec(op);
}
@Override
public void sendTo(OutputStream<M> outputStream) {
- OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
- (OutputStreamImpl<M>) outputStream, this.graph.getNextOpId());
+ String opId = this.graph.getNextOpId(OpCode.SEND_TO);
+ OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec((OutputStreamImpl<M>) outputStream, opId);
this.operatorSpec.registerNextOperatorSpec(op);
}
@Override
- public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
+ public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String userDefinedId) {
+ String opId = this.graph.getNextOpId(OpCode.WINDOW, userDefinedId);
OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec(
- (WindowInternal<M, K, WV>) window, this.graph.getNextOpId());
+ (WindowInternal<M, K, WV>) window, opId);
this.operatorSpec.registerNextOperatorSpec(op);
return new MessageStreamImpl<>(this.graph, op);
}
@@ -115,11 +122,14 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
- Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl) {
+ Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
+ Duration ttl, String userDefinedId) {
+ if (otherStream.equals(this)) throw new SamzaException("Cannot join a MessageStream with itself.");
OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
+ String opId = this.graph.getNextOpId(OpCode.JOIN, userDefinedId);
JoinOperatorSpec<K, M, OM, JM> joinOpSpec =
OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn,
- keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), this.graph.getNextOpId());
+ keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), opId);
this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec);
@@ -130,7 +140,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
if (otherStreams.isEmpty()) return this;
- StreamOperatorSpec<M, M> opSpec = OperatorSpecs.createMergeOperatorSpec(this.graph.getNextOpId());
+ String opId = this.graph.getNextOpId(OpCode.MERGE);
+ StreamOperatorSpec<M, M> opSpec = OperatorSpecs.createMergeOperatorSpec(opId);
this.operatorSpec.registerNextOperatorSpec(opSpec);
otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(opSpec));
return new MessageStreamImpl<>(this.graph, opSpec);
@@ -138,10 +149,9 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
- Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde) {
- int opId = this.graph.getNextOpId();
- String opName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId);
- IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opName, serde);
+ Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) {
+ String opId = this.graph.getNextOpId(OpCode.PARTITION_BY, userDefinedId);
+ IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opId, serde);
PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec =
OperatorSpecs.createPartitionByOperatorSpec(
intermediateStream.getOutputStream(), keyExtractor, valueExtractor, opId);
@@ -151,8 +161,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
- Function<? super M, ? extends V> valueExtractor) {
- return partitionBy(keyExtractor, valueExtractor, null);
+ Function<? super M, ? extends V> valueExtractor, String userDefinedId) {
+ return partitionBy(keyExtractor, valueExtractor, null, userDefinedId);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index a02ed3e..936cb3a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -19,10 +19,14 @@
package org.apache.samza.operators;
import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.runtime.ApplicationRunner;
@@ -48,18 +52,19 @@ import java.util.stream.Collectors;
public class StreamGraphImpl implements StreamGraph {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphImpl.class);
- /**
- * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} in the graph.
- * Should only be accessed by {@link MessageStreamImpl} via {@link #getNextOpId()}.
- */
- private int opId = 0;
-
// We use a LHM for deterministic order in initializing and closing operators.
private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
private final ApplicationRunner runner;
private final Config config;
+ /**
+ * The 0-based position of the next operator in the graph.
+ * Part of the unique ID for each OperatorSpec in the graph.
+ * Should only be accessed and incremented via {@link #getNextOpId(OpCode, String)}.
+ */
+ private int nextOpNum = 0;
+ private final Set<String> operatorIds = new HashSet<>();
private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new NoOpSerde());
private ContextManager contextManager = null;
@@ -81,6 +86,7 @@ public class StreamGraphImpl implements StreamGraph {
@Override
public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
StreamSpec streamSpec = runner.getStreamSpec(streamId);
+ Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
Preconditions.checkState(!inputOperators.containsKey(streamSpec),
"getInputStream must not be called multiple times with the same streamId: " + streamId);
@@ -96,8 +102,11 @@ public class StreamGraphImpl implements StreamGraph {
}
boolean isKeyed = serde instanceof KVSerde;
+ InputOperatorSpec inputOperatorSpec =
+ OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
+ isKeyed, this.getNextOpId(OpCode.INPUT, null));
inputOperators.put(streamSpec,
- new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed, this.getNextOpId()));
+ inputOperatorSpec);
return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
}
@@ -109,6 +118,7 @@ public class StreamGraphImpl implements StreamGraph {
@Override
public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
StreamSpec streamSpec = runner.getStreamSpec(streamId);
+ Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
Preconditions.checkState(!outputStreams.containsKey(streamSpec),
"getOutputStream must not be called multiple times with the same streamId: " + streamId);
@@ -144,20 +154,15 @@ public class StreamGraphImpl implements StreamGraph {
* Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
* An intermediate {@link MessageStream} is both an output and an input stream.
*
- * @param streamName the name of the stream to be created. Will be prefixed with job name and id to generate the
- * logical streamId.
+ * @param streamId the id of the stream to be created.
* @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
* is used.
* @param <M> the type of messages in the intermediate {@link MessageStream}
* @return the intermediate {@link MessageStreamImpl}
*/
- <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamName, Serde<M> serde) {
- String streamId = String.format("%s-%s-%s",
- config.get(JobConfig.JOB_NAME()),
- config.get(JobConfig.JOB_ID(), "1"),
- streamName);
+ <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde) {
StreamSpec streamSpec = runner.getStreamSpec(streamId);
-
+
Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec),
"getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
@@ -168,8 +173,10 @@ public class StreamGraphImpl implements StreamGraph {
boolean isKeyed = serde instanceof KVSerde;
KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
- inputOperators.put(streamSpec,
- new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed, this.getNextOpId()));
+ InputOperatorSpec inputOperatorSpec =
+ OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
+ isKeyed, this.getNextOpId(OpCode.INPUT, null));
+ inputOperators.put(streamSpec, inputOperatorSpec);
outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec));
}
@@ -186,8 +193,37 @@ public class StreamGraphImpl implements StreamGraph {
return this.contextManager;
}
- /* package private */ int getNextOpId() {
- return this.opId++;
+ /**
+ * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+ * jobName-jobId-opCode-(userDefinedId|nextOpNum);
+ *
+ * @param opCode the {@link OpCode} of the next operator
+ * @param userDefinedId the optional user-provided name of the next operator or null
+ * @return the unique ID for the next operator in the graph
+ */
+ /* package private */ String getNextOpId(OpCode opCode, String userDefinedId) {
+ String nextOpId = String.format("%s-%s-%s-%s",
+ config.get(JobConfig.JOB_NAME()),
+ config.get(JobConfig.JOB_ID(), "1"),
+ opCode.name().toLowerCase(),
+ StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
+ if (!operatorIds.add(nextOpId)) {
+ throw new SamzaException(
+ String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
+ }
+ nextOpNum++;
+ return nextOpId;
+ }
+
+ /**
+ * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+ * jobName-jobId-opCode-nextOpNum;
+ *
+ * @param opCode the {@link OpCode} of the next operator
+ * @return the unique ID for the next operator in the graph
+ */
+ /* package private */ String getNextOpId(OpCode opCode) {
+ return getNextOpId(opCode, null);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 0c50630..f5a2624 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -81,14 +81,14 @@ public abstract class OperatorImpl<M, RM> {
* @param context the {@link TaskContext} for the task
*/
public final void init(Config config, TaskContext context) {
- String opName = getOperatorName();
+ String opId = getOpImplId();
if (initialized) {
- throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opName));
+ throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opId));
}
if (closed) {
- throw new IllegalStateException(String.format("Attempted to initialize Operator %s after it was closed.", opName));
+ throw new IllegalStateException(String.format("Attempted to initialize Operator %s after it was closed.", opId));
}
this.highResClock = createHighResClock(config);
@@ -96,9 +96,9 @@ public abstract class OperatorImpl<M, RM> {
prevOperators = new HashSet<>();
inputStreams = new HashSet<>();
MetricsRegistry metricsRegistry = context.getMetricsRegistry();
- this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages");
- this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-message-ns");
- this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns");
+ this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opId + "-messages");
+ this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-message-ns");
+ this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-timer-ns");
this.taskName = context.getTaskName();
TaskContextImpl taskContext = (TaskContextImpl) context;
@@ -127,7 +127,7 @@ public abstract class OperatorImpl<M, RM> {
if (!initialized) {
throw new IllegalStateException(
String.format("Attempted to register next operator before initializing operator %s.",
- getOperatorName()));
+ getOpImplId()));
}
this.registeredOperators.add(nextOperator);
nextOperator.registerPrevOperator(this);
@@ -163,7 +163,7 @@ public abstract class OperatorImpl<M, RM> {
String.format("Error applying operator %s (created at %s) to its input message. "
+ "Expected input message to be of type %s, but found it to be of type %s. "
+ "Are Serdes for the inputs to this operator configured correctly?",
- getOperatorName(), getOperatorSpec().getSourceLocation(), expectedType, actualType), e);
+ getOpImplId(), getOperatorSpec().getSourceLocation(), expectedType, actualType), e);
}
long endNs = this.highResClock.nanoTime();
@@ -317,7 +317,7 @@ public abstract class OperatorImpl<M, RM> {
if (inputWatermark < inputWatermarkMin) {
// advance the watermark time of this operator
inputWatermark = inputWatermarkMin;
- LOG.trace("Advance input watermark to {} in operator {}", inputWatermark, getOperatorName());
+ LOG.trace("Advance input watermark to {} in operator {}", inputWatermark, getOpImplId());
final Long outputWm;
WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
@@ -340,7 +340,7 @@ public abstract class OperatorImpl<M, RM> {
if (outputWatermark < outputWm) {
// advance the watermark
outputWatermark = outputWm;
- LOG.debug("Advance output watermark to {} in operator {}", outputWatermark, getOperatorName());
+ LOG.debug("Advance output watermark to {} in operator {}", outputWatermark, getOpImplId());
this.registeredOperators.forEach(op -> op.onWatermark(outputWatermark, collector, coordinator));
} else if (outputWatermark > outputWm) {
LOG.warn("Ignore watermark {} that is smaller than the previous watermark {}.", outputWm, outputWatermark);
@@ -375,7 +375,7 @@ public abstract class OperatorImpl<M, RM> {
public void close() {
if (closed) {
throw new IllegalStateException(
- String.format("Attempted to close Operator %s more than once.", getOperatorSpec().getOpName()));
+ String.format("Attempted to close Operator %s more than once.", getOpImplId()));
}
handleClose();
closed = true;
@@ -391,16 +391,16 @@ public abstract class OperatorImpl<M, RM> {
protected abstract OperatorSpec<M, RM> getOperatorSpec();
/**
- * Get the unique name for this {@link OperatorImpl} in the DAG.
+ * Get the unique ID for this {@link OperatorImpl} in the DAG.
*
* Some {@link OperatorImpl}s don't have a 1:1 mapping with their {@link OperatorSpec}. E.g., there are
* 2 PartialJoinOperatorImpls for a JoinOperatorSpec. Overriding this method allows them to provide an
- * implementation specific name, e.g., for use in metrics.
+ * implementation specific id, e.g., for use in metrics.
*
- * @return the unique name for this {@link OperatorImpl} in the DAG
+ * @return the unique ID for this {@link OperatorImpl} in the DAG
*/
- protected String getOperatorName() {
- return getOperatorSpec().getOpName();
+ protected String getOpImplId() {
+ return getOperatorSpec().getOpId();
}
private HighResolutionClock createHighResClock(Config config) {
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 808ddbf..49b29c8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -61,7 +61,7 @@ public class OperatorImplGraph {
private static final Logger LOG = LoggerFactory.getLogger(OperatorImplGraph.class);
/**
- * A mapping from operator names to their {@link OperatorImpl}s in this graph. Used to avoid creating
+ * A mapping from operator IDs to their {@link OperatorImpl}s in this graph. Used to avoid creating
* multiple {@link OperatorImpl}s for an {@link OperatorSpec} when it's reached from different
* {@link OperatorSpec}s during DAG traversals (e.g., for the merge operator).
* We use a LHM for deterministic ordering in initializing and closing operators.
@@ -74,11 +74,11 @@ public class OperatorImplGraph {
private final Map<SystemStream, InputOperatorImpl> inputOperators = new HashMap<>();
/**
- * A mapping from {@link JoinOperatorSpec}s to their two {@link PartialJoinFunction}s. Used to associate
+ * A mapping from {@link JoinOperatorSpec} IDs to their two {@link PartialJoinFunction}s. Used to associate
* the two {@link PartialJoinOperatorImpl}s for a {@link JoinOperatorSpec} with each other since they're
* reached from different {@link OperatorSpec} during DAG traversals.
*/
- private final Map<Integer, KV<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>();
+ private final Map<String, KV<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>();
private final Clock clock;
@@ -155,13 +155,17 @@ public class OperatorImplGraph {
*/
OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
SystemStream inputStream, Config config, TaskContext context) {
- if (!operatorImpls.containsKey(operatorSpec.getOpName()) || operatorSpec instanceof JoinOperatorSpec) {
+ if (!operatorImpls.containsKey(operatorSpec.getOpId()) || operatorSpec instanceof JoinOperatorSpec) {
// Either this is the first time we've seen this operatorSpec, or this is a join operator spec
// and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context);
operatorImpl.init(config, context);
operatorImpl.registerInputStream(inputStream);
- operatorImpls.put(operatorImpl.getOperatorName(), operatorImpl);
+
+ // Note: The key here is opImplId, which may not equal opId for some impls (e.g. PartialJoinOperatorImpl).
+ // This is currently OK since we don't need to look up a partial join operator impl again during traversal
+ // (a join cannot have a cycle).
+ operatorImpls.put(operatorImpl.getOpImplId(), operatorImpl);
Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
registeredSpecs.forEach(registeredSpec -> {
@@ -172,7 +176,7 @@ public class OperatorImplGraph {
} else {
// the implementation corresponding to operatorSpec has already been instantiated
// and registered, so we do not need to traverse the DAG further.
- return operatorImpls.get(operatorSpec.getOpName());
+ return operatorImpls.get(operatorSpec.getOpId());
}
}
@@ -244,7 +248,7 @@ public class OperatorImplGraph {
@Override
public void init(Config config, TaskContext context) {
- String leftStoreName = joinOpSpec.getLeftOpName();
+ String leftStoreName = joinOpSpec.getLeftOpId();
leftStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(leftStoreName);
// user-defined joinFn should only be initialized once, so we do it only in left partial join function.
@@ -276,7 +280,7 @@ public class OperatorImplGraph {
@Override
public void init(Config config, TaskContext context) {
- String rightStoreName = joinOpSpec.getRightOpName();
+ String rightStoreName = joinOpSpec.getRightOpId();
rightStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(rightStoreName);
// user-defined joinFn should only be initialized once,
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index e976a43..90a71a0 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -18,11 +18,13 @@
*/
package org.apache.samza.operators.impl;
+import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.impl.store.TimestampedValue;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
@@ -68,13 +70,21 @@ class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> {
@Override
public Collection<JM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
- K key = thisPartialJoinFn.getKey(message);
- thisPartialJoinFn.getState().put(key, new TimestampedValue<>(message, clock.currentTimeMillis()));
- TimestampedValue<OM> otherMessage = otherPartialJoinFn.getState().get(key);
- long now = clock.currentTimeMillis();
- if (otherMessage != null && otherMessage.getTimestamp() > now - ttlMs) {
- JM joinResult = thisPartialJoinFn.apply(message, otherMessage.getValue());
- return Collections.singletonList(joinResult);
+ try {
+ KeyValueStore<K, TimestampedValue<M>> thisState = thisPartialJoinFn.getState();
+ KeyValueStore<K, TimestampedValue<OM>> otherState = otherPartialJoinFn.getState();
+
+ K key = thisPartialJoinFn.getKey(message);
+ thisState.put(key, new TimestampedValue<>(message, clock.currentTimeMillis()));
+ TimestampedValue<OM> otherMessage = otherState.get(key);
+
+ long now = clock.currentTimeMillis();
+ if (otherMessage != null && otherMessage.getTimestamp() > now - ttlMs) {
+ JM joinResult = thisPartialJoinFn.apply(message, otherMessage.getValue());
+ return Collections.singletonList(joinResult);
+ }
+ } catch (Exception e) {
+ throw new SamzaException("Error handling message in PartialJoinOperatorImpl " + getOpImplId(), e);
}
return Collections.emptyList();
}
@@ -89,13 +99,13 @@ class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> {
}
/**
- * The name for this {@link PartialJoinOperatorImpl} that includes information about which
+ * The ID for this {@link PartialJoinOperatorImpl} that includes information about which
* side of the join it is for.
*
- * @return the {@link PartialJoinOperatorImpl} name.
+ * @return the {@link PartialJoinOperatorImpl} ID.
*/
@Override
- protected String getOperatorName() {
- return isLeftSide ? joinOpSpec.getLeftOpName() : joinOpSpec.getRightOpName();
+ protected String getOpImplId() {
+ return isLeftSide ? joinOpSpec.getLeftOpId() : joinOpSpec.getRightOpId();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index 42fe46a..e0b1240 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -112,7 +112,8 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje
protected void handleInit(Config config, TaskContext context) {
WindowInternal<M, K, Object> window = windowOpSpec.getWindow();
- KeyValueStore<TimeSeriesKey<K>, Object> store = (KeyValueStore<TimeSeriesKey<K>, Object>) context.getStore(windowOpSpec.getOpName());
+ KeyValueStore<TimeSeriesKey<K>, Object> store =
+ (KeyValueStore<TimeSeriesKey<K>, Object>) context.getStore(windowOpSpec.getOpId());
// For aggregating windows, we use the store in over-write mode since we only retain the aggregated
// value. Else, we use the store in append-mode.
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 3c66ee6..4c0687a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -38,7 +38,7 @@ public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { //
private final boolean isKeyedInput;
public InputOperatorSpec(StreamSpec streamSpec,
- Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyedInput, int opId) {
+ Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyedInput, String opId) {
super(OpCode.INPUT, opId);
this.streamSpec = streamSpec;
this.keySerde = keySerde;
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index 3f99280..c730bca 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -60,7 +60,7 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp
*/
JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> rightInputOpSpec,
JoinFunction<K, M, OM, JM> joinFn, Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
- long ttlMs, int opId) {
+ long ttlMs, String opId) {
super(OpCode.JOIN, opId);
this.leftInputOpSpec = leftInputOpSpec;
this.rightInputOpSpec = rightInputOpSpec;
@@ -74,8 +74,8 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp
@Override
public Collection<StoreDescriptor> getStoreDescriptors() {
String rocksDBStoreFactory = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory";
- String leftStoreName = getLeftOpName();
- String rightStoreName = getRightOpName();
+ String leftStoreName = getLeftOpId();
+ String rightStoreName = getRightOpId();
Map<String, String> leftStoreCustomProps = ImmutableMap.of(
String.format("stores.%s.rocksdb.ttl.ms", leftStoreName), Long.toString(ttlMs),
String.format("stores.%s.changelog.kafka.cleanup.policy", leftStoreName), "delete",
@@ -105,12 +105,12 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp
return rightInputOpSpec;
}
- public String getLeftOpName() {
- return this.getOpName() + "-L";
+ public String getLeftOpId() {
+ return this.getOpId() + "-L";
}
- public String getRightOpName() {
- return this.getOpName() + "-R";
+ public String getRightOpId() {
+ return this.getOpId() + "-R";
}
public JoinFunction<K, M, OM, JM> getJoinFn() {
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 71a9897..17f1b49 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -52,7 +52,7 @@ public abstract class OperatorSpec<M, OM> {
OUTPUT
}
- private final int opId;
+ private final String opId;
private final OpCode opCode;
private StackTraceElement[] creationStackTrace;
@@ -63,7 +63,7 @@ public abstract class OperatorSpec<M, OM> {
*/
private final Set<OperatorSpec<OM, ?>> nextOperatorSpecs = new LinkedHashSet<>();
- public OperatorSpec(OpCode opCode, int opId) {
+ public OperatorSpec(OpCode opCode, String opId) {
this.opCode = opCode;
this.opId = opId;
this.creationStackTrace = Thread.currentThread().getStackTrace();
@@ -93,7 +93,7 @@ public abstract class OperatorSpec<M, OM> {
* Get the unique ID of this operator in the {@link org.apache.samza.operators.StreamGraph}.
* @return the unique operator ID
*/
- public final int getOpId() {
+ public final String getOpId() {
return this.opId;
}
@@ -128,13 +128,5 @@ public abstract class OperatorSpec<M, OM> {
return String.format("%s:%s", element.getFileName(), element.getLineNumber());
}
- /**
- * Get the name for this operator based on its opCode and opId.
- * @return the name for this operator
- */
- public final String getOpName() {
- return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId());
- }
-
abstract public WatermarkFunction getWatermarkFn();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 8b2b177..1b3b8aa 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -28,6 +28,7 @@ import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.StreamSpec;
import org.apache.samza.task.TaskContext;
import java.util.ArrayList;
@@ -43,6 +44,23 @@ public class OperatorSpecs {
private OperatorSpecs() {}
/**
+ * Creates an {@link InputOperatorSpec} for consuming input.
+ *
+ * @param streamSpec the stream spec for the input stream
+ * @param keySerde the serde for the input key
+ * @param valueSerde the serde for the input value
+ * @param isKeyed whether the input stream is keyed
+ * @param opId the unique ID of the operator
+ * @param <K> type of input key
+ * @param <V> type of input value
+ * @return the {@link InputOperatorSpec}
+ */
+ public static <K, V> InputOperatorSpec<K, V> createInputOperatorSpec(
+ StreamSpec streamSpec, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) {
+ return new InputOperatorSpec<>(streamSpec, keySerde, valueSerde, isKeyed, opId);
+ }
+
+ /**
* Creates a {@link StreamOperatorSpec} for {@link MapFunction}
*
* @param mapFn the map function
@@ -52,7 +70,7 @@ public class OperatorSpecs {
* @return the {@link StreamOperatorSpec}
*/
public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(
- MapFunction<? super M, ? extends OM> mapFn, int opId) {
+ MapFunction<? super M, ? extends OM> mapFn, String opId) {
return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
@Override
public Collection<OM> apply(M message) {
@@ -87,7 +105,7 @@ public class OperatorSpecs {
* @return the {@link StreamOperatorSpec}
*/
public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(
- FilterFunction<? super M> filterFn, int opId) {
+ FilterFunction<? super M> filterFn, String opId) {
return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
@Override
public Collection<M> apply(M message) {
@@ -122,7 +140,7 @@ public class OperatorSpecs {
* @return the {@link StreamOperatorSpec}
*/
public static <M, OM> StreamOperatorSpec<M, OM> createFlatMapOperatorSpec(
- FlatMapFunction<? super M, ? extends OM> flatMapFn, int opId) {
+ FlatMapFunction<? super M, ? extends OM> flatMapFn, String opId) {
return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, OperatorSpec.OpCode.FLAT_MAP, opId);
}
@@ -134,7 +152,7 @@ public class OperatorSpecs {
* @param <M> type of input message
* @return the {@link SinkOperatorSpec} for the sink operator
*/
- public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<? super M> sinkFn, int opId) {
+ public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<? super M> sinkFn, String opId) {
return new SinkOperatorSpec<>((SinkFunction<M>) sinkFn, opId);
}
@@ -146,7 +164,7 @@ public class OperatorSpecs {
* @param <M> the type of message in the {@link OutputStreamImpl}
* @return the {@link OutputOperatorSpec} for the sendTo operator
*/
- public static <M> OutputOperatorSpec<M> createSendToOperatorSpec(OutputStreamImpl<M> outputStream, int opId) {
+ public static <M> OutputOperatorSpec<M> createSendToOperatorSpec(OutputStreamImpl<M> outputStream, String opId) {
return new OutputOperatorSpec<>(outputStream, opId);
}
@@ -164,7 +182,7 @@ public class OperatorSpecs {
*/
public static <M, K, V> PartitionByOperatorSpec<M, K, V> createPartitionByOperatorSpec(
OutputStreamImpl<KV<K, V>> outputStream, Function<? super M, ? extends K> keyFunction,
- Function<? super M, ? extends V> valueFunction, int opId) {
+ Function<? super M, ? extends V> valueFunction, String opId) {
return new PartitionByOperatorSpec<>(outputStream, keyFunction, valueFunction, opId);
}
@@ -180,7 +198,7 @@ public class OperatorSpecs {
*/
public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
- WindowInternal<M, WK, WV> window, int opId) {
+ WindowInternal<M, WK, WV> window, String opId) {
return new WindowOperatorSpec<>(window, opId);
}
@@ -203,7 +221,7 @@ public class OperatorSpecs {
*/
public static <K, M, OM, JM> JoinOperatorSpec<K, M, OM, JM> createJoinOperatorSpec(
OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> rightInputOpSpec, JoinFunction<K, M, OM, JM> joinFn,
- Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, long ttlMs, int opId) {
+ Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, long ttlMs, String opId) {
return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn,
keySerde, messageSerde, otherMessageSerde, ttlMs, opId);
}
@@ -215,7 +233,7 @@ public class OperatorSpecs {
* @param <M> the type of input message
* @return the {@link StreamOperatorSpec} for the merge
*/
- public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(int opId) {
+ public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(String opId) {
return new StreamOperatorSpec<>(message ->
new ArrayList<M>() {
{
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index 862370f..6cb4fca 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -39,7 +39,7 @@ public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
* @param outputStream the {@link OutputStreamImpl} to send messages to
* @param opId the unique ID of this {@link SinkOperatorSpec} in the graph
*/
- OutputOperatorSpec(OutputStreamImpl<M> outputStream, int opId) {
+ OutputOperatorSpec(OutputStreamImpl<M> outputStream, String opId) {
super(OpCode.SEND_TO, opId);
this.outputStream = outputStream;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
index 42eeb4b..399c836 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
@@ -51,7 +51,7 @@ public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> {
*/
PartitionByOperatorSpec(OutputStreamImpl<KV<K, V>> outputStream,
Function<? super M, ? extends K> keyFunction,
- Function<? super M, ? extends V> valueFunction, int opId) {
+ Function<? super M, ? extends V> valueFunction, String opId) {
super(OpCode.PARTITION_BY, opId);
this.outputStream = outputStream;
this.keyFunction = keyFunction;
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 1145be8..1ca3801 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -41,7 +41,7 @@ public class SinkOperatorSpec<M> extends OperatorSpec<M, Void> {
* {@link org.apache.samza.task.TaskCoordinator}.
* @param opId the unique ID of this {@link OperatorSpec} in the graph
*/
- SinkOperatorSpec(SinkFunction<M> sinkFn, int opId) {
+ SinkOperatorSpec(SinkFunction<M> sinkFn, String opId) {
super(OpCode.SINK, opId);
this.sinkFn = sinkFn;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index aace2e2..b1e29c6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -39,7 +39,7 @@ public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> {
* @param opCode the {@link OpCode} for this {@link StreamOperatorSpec}
* @param opId the unique ID for this {@link StreamOperatorSpec}
*/
- StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, OperatorSpec.OpCode opCode, int opId) {
+ StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, OperatorSpec.OpCode opCode, String opId) {
super(opCode, opId);
this.transformFn = transformFn;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 3c8879f..06a4f4b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -58,7 +58,7 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
* @param window the window function
* @param opId auto-generated unique ID of this operator
*/
- WindowOperatorSpec(WindowInternal<M, WK, WV> window, int opId) {
+ WindowOperatorSpec(WindowInternal<M, WK, WV> window, String opId) {
super(OpCode.WINDOW, opId);
this.window = window;
}
@@ -124,7 +124,7 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
@Override
public Collection<StoreDescriptor> getStoreDescriptors() {
- String storeName = getOpName();
+ String storeName = getOpId();
String storeFactory = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory";
Serde storeKeySerde = new TimeSeriesKeySerde<>(window.getKeySerde());
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index c75608f..f9e0a3a 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -54,7 +54,7 @@ public class KeyValueStoreExample implements StreamApplication {
pageViewEvents
.partitionBy(pve -> pve.memberId, pve -> pve,
- KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)))
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
.map(KV::getValue)
.flatMap(new MyStatsCounter())
.map(stats -> KV.of(stats.memberId, stats))
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index df393b0..ff785d9 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -51,7 +51,7 @@ public class OrderShipmentJoinExample implements StreamApplication {
orders
.join(shipments, new MyJoinFunction(),
new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
- Duration.ofMinutes(1))
+ Duration.ofMinutes(1), "join")
.map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
.sendTo(fulfilledOrders);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index c40de7b..846b9f8 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -56,7 +56,7 @@ public class PageViewCounterExample implements StreamApplication {
pageViewEvents
.window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn, null, null)
.setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
- .setAccumulationMode(AccumulationMode.DISCARDING))
+ .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
.map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
.sendTo(pageViewEventPerMemberStream);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index c403406..c9bcc45 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -49,8 +49,9 @@ public class RepartitionExample implements StreamApplication {
pageViewEvents
.partitionBy(pve -> pve.memberId, pve -> pve,
- KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)))
- .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null))
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
+ .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null),
+ "window")
.map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane)))
.sendTo(pageViewEventPerMember);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
index 9381e49..3c37c31 100644
--- a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
@@ -54,8 +54,9 @@ public class WindowExample implements StreamApplication {
// also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
// for 1 minute.
inputStream
- .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, null)
- .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))))
+ .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, new IntegerSerde())
+ .setLateTrigger(Triggers.any(Triggers.count(30000),
+ Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window")
.map(WindowPane::getMessage)
.sendTo(outputStream);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index ab20bba..0a3e9c8 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -111,7 +111,7 @@ public class TestExecutionPlanner {
MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input1");
OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
input1
- .partitionBy(m -> m.key, m -> m.value)
+ .partitionBy(m -> m.key, m -> m.value, "p1")
.map(kv -> kv)
.sendTo(output1);
return streamGraph;
@@ -136,23 +136,23 @@ public class TestExecutionPlanner {
.map(m -> m);
MessageStream<KV<Object, Object>> messageStream2 =
streamGraph.<KV<Object, Object>>getInputStream("input2")
- .partitionBy(m -> m.key, m -> m.value)
+ .partitionBy(m -> m.key, m -> m.value, "p1")
.filter(m -> true);
MessageStream<KV<Object, Object>> messageStream3 =
streamGraph.<KV<Object, Object>>getInputStream("input3")
.filter(m -> true)
- .partitionBy(m -> m.key, m -> m.value)
+ .partitionBy(m -> m.key, m -> m.value, "p2")
.map(m -> m);
OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
messageStream1
.join(messageStream2, mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2))
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
.sendTo(output1);
messageStream3
.join(messageStream2, mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1))
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
.sendTo(output2);
return streamGraph;
@@ -166,35 +166,37 @@ public class TestExecutionPlanner {
.map(m -> m);
MessageStream<KV<Object, Object>> messageStream2 =
streamGraph.<KV<Object, Object>>getInputStream("input2")
- .partitionBy(m -> m.key, m -> m.value)
+ .partitionBy(m -> m.key, m -> m.value, "p1")
.filter(m -> true);
MessageStream<KV<Object, Object>> messageStream3 =
streamGraph.<KV<Object, Object>>getInputStream("input3")
.filter(m -> true)
- .partitionBy(m -> m.key, m -> m.value)
+ .partitionBy(m -> m.key, m -> m.value, "p2")
.map(m -> m);
OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
messageStream1.map(m -> m)
- .filter(m -> true)
- .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)));
+ .filter(m->true)
+ .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8),
+ mock(Serde.class), mock(Serde.class)), "w1");
messageStream2.map(m -> m)
- .filter(m -> true)
- .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)));
+ .filter(m->true)
+ .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16),
+ mock(Serde.class), mock(Serde.class)), "w2");
messageStream1
.join(messageStream2, mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600))
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1")
.sendTo(output1);
messageStream3
.join(messageStream2, mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100))
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2")
.sendTo(output2);
messageStream3
.join(messageStream2, mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252))
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3")
.sendTo(output2);
return streamGraph;
@@ -242,12 +244,12 @@ public class TestExecutionPlanner {
when(runner.getStreamSpec("output2")).thenReturn(output2);
// intermediate streams used in tests
- when(runner.getStreamSpec("test-app-1-partition_by-1"))
- .thenReturn(new StreamSpec("test-app-1-partition_by-1", "test-app-1-partition_by-1", "default-system"));
- when(runner.getStreamSpec("test-app-1-partition_by-3"))
- .thenReturn(new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system"));
- when(runner.getStreamSpec("test-app-1-partition_by-8"))
- .thenReturn(new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system"));
+ when(runner.getStreamSpec("test-app-1-partition_by-p1"))
+ .thenReturn(new StreamSpec("test-app-1-partition_by-p1", "test-app-1-partition_by-p1", "default-system"));
+ when(runner.getStreamSpec("test-app-1-partition_by-p2"))
+ .thenReturn(new StreamSpec("test-app-1-partition_by-p2", "test-app-1-partition_by-p2", "default-system"));
+ when(runner.getStreamSpec("test-app-1-partition_by-p3"))
+ .thenReturn(new StreamSpec("test-app-1-partition_by-p3", "test-app-1-partition_by-p3", "default-system"));
}
@Test
@@ -429,7 +431,7 @@ public class TestExecutionPlanner {
MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input4");
OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
- input1.partitionBy(m -> m.key, m -> m.value).map(kv -> kv).sendTo(output1);
+ input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> kv).sendTo(output1);
JobGraph jobGraph = (JobGraph) planner.plan(streamGraph);
// the partitions should be the same as input1
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 10c4aa3..ba5c922 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -85,10 +85,10 @@ public class TestJobGraphJsonGenerator {
when(runner.getStreamSpec("output2")).thenReturn(output2);
// intermediate streams used in tests
- when(runner.getStreamSpec("test-app-1-partition_by-3"))
- .thenReturn(new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system"));
- when(runner.getStreamSpec("test-app-1-partition_by-8"))
- .thenReturn(new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system"));
+ when(runner.getStreamSpec("test-app-1-partition_by-p1"))
+ .thenReturn(new StreamSpec("test-app-1-partition_by-p1", "test-app-1-partition_by-p1", "default-system"));
+ when(runner.getStreamSpec("test-app-1-partition_by-p2"))
+ .thenReturn(new StreamSpec("test-app-1-partition_by-p2", "test-app-1-partition_by-p2", "default-system"));
// set up external partition count
Map<String, Integer> system1Map = new HashMap<>();
@@ -113,24 +113,24 @@ public class TestJobGraphJsonGenerator {
.map(m -> m);
MessageStream<KV<Object, Object>> messageStream2 =
streamGraph.<KV<Object, Object>>getInputStream("input2")
- .partitionBy(m -> m.key, m -> m.value)
+ .partitionBy(m -> m.key, m -> m.value, "p1")
.filter(m -> true);
MessageStream<KV<Object, Object>> messageStream3 =
streamGraph.<KV<Object, Object>>getInputStream("input3")
.filter(m -> true)
- .partitionBy(m -> m.key, m -> m.value)
+ .partitionBy(m -> m.key, m -> m.value, "p2")
.map(m -> m);
OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1");
OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2");
messageStream1
.join(messageStream2, mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2))
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
.sendTo(outputStream1);
messageStream2.sink((message, collector, coordinator) -> { });
messageStream3
.join(messageStream2, mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1))
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
.sendTo(outputStream2);
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
index f6ebaf9..53e8bf6 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -20,6 +20,7 @@
package org.apache.samza.execution;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.operators.KV;
@@ -45,8 +46,11 @@ import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestJobNode {
@@ -56,25 +60,31 @@ public class TestJobNode {
StreamSpec input1Spec = new StreamSpec("input1", "input1", "input-system");
StreamSpec input2Spec = new StreamSpec("input2", "input2", "input-system");
StreamSpec outputSpec = new StreamSpec("output", "output", "output-system");
- StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-2", "partition_by-2", "intermediate-system");
+ StreamSpec partitionBySpec =
+ new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", "intermediate-system");
doReturn(input1Spec).when(mockRunner).getStreamSpec("input1");
doReturn(input2Spec).when(mockRunner).getStreamSpec("input2");
doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
- doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-2");
+ doReturn(partitionBySpec).when(mockRunner).getStreamSpec("jobName-jobId-partition_by-p1");
- StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ Config mockConfig = mock(Config.class);
+ when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
+ when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
+
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
MessageStream<KV<String, Object>> input1 = streamGraph.getInputStream("input1");
MessageStream<KV<String, Object>> input2 = streamGraph.getInputStream("input2");
OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output");
JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class);
input1
- .partitionBy(KV::getKey, KV::getValue).map(kv -> kv.value)
+ .partitionBy(KV::getKey, KV::getValue, "p1").map(kv -> kv.value)
.join(input2.map(kv -> kv.value), mockJoinFn,
- new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class), Duration.ofHours(1))
+ new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class),
+ Duration.ofHours(1), "j1")
.sendTo(output);
- JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class));
+ JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mockConfig);
Config config = new MapConfig();
StreamEdge input1Edge = new StreamEdge(input1Spec, config);
StreamEdge input2Edge = new StreamEdge(input2Spec, config);
@@ -133,8 +143,8 @@ public class TestJobNode {
assertTrue("Serialized output msg serde should be a StringSerde",
outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
- String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-2.samza.key.serde");
- String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-2.samza.msg.serde");
+ String partitionByKeySerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde");
+ String partitionByMsgSerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.msg.serde");
assertTrue("Serialized serdes should contain intermediate stream key serde",
deserializedSerdes.containsKey(partitionByKeySerde));
assertTrue("Serialized intermediate stream key serde should be a StringSerde",
@@ -145,8 +155,8 @@ public class TestJobNode {
"Serialized intermediate stream msg serde should be a StringSerde",
partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
- String leftJoinStoreKeySerde = mapConfig.get("stores.join-6-L.key.serde");
- String leftJoinStoreMsgSerde = mapConfig.get("stores.join-6-L.msg.serde");
+ String leftJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-L.key.serde");
+ String leftJoinStoreMsgSerde = mapConfig.get("stores.jobName-jobId-join-j1-L.msg.serde");
assertTrue("Serialized serdes should contain left join store key serde",
deserializedSerdes.containsKey(leftJoinStoreKeySerde));
assertTrue("Serialized left join store key serde should be a StringSerde",
@@ -156,8 +166,8 @@ public class TestJobNode {
assertTrue("Serialized left join store msg serde should be a TimestampedValueSerde",
leftJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
- String rightJoinStoreKeySerde = mapConfig.get("stores.join-6-R.key.serde");
- String rightJoinStoreMsgSerde = mapConfig.get("stores.join-6-R.msg.serde");
+ String rightJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-R.key.serde");
+ String rightJoinStoreMsgSerde = mapConfig.get("stores.jobName-jobId-join-j1-R.msg.serde");
assertTrue("Serialized serdes should contain right join store key serde",
deserializedSerdes.containsKey(rightJoinStoreKeySerde));
assertTrue("Serialized right join store key serde should be a StringSerde",
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 09fb56a..dac4e94 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -20,8 +20,10 @@ package org.apache.samza.operators;
import com.google.common.collect.ImmutableSet;
import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.functions.JoinFunction;
@@ -51,6 +53,7 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -77,6 +80,22 @@ public class TestJoinOperator {
assertEquals(110, outputSum);
}
+ @Test(expected = SamzaException.class) // the test passes only if a SamzaException escapes the method body
+ public void joinWithSelfThrowsException() throws Exception {
+ StreamApplication app = new StreamApplication() {
+ @Override
+ public void init(StreamGraph graph, Config config) { // builds a graph that joins "instream" with itself
+ IntegerSerde integerSerde = new IntegerSerde();
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
+ MessageStream<KV<Integer, Integer>> inStream = graph.getInputStream("instream", kvSerde);
+
+ inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, kvSerde, JOIN_TTL, "join");
+ }
+ };
+
+ createStreamOperatorTask(new SystemClock(), app); // should throw an exception
+ }
+
@Test
public void joinFnInitAndClose() throws Exception {
TestJoinFunction joinFn = new TestJoinFunction();
@@ -277,10 +296,14 @@ public class TestJoinOperator {
// need to return different stores for left and right side
IntegerSerde integerSerde = new IntegerSerde();
TimestampedValueSerde timestampedValueSerde = new TimestampedValueSerde(new KVSerde(integerSerde, integerSerde));
- when(taskContext.getStore(eq("join-2-L"))).thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
- when(taskContext.getStore(eq("join-2-R"))).thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
+ when(taskContext.getStore(eq("jobName-jobId-join-j1-L")))
+ .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
+ when(taskContext.getStore(eq("jobName-jobId-join-j1-R")))
+ .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
Config config = mock(Config.class);
+ when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
+ when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
StreamOperatorTask sot = new StreamOperatorTask(app, runner, clock);
sot.init(config, taskContext);
@@ -304,10 +327,8 @@ public class TestJoinOperator {
SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
inStream
- .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL)
- .sink((message, messageCollector, taskCoordinator) -> {
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
+ .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, "j1")
+ .sink((m, mc, tc) -> mc.send(new OutgoingMessageEnvelope(outputSystemStream, m)));
}
}