You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/10/26 00:18:41 UTC

[2/2] samza git commit: SAMZA-1454: Globally unique and user settable IDs for stateful operators

SAMZA-1454: Globally unique and user settable IDs for stateful operators

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jagadish Venkatraman <ja...@apache.org>, Yi Pan <ni...@gmail.com>

Closes #324 from prateekm/operator-id-uniqueness


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1296c7ff
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1296c7ff
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1296c7ff

Branch: refs/heads/master
Commit: 1296c7ff91daff4abeac392cec53600f7c5cb427
Parents: 711dd8d
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Oct 25 17:18:36 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Oct 25 17:18:36 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/operators/MessageStream.java   | 41 +++++++----
 .../samza/execution/JobGraphJsonGenerator.java  |  4 +-
 .../samza/operators/MessageStreamImpl.java      | 44 +++++++-----
 .../apache/samza/operators/StreamGraphImpl.java | 74 +++++++++++++++-----
 .../samza/operators/impl/OperatorImpl.java      | 32 ++++-----
 .../samza/operators/impl/OperatorImplGraph.java | 20 +++---
 .../operators/impl/PartialJoinOperatorImpl.java | 32 ++++++---
 .../operators/impl/WindowOperatorImpl.java      |  3 +-
 .../samza/operators/spec/InputOperatorSpec.java |  2 +-
 .../samza/operators/spec/JoinOperatorSpec.java  | 14 ++--
 .../samza/operators/spec/OperatorSpec.java      | 14 +---
 .../samza/operators/spec/OperatorSpecs.java     | 36 +++++++---
 .../operators/spec/OutputOperatorSpec.java      |  2 +-
 .../operators/spec/PartitionByOperatorSpec.java |  2 +-
 .../samza/operators/spec/SinkOperatorSpec.java  |  2 +-
 .../operators/spec/StreamOperatorSpec.java      |  2 +-
 .../operators/spec/WindowOperatorSpec.java      |  4 +-
 .../samza/example/KeyValueStoreExample.java     |  2 +-
 .../samza/example/OrderShipmentJoinExample.java |  2 +-
 .../samza/example/PageViewCounterExample.java   |  2 +-
 .../samza/example/RepartitionExample.java       |  5 +-
 .../org/apache/samza/example/WindowExample.java |  5 +-
 .../samza/execution/TestExecutionPlanner.java   | 44 ++++++------
 .../execution/TestJobGraphJsonGenerator.java    | 16 ++---
 .../org/apache/samza/execution/TestJobNode.java | 34 +++++----
 .../samza/operators/TestJoinOperator.java       | 33 +++++++--
 .../samza/operators/TestMessageStreamImpl.java  | 26 +++----
 .../samza/operators/TestStreamGraphImpl.java    | 61 ++++++++++------
 .../samza/operators/TestWindowOperator.java     | 46 ++++++++----
 .../samza/operators/impl/TestOperatorImpl.java  |  2 +-
 .../operators/impl/TestOperatorImplGraph.java   | 47 ++++++++-----
 .../operators/spec/TestWindowOperatorSpec.java  |  4 +-
 .../EndOfStreamIntegrationTest.java             |  2 +-
 .../WatermarkIntegrationTest.java               |  2 +-
 .../test/operator/RepartitionJoinWindowApp.java | 13 ++--
 .../samza/test/operator/SessionWindowApp.java   |  4 +-
 .../samza/test/operator/TumblingWindowApp.java  |  3 +-
 37 files changed, 425 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index c36fe1f..dcce7c8 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -73,7 +73,7 @@ public interface MessageStream<M> {
    * should be retained in the filtered {@link MessageStream}.
    *
    * @param filterFn the predicate to filter messages from this {@link MessageStream}.
-   * @return the transformed {@link MessageStream}
+   * @return the filtered {@link MessageStream}
    */
   MessageStream<M> filter(FilterFunction<? super M> filterFn);
 
@@ -105,15 +105,19 @@ public interface MessageStream<M> {
    * <p>
    * Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
    * <p>
-   * <b>Warning:</b> As of version 0.13.0, messages in windows are kept in memory and will be lost during restarts.
+   * The {@code id} must be unique for each operator in this application. It is used as part of the unique ID
+   * for any state stores and streams created by this operator (the full ID also contains the job name, job id and
+   * operator type). If the application logic is changed, this ID must be reused in the new operator to retain
+   * state from the previous version, and changed for the new operator to discard the state from the previous version.
    *
    * @param window the window to group and process messages from this {@link MessageStream}
+   * @param id the unique id of this operator in this application
    * @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified,
    *            panes are emitted per-key.
    * @param <WV> the type of value in the {@link WindowPane} in the transformed {@link MessageStream}
-   * @return the transformed {@link MessageStream}
+   * @return the windowed {@link MessageStream}
    */
-  <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window);
+  <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String id);
 
   /**
    * Joins this {@link MessageStream} with another {@link MessageStream} using the provided
@@ -124,14 +128,18 @@ public interface MessageStream<M> {
    * <p>
    * Both inputs being joined must have the same number of partitions, and should be partitioned by the join key.
    * <p>
-   * <b>Warning:</b> As of version 0.13.0, messages in joins are kept in memory and will be lost during restarts.
+   * The {@code id} must be unique for each operator in this application. It is used as part of the unique ID
+   * for any state stores and streams created by this operator (the full ID also contains the job name, job id and
+   * operator type). If the application logic is changed, this ID must be reused in the new operator to retain
+   * state from the previous version, and changed for the new operator to discard the state from the previous version.
    *
    * @param otherStream the other {@link MessageStream} to be joined with
    * @param joinFn the function to join messages from this and the other {@link MessageStream}
-   * @param ttl the ttl for messages in each stream
    * @param keySerde the serde for the join key
    * @param messageSerde the serde for messages in this stream
    * @param otherMessageSerde the serde for messages in the other stream
+   * @param ttl the ttl for messages in each stream
+   * @param id the unique id of this operator in this application
    * @param <K> the type of join key
    * @param <OM> the type of messages in the other stream
    * @param <JM> the type of messages resulting from the {@code joinFn}
@@ -139,7 +147,8 @@ public interface MessageStream<M> {
    */
   <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
       JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
-      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl);
+      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
+      Duration ttl, String id);
 
   /**
    * Merges all {@code otherStreams} with this {@link MessageStream}.
@@ -186,26 +195,34 @@ public interface MessageStream<M> {
    * configuration, if present.
    * Else, the number of partitions is set to to the max of number of partitions for all input and output streams
    * (excluding intermediate streams).
+   * <p>
+   * The {@code id} must be unique for each operator in this application. It is used as part of the unique ID
+   * for any state stores and streams created by this operator (the full ID also contains the job name, job id and
+   * operator type). If the application logic is changed, this ID must be reused in the new operator to retain
+   * state from the previous version, and changed for the new operator to discard the state from the previous version.
    *
-   * @param <K> the type of output key
-   * @param <V> the type of output value
    * @param keyExtractor the {@link Function} to extract the message and partition key from the input message
    * @param valueExtractor the {@link Function} to extract the value from the input message
    * @param serde the {@link KVSerde} to use for (de)serializing the key and value.
+   * @param id the unique id of this operator in this application
+   * @param <K> the type of output key
+   * @param <V> the type of output value
    * @return the repartitioned {@link MessageStream}
    */
   <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
-      Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde);
+      Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String id);
+
 
   /**
-   * Same as calling {@link #partitionBy(Function, Function, KVSerde)} with a null KVSerde.
+   * Same as calling {@link #partitionBy(Function, Function, KVSerde, String)} with a null KVSerde.
    *
    * @param keyExtractor the {@link Function} to extract the message and partition key from the input message
    * @param valueExtractor the {@link Function} to extract the value from the input message
+   * @param id the unique id of this operator in this application
    * @param <K> the type of output key
    * @param <V> the type of output value
    * @return the repartitioned {@link MessageStream}
    */
   <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
-      Function<? super M, ? extends V> valueExtractor);
+      Function<? super M, ? extends V> valueExtractor, String id);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 23c9d89..03845e3 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -69,14 +69,14 @@ import org.codehaus.jackson.map.ObjectMapper;
     @JsonProperty("outputStreams")
     List<StreamJson> outputStreams;
     @JsonProperty("operators")
-    Map<Integer, Map<String, Object>> operators = new HashMap<>();
+    Map<String, Map<String, Object>> operators = new HashMap<>();
   }
 
   static final class StreamJson {
     @JsonProperty("streamId")
     String streamId;
     @JsonProperty("nextOperatorIds")
-    Set<Integer>  nextOperatorIds = new HashSet<>();
+    Set<String>  nextOperatorIds = new HashSet<>();
   }
 
   static final class JobNodeJson {

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index dc91d19..e6e711c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.operators;
 
+import org.apache.samza.SamzaException;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
@@ -26,6 +27,7 @@ import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OperatorSpecs;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
@@ -72,42 +74,47 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
 
   @Override
   public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) {
-    OperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, this.graph.getNextOpId());
+    String opId = this.graph.getNextOpId(OpCode.MAP);
+    OperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
     return new MessageStreamImpl<>(this.graph, op);
   }
 
   @Override
   public MessageStream<M> filter(FilterFunction<? super M> filterFn) {
-    OperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, this.graph.getNextOpId());
+    String opId = this.graph.getNextOpId(OpCode.FILTER);
+    OperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
     return new MessageStreamImpl<>(this.graph, op);
   }
 
   @Override
   public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) {
-    OperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, this.graph.getNextOpId());
+    String opId = this.graph.getNextOpId(OpCode.FLAT_MAP);
+    OperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
     return new MessageStreamImpl<>(this.graph, op);
   }
 
   @Override
   public void sink(SinkFunction<? super M> sinkFn) {
-    SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph.getNextOpId());
+    String opId = this.graph.getNextOpId(OpCode.SINK);
+    SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
   }
 
   @Override
   public void sendTo(OutputStream<M> outputStream) {
-    OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
-        (OutputStreamImpl<M>) outputStream, this.graph.getNextOpId());
+    String opId = this.graph.getNextOpId(OpCode.SEND_TO);
+    OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec((OutputStreamImpl<M>) outputStream, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
   }
 
   @Override
-  public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
+  public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String userDefinedId) {
+    String opId = this.graph.getNextOpId(OpCode.WINDOW, userDefinedId);
     OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec(
-        (WindowInternal<M, K, WV>) window, this.graph.getNextOpId());
+        (WindowInternal<M, K, WV>) window, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
     return new MessageStreamImpl<>(this.graph, op);
   }
@@ -115,11 +122,14 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   @Override
   public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
       JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
-      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl) {
+      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
+      Duration ttl, String userDefinedId) {
+    if (otherStream.equals(this)) throw new SamzaException("Cannot join a MessageStream with itself.");
     OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
+    String opId = this.graph.getNextOpId(OpCode.JOIN, userDefinedId);
     JoinOperatorSpec<K, M, OM, JM> joinOpSpec =
         OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn,
-            keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), this.graph.getNextOpId());
+            keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), opId);
 
     this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
     otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec);
@@ -130,7 +140,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   @Override
   public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
     if (otherStreams.isEmpty()) return this;
-    StreamOperatorSpec<M, M> opSpec = OperatorSpecs.createMergeOperatorSpec(this.graph.getNextOpId());
+    String opId = this.graph.getNextOpId(OpCode.MERGE);
+    StreamOperatorSpec<M, M> opSpec = OperatorSpecs.createMergeOperatorSpec(opId);
     this.operatorSpec.registerNextOperatorSpec(opSpec);
     otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(opSpec));
     return new MessageStreamImpl<>(this.graph, opSpec);
@@ -138,10 +149,9 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
 
   @Override
   public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
-      Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde) {
-    int opId = this.graph.getNextOpId();
-    String opName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId);
-    IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opName, serde);
+      Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) {
+    String opId = this.graph.getNextOpId(OpCode.PARTITION_BY, userDefinedId);
+    IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opId, serde);
     PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec =
         OperatorSpecs.createPartitionByOperatorSpec(
             intermediateStream.getOutputStream(), keyExtractor, valueExtractor, opId);
@@ -151,8 +161,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
 
   @Override
   public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
-      Function<? super M, ? extends V> valueExtractor) {
-    return partitionBy(keyExtractor, valueExtractor, null);
+      Function<? super M, ? extends V> valueExtractor, String userDefinedId) {
+    return partitionBy(keyExtractor, valueExtractor, null, userDefinedId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index a02ed3e..936cb3a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -19,10 +19,14 @@
 package org.apache.samza.operators;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OperatorSpecs;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.runtime.ApplicationRunner;
@@ -48,18 +52,19 @@ import java.util.stream.Collectors;
 public class StreamGraphImpl implements StreamGraph {
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphImpl.class);
 
-  /**
-   * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} in the graph.
-   * Should only be accessed by {@link MessageStreamImpl} via {@link #getNextOpId()}.
-   */
-  private int opId = 0;
-
   // We use a LHM for deterministic order in initializing and closing operators.
   private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
   private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
   private final ApplicationRunner runner;
   private final Config config;
 
+  /**
+   * The 0-based position of the next operator in the graph.
+   * Part of the unique ID for each OperatorSpec in the graph.
+   * Should only accessed and incremented via {@link #getNextOpId(OpCode, String)}.
+   */
+  private int nextOpNum = 0;
+  private final Set<String> operatorIds = new HashSet<>();
   private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new NoOpSerde());
   private ContextManager contextManager = null;
 
@@ -81,6 +86,7 @@ public class StreamGraphImpl implements StreamGraph {
   @Override
   public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
     StreamSpec streamSpec = runner.getStreamSpec(streamId);
+    Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
     Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
     Preconditions.checkState(!inputOperators.containsKey(streamSpec),
         "getInputStream must not be called multiple times with the same streamId: " + streamId);
@@ -96,8 +102,11 @@ public class StreamGraphImpl implements StreamGraph {
     }
 
     boolean isKeyed = serde instanceof KVSerde;
+    InputOperatorSpec inputOperatorSpec =
+        OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
+            isKeyed, this.getNextOpId(OpCode.INPUT, null));
     inputOperators.put(streamSpec,
-        new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed, this.getNextOpId()));
+        inputOperatorSpec);
     return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
   }
 
@@ -109,6 +118,7 @@ public class StreamGraphImpl implements StreamGraph {
   @Override
   public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
     StreamSpec streamSpec = runner.getStreamSpec(streamId);
+    Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
     Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
     Preconditions.checkState(!outputStreams.containsKey(streamSpec),
         "getOutputStream must not be called multiple times with the same streamId: " + streamId);
@@ -144,20 +154,15 @@ public class StreamGraphImpl implements StreamGraph {
    * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
    * An intermediate {@link MessageStream} is both an output and an input stream.
    *
-   * @param streamName the name of the stream to be created. Will be prefixed with job name and id to generate the
-   *                   logical streamId.
+   * @param streamId the id of the stream to be created.
    * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
    *              is used.
    * @param <M> the type of messages in the intermediate {@link MessageStream}
    * @return  the intermediate {@link MessageStreamImpl}
    */
-  <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamName, Serde<M> serde) {
-    String streamId = String.format("%s-%s-%s",
-        config.get(JobConfig.JOB_NAME()),
-        config.get(JobConfig.JOB_ID(), "1"),
-        streamName);
+  <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde) {
     StreamSpec streamSpec = runner.getStreamSpec(streamId);
-
+    
     Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec),
         "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
 
@@ -168,8 +173,10 @@ public class StreamGraphImpl implements StreamGraph {
 
     boolean isKeyed = serde instanceof KVSerde;
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    inputOperators.put(streamSpec,
-        new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed, this.getNextOpId()));
+    InputOperatorSpec inputOperatorSpec =
+        OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
+            isKeyed, this.getNextOpId(OpCode.INPUT, null));
+    inputOperators.put(streamSpec, inputOperatorSpec);
     outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
     return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec));
   }
@@ -186,8 +193,37 @@ public class StreamGraphImpl implements StreamGraph {
     return this.contextManager;
   }
 
-  /* package private */ int getNextOpId() {
-    return this.opId++;
+  /**
+   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+   * jobName-jobId-opCode-(userDefinedId|nextOpNum);
+   *
+   * @param opCode the {@link OpCode} of the next operator
+   * @param userDefinedId the optional user-provided name of the next operator or null
+   * @return the unique ID for the next operator in the graph
+   */
+  /* package private */ String getNextOpId(OpCode opCode, String userDefinedId) {
+    String nextOpId = String.format("%s-%s-%s-%s",
+        config.get(JobConfig.JOB_NAME()),
+        config.get(JobConfig.JOB_ID(), "1"),
+        opCode.name().toLowerCase(),
+        StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
+    if (!operatorIds.add(nextOpId)) {
+      throw new SamzaException(
+          String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
+    }
+    nextOpNum++;
+    return nextOpId;
+  }
+
+  /**
+   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+   * jobName-jobId-opCode-nextOpNum;
+   *
+   * @param opCode the {@link OpCode} of the next operator
+   * @return the unique ID for the next operator in the graph
+   */
+  /* package private */ String getNextOpId(OpCode opCode) {
+    return getNextOpId(opCode, null);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 0c50630..f5a2624 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -81,14 +81,14 @@ public abstract class OperatorImpl<M, RM> {
    * @param context  the {@link TaskContext} for the task
    */
   public final void init(Config config, TaskContext context) {
-    String opName = getOperatorName();
+    String opId = getOpImplId();
 
     if (initialized) {
-      throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opName));
+      throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opId));
     }
 
     if (closed) {
-      throw new IllegalStateException(String.format("Attempted to initialize Operator %s after it was closed.", opName));
+      throw new IllegalStateException(String.format("Attempted to initialize Operator %s after it was closed.", opId));
     }
 
     this.highResClock = createHighResClock(config);
@@ -96,9 +96,9 @@ public abstract class OperatorImpl<M, RM> {
     prevOperators = new HashSet<>();
     inputStreams = new HashSet<>();
     MetricsRegistry metricsRegistry = context.getMetricsRegistry();
-    this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages");
-    this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-message-ns");
-    this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns");
+    this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opId + "-messages");
+    this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-message-ns");
+    this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-timer-ns");
     this.taskName = context.getTaskName();
 
     TaskContextImpl taskContext = (TaskContextImpl) context;
@@ -127,7 +127,7 @@ public abstract class OperatorImpl<M, RM> {
     if (!initialized) {
       throw new IllegalStateException(
           String.format("Attempted to register next operator before initializing operator %s.",
-              getOperatorName()));
+              getOpImplId()));
     }
     this.registeredOperators.add(nextOperator);
     nextOperator.registerPrevOperator(this);
@@ -163,7 +163,7 @@ public abstract class OperatorImpl<M, RM> {
           String.format("Error applying operator %s (created at %s) to its input message. "
                   + "Expected input message to be of type %s, but found it to be of type %s. "
                   + "Are Serdes for the inputs to this operator configured correctly?",
-              getOperatorName(), getOperatorSpec().getSourceLocation(), expectedType, actualType), e);
+              getOpImplId(), getOperatorSpec().getSourceLocation(), expectedType, actualType), e);
     }
 
     long endNs = this.highResClock.nanoTime();
@@ -317,7 +317,7 @@ public abstract class OperatorImpl<M, RM> {
     if (inputWatermark < inputWatermarkMin) {
       // advance the watermark time of this operator
       inputWatermark = inputWatermarkMin;
-      LOG.trace("Advance input watermark to {} in operator {}", inputWatermark, getOperatorName());
+      LOG.trace("Advance input watermark to {} in operator {}", inputWatermark, getOpImplId());
 
       final Long outputWm;
       WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
@@ -340,7 +340,7 @@ public abstract class OperatorImpl<M, RM> {
       if (outputWatermark < outputWm) {
         // advance the watermark
         outputWatermark = outputWm;
-        LOG.debug("Advance output watermark to {} in operator {}", outputWatermark, getOperatorName());
+        LOG.debug("Advance output watermark to {} in operator {}", outputWatermark, getOpImplId());
         this.registeredOperators.forEach(op -> op.onWatermark(outputWatermark, collector, coordinator));
       } else if (outputWatermark > outputWm) {
         LOG.warn("Ignore watermark {} that is smaller than the previous watermark {}.", outputWm, outputWatermark);
@@ -375,7 +375,7 @@ public abstract class OperatorImpl<M, RM> {
   public void close() {
     if (closed) {
       throw new IllegalStateException(
-          String.format("Attempted to close Operator %s more than once.", getOperatorSpec().getOpName()));
+          String.format("Attempted to close Operator %s more than once.", getOpImplId()));
     }
     handleClose();
     closed = true;
@@ -391,16 +391,16 @@ public abstract class OperatorImpl<M, RM> {
   protected abstract OperatorSpec<M, RM> getOperatorSpec();
 
   /**
-   * Get the unique name for this {@link OperatorImpl} in the DAG.
+   * Get the unique ID for this {@link OperatorImpl} in the DAG.
    *
    * Some {@link OperatorImpl}s don't have a 1:1 mapping with their {@link OperatorSpec}. E.g., there are
    * 2 PartialJoinOperatorImpls for a JoinOperatorSpec. Overriding this method allows them to provide an
-   * implementation specific name, e.g., for use in metrics.
+   * implementation specific id, e.g., for use in metrics.
    *
-   * @return the unique name for this {@link OperatorImpl} in the DAG
+   * @return the unique ID for this {@link OperatorImpl} in the DAG
    */
-  protected String getOperatorName() {
-    return getOperatorSpec().getOpName();
+  protected String getOpImplId() {
+    return getOperatorSpec().getOpId();
   }
 
   private HighResolutionClock createHighResClock(Config config) {

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 808ddbf..49b29c8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -61,7 +61,7 @@ public class OperatorImplGraph {
   private static final Logger LOG = LoggerFactory.getLogger(OperatorImplGraph.class);
 
   /**
-   * A mapping from operator names to their {@link OperatorImpl}s in this graph. Used to avoid creating
+   * A mapping from operator IDs to their {@link OperatorImpl}s in this graph. Used to avoid creating
    * multiple {@link OperatorImpl}s for an {@link OperatorSpec} when it's reached from different
    * {@link OperatorSpec}s during DAG traversals (e.g., for the merge operator).
    * We use a LHM for deterministic ordering in initializing and closing operators.
@@ -74,11 +74,11 @@ public class OperatorImplGraph {
   private final Map<SystemStream, InputOperatorImpl> inputOperators = new HashMap<>();
 
   /**
-   * A mapping from {@link JoinOperatorSpec}s to their two {@link PartialJoinFunction}s. Used to associate
+   * A mapping from {@link JoinOperatorSpec} IDs to their two {@link PartialJoinFunction}s. Used to associate
    * the two {@link PartialJoinOperatorImpl}s for a {@link JoinOperatorSpec} with each other since they're
    * reached from different {@link OperatorSpec} during DAG traversals.
    */
-  private final Map<Integer, KV<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>();
+  private final Map<String, KV<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>();
 
   private final Clock clock;
 
@@ -155,13 +155,17 @@ public class OperatorImplGraph {
    */
   OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
       SystemStream inputStream, Config config, TaskContext context) {
-    if (!operatorImpls.containsKey(operatorSpec.getOpName()) || operatorSpec instanceof JoinOperatorSpec) {
+    if (!operatorImpls.containsKey(operatorSpec.getOpId()) || operatorSpec instanceof JoinOperatorSpec) {
       // Either this is the first time we've seen this operatorSpec, or this is a join operator spec
       // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
       OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context);
       operatorImpl.init(config, context);
       operatorImpl.registerInputStream(inputStream);
-      operatorImpls.put(operatorImpl.getOperatorName(), operatorImpl);
+
+      // Note: The key here is opImplId, which may not equal opId for some impls (e.g. PartialJoinOperatorImpl).
+      // This is currently OK since we don't need to look up a partial join operator impl again during traversal
+      // (a join cannot have a cycle).
+      operatorImpls.put(operatorImpl.getOpImplId(), operatorImpl);
 
       Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
       registeredSpecs.forEach(registeredSpec -> {
@@ -172,7 +176,7 @@ public class OperatorImplGraph {
     } else {
       // the implementation corresponding to operatorSpec has already been instantiated
       // and registered, so we do not need to traverse the DAG further.
-      return operatorImpls.get(operatorSpec.getOpName());
+      return operatorImpls.get(operatorSpec.getOpId());
     }
   }
 
@@ -244,7 +248,7 @@ public class OperatorImplGraph {
 
       @Override
       public void init(Config config, TaskContext context) {
-        String leftStoreName = joinOpSpec.getLeftOpName();
+        String leftStoreName = joinOpSpec.getLeftOpId();
         leftStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(leftStoreName);
 
         // user-defined joinFn should only be initialized once, so we do it only in left partial join function.
@@ -276,7 +280,7 @@ public class OperatorImplGraph {
 
       @Override
       public void init(Config config, TaskContext context) {
-        String rightStoreName = joinOpSpec.getRightOpName();
+        String rightStoreName = joinOpSpec.getRightOpId();
         rightStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(rightStoreName);
 
         // user-defined joinFn should only be initialized once,

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index e976a43..90a71a0 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -18,11 +18,13 @@
  */
 package org.apache.samza.operators.impl;
 
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.impl.store.TimestampedValue;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
@@ -68,13 +70,21 @@ class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> {
 
   @Override
   public Collection<JM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
-    K key = thisPartialJoinFn.getKey(message);
-    thisPartialJoinFn.getState().put(key, new TimestampedValue<>(message, clock.currentTimeMillis()));
-    TimestampedValue<OM> otherMessage = otherPartialJoinFn.getState().get(key);
-    long now = clock.currentTimeMillis();
-    if (otherMessage != null && otherMessage.getTimestamp() > now - ttlMs) {
-      JM joinResult = thisPartialJoinFn.apply(message, otherMessage.getValue());
-      return Collections.singletonList(joinResult);
+    try {
+      KeyValueStore<K, TimestampedValue<M>> thisState = thisPartialJoinFn.getState();
+      KeyValueStore<K, TimestampedValue<OM>> otherState = otherPartialJoinFn.getState();
+
+      K key = thisPartialJoinFn.getKey(message);
+      thisState.put(key, new TimestampedValue<>(message, clock.currentTimeMillis()));
+      TimestampedValue<OM> otherMessage = otherState.get(key);
+
+      long now = clock.currentTimeMillis();
+      if (otherMessage != null && otherMessage.getTimestamp() > now - ttlMs) {
+        JM joinResult = thisPartialJoinFn.apply(message, otherMessage.getValue());
+        return Collections.singletonList(joinResult);
+      }
+    } catch (Exception e) {
+      throw new SamzaException("Error handling message in PartialJoinOperatorImpl " + getOpImplId(), e);
     }
     return Collections.emptyList();
   }
@@ -89,13 +99,13 @@ class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> {
   }
 
   /**
-   * The name for this {@link PartialJoinOperatorImpl} that includes information about which
+   * The ID for this {@link PartialJoinOperatorImpl} that includes information about which
    * side of the join it is for.
    *
-   * @return the {@link PartialJoinOperatorImpl} name.
+   * @return the {@link PartialJoinOperatorImpl} ID.
    */
   @Override
-  protected String getOperatorName() {
-    return isLeftSide ? joinOpSpec.getLeftOpName() : joinOpSpec.getRightOpName();
+  protected String getOpImplId() {
+    return isLeftSide ? joinOpSpec.getLeftOpId() : joinOpSpec.getRightOpId();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index 42fe46a..e0b1240 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -112,7 +112,8 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje
   protected void handleInit(Config config, TaskContext context) {
     WindowInternal<M, K, Object> window = windowOpSpec.getWindow();
 
-    KeyValueStore<TimeSeriesKey<K>, Object> store = (KeyValueStore<TimeSeriesKey<K>, Object>) context.getStore(windowOpSpec.getOpName());
+    KeyValueStore<TimeSeriesKey<K>, Object> store =
+        (KeyValueStore<TimeSeriesKey<K>, Object>) context.getStore(windowOpSpec.getOpId());
 
     // For aggregating windows, we use the store in over-write mode since we only retain the aggregated
     // value. Else, we use the store in append-mode.

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 3c66ee6..4c0687a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -38,7 +38,7 @@ public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { //
   private final boolean isKeyedInput;
 
   public InputOperatorSpec(StreamSpec streamSpec,
-      Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyedInput, int opId) {
+      Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyedInput, String opId) {
     super(OpCode.INPUT, opId);
     this.streamSpec = streamSpec;
     this.keySerde = keySerde;

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index 3f99280..c730bca 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -60,7 +60,7 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp
    */
   JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> rightInputOpSpec,
       JoinFunction<K, M, OM, JM> joinFn, Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
-      long ttlMs, int opId) {
+      long ttlMs, String opId) {
     super(OpCode.JOIN, opId);
     this.leftInputOpSpec = leftInputOpSpec;
     this.rightInputOpSpec = rightInputOpSpec;
@@ -74,8 +74,8 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp
   @Override
   public Collection<StoreDescriptor> getStoreDescriptors() {
     String rocksDBStoreFactory = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory";
-    String leftStoreName = getLeftOpName();
-    String rightStoreName = getRightOpName();
+    String leftStoreName = getLeftOpId();
+    String rightStoreName = getRightOpId();
     Map<String, String> leftStoreCustomProps = ImmutableMap.of(
         String.format("stores.%s.rocksdb.ttl.ms", leftStoreName), Long.toString(ttlMs),
         String.format("stores.%s.changelog.kafka.cleanup.policy", leftStoreName), "delete",
@@ -105,12 +105,12 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp
     return rightInputOpSpec;
   }
 
-  public String getLeftOpName() {
-    return this.getOpName() + "-L";
+  public String getLeftOpId() {
+    return this.getOpId() + "-L";
   }
 
-  public String getRightOpName() {
-    return this.getOpName() + "-R";
+  public String getRightOpId() {
+    return this.getOpId() + "-R";
   }
 
   public JoinFunction<K, M, OM, JM> getJoinFn() {

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 71a9897..17f1b49 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -52,7 +52,7 @@ public abstract class OperatorSpec<M, OM> {
     OUTPUT
   }
 
-  private final int opId;
+  private final String opId;
   private final OpCode opCode;
   private StackTraceElement[] creationStackTrace;
 
@@ -63,7 +63,7 @@ public abstract class OperatorSpec<M, OM> {
    */
   private final Set<OperatorSpec<OM, ?>> nextOperatorSpecs = new LinkedHashSet<>();
 
-  public OperatorSpec(OpCode opCode, int opId) {
+  public OperatorSpec(OpCode opCode, String opId) {
     this.opCode = opCode;
     this.opId = opId;
     this.creationStackTrace = Thread.currentThread().getStackTrace();
@@ -93,7 +93,7 @@ public abstract class OperatorSpec<M, OM> {
    * Get the unique ID of this operator in the {@link org.apache.samza.operators.StreamGraph}.
    * @return  the unique operator ID
    */
-  public final int getOpId() {
+  public final String getOpId() {
     return this.opId;
   }
 
@@ -128,13 +128,5 @@ public abstract class OperatorSpec<M, OM> {
     return String.format("%s:%s", element.getFileName(), element.getLineNumber());
   }
 
-  /**
-   * Get the name for this operator based on its opCode and opId.
-   * @return  the name for this operator
-   */
-  public final String getOpName() {
-    return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId());
-  }
-
   abstract public WatermarkFunction getWatermarkFn();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 8b2b177..1b3b8aa 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -28,6 +28,7 @@ import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.task.TaskContext;
 
 import java.util.ArrayList;
@@ -43,6 +44,23 @@ public class OperatorSpecs {
   private OperatorSpecs() {}
 
   /**
+   * Creates an {@link InputOperatorSpec} for consuming input.
+   *
+   * @param streamSpec  the stream spec for the input stream
+   * @param keySerde  the serde for the input key
+   * @param valueSerde  the serde for the input value
+   * @param isKeyed  whether the input stream is keyed
+   * @param opId  the unique ID of the operator
+   * @param <K>  type of input key
+   * @param <V>  type of input value
+   * @return  the {@link InputOperatorSpec}
+   */
+  public static <K, V> InputOperatorSpec<K, V> createInputOperatorSpec(
+    StreamSpec streamSpec, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) {
+    return new InputOperatorSpec<>(streamSpec, keySerde, valueSerde, isKeyed, opId);
+  }
+
+  /**
    * Creates a {@link StreamOperatorSpec} for {@link MapFunction}
    *
    * @param mapFn  the map function
@@ -52,7 +70,7 @@ public class OperatorSpecs {
    * @return  the {@link StreamOperatorSpec}
    */
   public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(
-      MapFunction<? super M, ? extends OM> mapFn, int opId) {
+      MapFunction<? super M, ? extends OM> mapFn, String opId) {
     return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
       @Override
       public Collection<OM> apply(M message) {
@@ -87,7 +105,7 @@ public class OperatorSpecs {
    * @return  the {@link StreamOperatorSpec}
    */
   public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(
-      FilterFunction<? super M> filterFn, int opId) {
+      FilterFunction<? super M> filterFn, String opId) {
     return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
       @Override
       public Collection<M> apply(M message) {
@@ -122,7 +140,7 @@ public class OperatorSpecs {
    * @return  the {@link StreamOperatorSpec}
    */
   public static <M, OM> StreamOperatorSpec<M, OM> createFlatMapOperatorSpec(
-      FlatMapFunction<? super M, ? extends OM> flatMapFn, int opId) {
+      FlatMapFunction<? super M, ? extends OM> flatMapFn, String opId) {
     return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, OperatorSpec.OpCode.FLAT_MAP, opId);
   }
 
@@ -134,7 +152,7 @@ public class OperatorSpecs {
    * @param <M>  type of input message
    * @return  the {@link SinkOperatorSpec} for the sink operator
    */
-  public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<? super M> sinkFn, int opId) {
+  public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<? super M> sinkFn, String opId) {
     return new SinkOperatorSpec<>((SinkFunction<M>) sinkFn, opId);
   }
 
@@ -146,7 +164,7 @@ public class OperatorSpecs {
    * @param <M> the type of message in the {@link OutputStreamImpl}
    * @return  the {@link OutputOperatorSpec} for the sendTo operator
    */
-  public static <M> OutputOperatorSpec<M> createSendToOperatorSpec(OutputStreamImpl<M> outputStream, int opId) {
+  public static <M> OutputOperatorSpec<M> createSendToOperatorSpec(OutputStreamImpl<M> outputStream, String opId) {
     return new OutputOperatorSpec<>(outputStream, opId);
   }
 
@@ -164,7 +182,7 @@ public class OperatorSpecs {
    */
   public static <M, K, V> PartitionByOperatorSpec<M, K, V> createPartitionByOperatorSpec(
       OutputStreamImpl<KV<K, V>> outputStream, Function<? super M, ? extends K> keyFunction,
-      Function<? super M, ? extends V> valueFunction, int opId) {
+      Function<? super M, ? extends V> valueFunction, String opId) {
     return new PartitionByOperatorSpec<>(outputStream, keyFunction, valueFunction, opId);
   }
 
@@ -180,7 +198,7 @@ public class OperatorSpecs {
    */
 
   public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
-      WindowInternal<M, WK, WV> window, int opId) {
+      WindowInternal<M, WK, WV> window, String opId) {
     return new WindowOperatorSpec<>(window, opId);
   }
 
@@ -203,7 +221,7 @@ public class OperatorSpecs {
    */
   public static <K, M, OM, JM> JoinOperatorSpec<K, M, OM, JM> createJoinOperatorSpec(
       OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> rightInputOpSpec, JoinFunction<K, M, OM, JM> joinFn,
-      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, long ttlMs, int opId) {
+      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, long ttlMs, String opId) {
     return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn,
         keySerde, messageSerde, otherMessageSerde, ttlMs, opId);
   }
@@ -215,7 +233,7 @@ public class OperatorSpecs {
    * @param <M>  the type of input message
    * @return  the {@link StreamOperatorSpec} for the merge
    */
-  public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(int opId) {
+  public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(String opId) {
     return new StreamOperatorSpec<>(message ->
         new ArrayList<M>() {
           {

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index 862370f..6cb4fca 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -39,7 +39,7 @@ public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
    * @param outputStream  the {@link OutputStreamImpl} to send messages to
    * @param opId  the unique ID of this {@link SinkOperatorSpec} in the graph
    */
-  OutputOperatorSpec(OutputStreamImpl<M> outputStream, int opId) {
+  OutputOperatorSpec(OutputStreamImpl<M> outputStream, String opId) {
     super(OpCode.SEND_TO, opId);
     this.outputStream = outputStream;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
index 42eeb4b..399c836 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
@@ -51,7 +51,7 @@ public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> {
    */
   PartitionByOperatorSpec(OutputStreamImpl<KV<K, V>> outputStream,
       Function<? super M, ? extends K> keyFunction,
-      Function<? super M, ? extends V> valueFunction, int opId) {
+      Function<? super M, ? extends V> valueFunction, String opId) {
     super(OpCode.PARTITION_BY, opId);
     this.outputStream = outputStream;
     this.keyFunction = keyFunction;

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 1145be8..1ca3801 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -41,7 +41,7 @@ public class SinkOperatorSpec<M> extends OperatorSpec<M, Void> {
    *                {@link org.apache.samza.task.TaskCoordinator}.
    * @param opId  the unique ID of this {@link OperatorSpec} in the graph
    */
-  SinkOperatorSpec(SinkFunction<M> sinkFn, int opId) {
+  SinkOperatorSpec(SinkFunction<M> sinkFn, String opId) {
     super(OpCode.SINK, opId);
     this.sinkFn = sinkFn;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index aace2e2..b1e29c6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -39,7 +39,7 @@ public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> {
    * @param opCode  the {@link OpCode} for this {@link StreamOperatorSpec}
    * @param opId  the unique ID for this {@link StreamOperatorSpec}
    */
-  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, OperatorSpec.OpCode opCode, int opId) {
+  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, OperatorSpec.OpCode opCode, String opId) {
     super(opCode, opId);
     this.transformFn = transformFn;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 3c8879f..06a4f4b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -58,7 +58,7 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
    * @param window  the window function
    * @param opId  auto-generated unique ID of this operator
    */
-  WindowOperatorSpec(WindowInternal<M, WK, WV> window, int opId) {
+  WindowOperatorSpec(WindowInternal<M, WK, WV> window, String opId) {
     super(OpCode.WINDOW, opId);
     this.window = window;
   }
@@ -124,7 +124,7 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
 
   @Override
   public Collection<StoreDescriptor> getStoreDescriptors() {
-    String storeName = getOpName();
+    String storeName = getOpId();
     String storeFactory = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory";
 
     Serde storeKeySerde = new TimeSeriesKeySerde<>(window.getKeySerde());

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index c75608f..f9e0a3a 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -54,7 +54,7 @@ public class KeyValueStoreExample implements StreamApplication {
 
     pageViewEvents
         .partitionBy(pve -> pve.memberId, pve -> pve,
-            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)))
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
         .map(KV::getValue)
         .flatMap(new MyStatsCounter())
         .map(stats -> KV.of(stats.memberId, stats))

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index df393b0..ff785d9 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -51,7 +51,7 @@ public class OrderShipmentJoinExample implements StreamApplication {
     orders
         .join(shipments, new MyJoinFunction(),
             new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
-            Duration.ofMinutes(1))
+            Duration.ofMinutes(1), "join")
         .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
         .sendTo(fulfilledOrders);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index c40de7b..846b9f8 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -56,7 +56,7 @@ public class PageViewCounterExample implements StreamApplication {
     pageViewEvents
         .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn, null, null)
             .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
-            .setAccumulationMode(AccumulationMode.DISCARDING))
+            .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
         .sendTo(pageViewEventPerMemberStream);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index c403406..c9bcc45 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -49,8 +49,9 @@ public class RepartitionExample implements StreamApplication {
 
     pageViewEvents
         .partitionBy(pve -> pve.memberId, pve -> pve,
-            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)))
-        .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null))
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
+        .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null),
+            "window")
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane)))
         .sendTo(pageViewEventPerMember);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
index 9381e49..3c37c31 100644
--- a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
@@ -54,8 +54,9 @@ public class WindowExample implements StreamApplication {
     // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
     // for 1 minute.
     inputStream
-        .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, null)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))))
+        .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, new IntegerSerde())
+            .setLateTrigger(Triggers.any(Triggers.count(30000),
+                Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window")
         .map(WindowPane::getMessage)
         .sendTo(outputStream);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index ab20bba..0a3e9c8 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -111,7 +111,7 @@ public class TestExecutionPlanner {
     MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input1");
     OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
     input1
-        .partitionBy(m -> m.key, m -> m.value)
+        .partitionBy(m -> m.key, m -> m.value, "p1")
         .map(kv -> kv)
         .sendTo(output1);
     return streamGraph;
@@ -136,23 +136,23 @@ public class TestExecutionPlanner {
             .map(m -> m);
     MessageStream<KV<Object, Object>> messageStream2 =
         streamGraph.<KV<Object, Object>>getInputStream("input2")
-            .partitionBy(m -> m.key, m -> m.value)
+            .partitionBy(m -> m.key, m -> m.value, "p1")
             .filter(m -> true);
     MessageStream<KV<Object, Object>> messageStream3 =
         streamGraph.<KV<Object, Object>>getInputStream("input3")
             .filter(m -> true)
-            .partitionBy(m -> m.key, m -> m.value)
+            .partitionBy(m -> m.key, m -> m.value, "p2")
             .map(m -> m);
     OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
     OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
 
     messageStream1
         .join(messageStream2, mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2))
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
         .sendTo(output1);
     messageStream3
         .join(messageStream2, mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1))
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
         .sendTo(output2);
 
     return streamGraph;
@@ -166,35 +166,37 @@ public class TestExecutionPlanner {
             .map(m -> m);
     MessageStream<KV<Object, Object>> messageStream2 =
         streamGraph.<KV<Object, Object>>getInputStream("input2")
-            .partitionBy(m -> m.key, m -> m.value)
+            .partitionBy(m -> m.key, m -> m.value, "p1")
             .filter(m -> true);
     MessageStream<KV<Object, Object>> messageStream3 =
         streamGraph.<KV<Object, Object>>getInputStream("input3")
             .filter(m -> true)
-            .partitionBy(m -> m.key, m -> m.value)
+            .partitionBy(m -> m.key, m -> m.value, "p2")
             .map(m -> m);
     OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
     OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
 
     messageStream1.map(m -> m)
-        .filter(m -> true)
-        .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)));
+        .filter(m->true)
+        .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8),
+            mock(Serde.class), mock(Serde.class)), "w1");
 
     messageStream2.map(m -> m)
-        .filter(m -> true)
-        .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)));
+        .filter(m->true)
+        .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16),
+            mock(Serde.class), mock(Serde.class)), "w2");
 
     messageStream1
         .join(messageStream2, mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600))
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1")
         .sendTo(output1);
     messageStream3
         .join(messageStream2, mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100))
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2")
         .sendTo(output2);
     messageStream3
         .join(messageStream2, mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252))
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3")
         .sendTo(output2);
 
     return streamGraph;
@@ -242,12 +244,12 @@ public class TestExecutionPlanner {
     when(runner.getStreamSpec("output2")).thenReturn(output2);
 
     // intermediate streams used in tests
-    when(runner.getStreamSpec("test-app-1-partition_by-1"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-1", "test-app-1-partition_by-1", "default-system"));
-    when(runner.getStreamSpec("test-app-1-partition_by-3"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system"));
-    when(runner.getStreamSpec("test-app-1-partition_by-8"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-p1"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-p1", "test-app-1-partition_by-p1", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-p2"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-p2", "test-app-1-partition_by-p2", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-p3"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-p3", "test-app-1-partition_by-p3", "default-system"));
   }
 
   @Test
@@ -429,7 +431,7 @@ public class TestExecutionPlanner {
 
     MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input4");
     OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
-    input1.partitionBy(m -> m.key, m -> m.value).map(kv -> kv).sendTo(output1);
+    input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> kv).sendTo(output1);
     JobGraph jobGraph = (JobGraph) planner.plan(streamGraph);
 
     // the partitions should be the same as input1

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 10c4aa3..ba5c922 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -85,10 +85,10 @@ public class TestJobGraphJsonGenerator {
     when(runner.getStreamSpec("output2")).thenReturn(output2);
 
     // intermediate streams used in tests
-    when(runner.getStreamSpec("test-app-1-partition_by-3"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system"));
-    when(runner.getStreamSpec("test-app-1-partition_by-8"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-p1"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-p1", "test-app-1-partition_by-p1", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-p2"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-p2", "test-app-1-partition_by-p2", "default-system"));
 
     // set up external partition count
     Map<String, Integer> system1Map = new HashMap<>();
@@ -113,24 +113,24 @@ public class TestJobGraphJsonGenerator {
             .map(m -> m);
     MessageStream<KV<Object, Object>> messageStream2 =
         streamGraph.<KV<Object, Object>>getInputStream("input2")
-            .partitionBy(m -> m.key, m -> m.value)
+            .partitionBy(m -> m.key, m -> m.value, "p1")
             .filter(m -> true);
     MessageStream<KV<Object, Object>> messageStream3 =
         streamGraph.<KV<Object, Object>>getInputStream("input3")
             .filter(m -> true)
-            .partitionBy(m -> m.key, m -> m.value)
+            .partitionBy(m -> m.key, m -> m.value, "p2")
             .map(m -> m);
     OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1");
     OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2");
 
     messageStream1
         .join(messageStream2, mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2))
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
         .sendTo(outputStream1);
     messageStream2.sink((message, collector, coordinator) -> { });
     messageStream3
         .join(messageStream2, mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1))
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
         .sendTo(outputStream2);
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
index f6ebaf9..53e8bf6 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -20,6 +20,7 @@
 package org.apache.samza.execution;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.operators.KV;
@@ -45,8 +46,11 @@ import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestJobNode {
 
@@ -56,25 +60,31 @@ public class TestJobNode {
     StreamSpec input1Spec = new StreamSpec("input1", "input1", "input-system");
     StreamSpec input2Spec = new StreamSpec("input2", "input2", "input-system");
     StreamSpec outputSpec = new StreamSpec("output", "output", "output-system");
-    StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-2", "partition_by-2", "intermediate-system");
+    StreamSpec partitionBySpec =
+        new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", "intermediate-system");
     doReturn(input1Spec).when(mockRunner).getStreamSpec("input1");
     doReturn(input2Spec).when(mockRunner).getStreamSpec("input2");
     doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
-    doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-2");
+    doReturn(partitionBySpec).when(mockRunner).getStreamSpec("jobName-jobId-partition_by-p1");
 
-    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
+    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
+
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
     streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
     MessageStream<KV<String, Object>> input1 = streamGraph.getInputStream("input1");
     MessageStream<KV<String, Object>> input2 = streamGraph.getInputStream("input2");
     OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output");
     JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class);
     input1
-        .partitionBy(KV::getKey, KV::getValue).map(kv -> kv.value)
+        .partitionBy(KV::getKey, KV::getValue, "p1").map(kv -> kv.value)
         .join(input2.map(kv -> kv.value), mockJoinFn,
-            new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class), Duration.ofHours(1))
+            new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class),
+            Duration.ofHours(1), "j1")
         .sendTo(output);
 
-    JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class));
+    JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mockConfig);
     Config config = new MapConfig();
     StreamEdge input1Edge = new StreamEdge(input1Spec, config);
     StreamEdge input2Edge = new StreamEdge(input2Spec, config);
@@ -133,8 +143,8 @@ public class TestJobNode {
     assertTrue("Serialized output msg serde should be a StringSerde",
         outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
 
-    String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-2.samza.key.serde");
-    String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-2.samza.msg.serde");
+    String partitionByKeySerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde");
+    String partitionByMsgSerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.msg.serde");
     assertTrue("Serialized serdes should contain intermediate stream key serde",
         deserializedSerdes.containsKey(partitionByKeySerde));
     assertTrue("Serialized intermediate stream key serde should be a StringSerde",
@@ -145,8 +155,8 @@ public class TestJobNode {
         "Serialized intermediate stream msg serde should be a StringSerde",
         partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
 
-    String leftJoinStoreKeySerde = mapConfig.get("stores.join-6-L.key.serde");
-    String leftJoinStoreMsgSerde = mapConfig.get("stores.join-6-L.msg.serde");
+    String leftJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-L.key.serde");
+    String leftJoinStoreMsgSerde = mapConfig.get("stores.jobName-jobId-join-j1-L.msg.serde");
     assertTrue("Serialized serdes should contain left join store key serde",
         deserializedSerdes.containsKey(leftJoinStoreKeySerde));
     assertTrue("Serialized left join store key serde should be a StringSerde",
@@ -156,8 +166,8 @@ public class TestJobNode {
     assertTrue("Serialized left join store msg serde should be a TimestampedValueSerde",
         leftJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
 
-    String rightJoinStoreKeySerde = mapConfig.get("stores.join-6-R.key.serde");
-    String rightJoinStoreMsgSerde = mapConfig.get("stores.join-6-R.msg.serde");
+    String rightJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-R.key.serde");
+    String rightJoinStoreMsgSerde = mapConfig.get("stores.jobName-jobId-join-j1-R.msg.serde");
     assertTrue("Serialized serdes should contain right join store key serde",
         deserializedSerdes.containsKey(rightJoinStoreKeySerde));
     assertTrue("Serialized right join store key serde should be a StringSerde",

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 09fb56a..dac4e94 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -20,8 +20,10 @@ package org.apache.samza.operators;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.functions.JoinFunction;
@@ -51,6 +53,7 @@ import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -77,6 +80,22 @@ public class TestJoinOperator {
     assertEquals(110, outputSum);
   }
 
+  @Test(expected = SamzaException.class)
+  public void joinWithSelfThrowsException() throws Exception {
+    StreamApplication app = new StreamApplication() {
+      @Override
+      public void init(StreamGraph graph, Config config) {
+        IntegerSerde integerSerde = new IntegerSerde();
+        KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
+        MessageStream<KV<Integer, Integer>> inStream = graph.getInputStream("instream", kvSerde);
+
+        inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, kvSerde, JOIN_TTL, "join");
+      }
+    };
+
+    createStreamOperatorTask(new SystemClock(), app); // should throw an exception
+  }
+
   @Test
   public void joinFnInitAndClose() throws Exception {
     TestJoinFunction joinFn = new TestJoinFunction();
@@ -277,10 +296,14 @@ public class TestJoinOperator {
     // need to return different stores for left and right side
     IntegerSerde integerSerde = new IntegerSerde();
     TimestampedValueSerde timestampedValueSerde = new TimestampedValueSerde(new KVSerde(integerSerde, integerSerde));
-    when(taskContext.getStore(eq("join-2-L"))).thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
-    when(taskContext.getStore(eq("join-2-R"))).thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
+    when(taskContext.getStore(eq("jobName-jobId-join-j1-L")))
+        .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
+    when(taskContext.getStore(eq("jobName-jobId-join-j1-R")))
+        .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
 
     Config config = mock(Config.class);
+    when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
+    when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
 
     StreamOperatorTask sot = new StreamOperatorTask(app, runner, clock);
     sot.init(config, taskContext);
@@ -304,10 +327,8 @@ public class TestJoinOperator {
 
       SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
       inStream
-          .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL)
-          .sink((message, messageCollector, taskCoordinator) -> {
-              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
-            });
+          .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, "j1")
+          .sink((m, mc, tc) -> mc.send(new OutgoingMessageEnvelope(outputSystemStream, m)));
     }
   }