You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/02/27 20:30:59 UTC

[1/2] samza git commit: Enhancements to execution planner

Repository: samza
Updated Branches:
  refs/heads/samza-fluent-api-v1 d39bce9cb -> dde754246


Enhancements to execution planner


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dde75424
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dde75424
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dde75424

Branch: refs/heads/samza-fluent-api-v1
Commit: dde754246391729e00a19a15058525cdeb2fca1a
Parents: 93c82f3
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Mon Feb 27 12:25:23 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon Feb 27 12:29:45 2017 -0800

----------------------------------------------------------------------
 .../samza/processorgraph/ExecutionPlanner.java  | 34 ++++++++++++--------
 1 file changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/dde75424/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
index a990463..055f87c 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
@@ -59,7 +59,7 @@ public class ExecutionPlanner {
     ProcessorGraph processorGraph = splitStages(streamGraph);
 
     // figure out the partition for internal streams
-    Multimap<String, StreamSpec> streams = calculatePartitions(processorGraph, sysAdmins);
+    Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, processorGraph, sysAdmins);
 
     // create the streams
     createStreams(streams, sysAdmins);
@@ -96,9 +96,12 @@ public class ExecutionPlanner {
     return processorGraph;
   }
 
-  private Multimap<String, StreamSpec> calculatePartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+  private Multimap<String, StreamSpec> calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
     // fetch the external streams partition info
-    getExternalStreamPartitions(processorGraph, sysAdmins);
+    getExistingStreamPartitions(processorGraph, sysAdmins);
+
+    // use BFS to figure out the join partition count
+
 
     // TODO this algorithm assumes only one processor, and it does not consider join
     Multimap<String, StreamSpec> streamsGroupedBySystem = HashMultimap.create();
@@ -112,9 +115,11 @@ public class ExecutionPlanner {
           int partition = Math.max(maxInPartition, maxOutPartition);
 
           outStreams.forEach(streamEdge -> {
-              streamEdge.setPartitions(partition);
-              StreamSpec streamSpec = createStreamSpec(streamEdge);
-              streamsGroupedBySystem.put(streamEdge.getSystemStream().getSystem(), streamSpec);
+              if (streamEdge.getPartitions() == -1) {
+                streamEdge.setPartitions(partition);
+                StreamSpec streamSpec = createStreamSpec(streamEdge);
+                streamsGroupedBySystem.put(streamEdge.getSystemStream().getSystem(), streamSpec);
+              }
             });
         }
       });
@@ -122,16 +127,17 @@ public class ExecutionPlanner {
     return streamsGroupedBySystem;
   }
 
-  private void getExternalStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
-    Set<StreamEdge> externalStreams = new HashSet<>();
-    externalStreams.addAll(processorGraph.getSources());
-    externalStreams.addAll(processorGraph.getSinks());
+  private void getExistingStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+    Set<StreamEdge> allStreams = new HashSet<>();
+    allStreams.addAll(processorGraph.getSources());
+    allStreams.addAll(processorGraph.getSinks());
+    allStreams.addAll(processorGraph.getInternalStreams());
 
     Multimap<String, StreamEdge> externalStreamsMap = HashMultimap.create();
-    externalStreams.forEach(streamEdge -> {
-        SystemStream systemStream = streamEdge.getSystemStream();
-        externalStreamsMap.put(systemStream.getSystem(), streamEdge);
-      });
+    allStreams.forEach(streamEdge -> {
+      SystemStream systemStream = streamEdge.getSystemStream();
+      externalStreamsMap.put(systemStream.getSystem(), streamEdge);
+    });
     for (Map.Entry<String, Collection<StreamEdge>> entry : externalStreamsMap.asMap().entrySet()) {
       String systemName = entry.getKey();
       Collection<StreamEdge> streamEdges = entry.getValue();


[2/2] samza git commit: SAMZA-1092: replace stream spec in fluent API

Posted by xi...@apache.org.
SAMZA-1092: replace stream spec in fluent API

Replaced the StreamSpec class with the new one from SAMZA-1075.

Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>

Reviewers: Jacob Maes <jm...@linkedin.com>

Closes #58 from nickpan47/replace-stream-spec and squashes the following commits:

761ebb5 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class in system package
df953c2 [Yi Pan (Data Infrastructure)] SAMZA-1092: fix unit test
71331d8 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class
2fb19e9 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class in fluent API
ed3ad8e [Yi Pan (Data Infrastructure)] WIP: replace stream spec in fluent API


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/93c82f3d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/93c82f3d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/93c82f3d

Branch: refs/heads/samza-fluent-api-v1
Commit: 93c82f3de4f86d18b363ffb84bdaa407d4f2cac5
Parents: d39bce9
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Thu Feb 23 12:48:56 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon Feb 27 12:29:45 2017 -0800

----------------------------------------------------------------------
 .../org/apache/samza/operators/StreamGraph.java |  2 +-
 .../org/apache/samza/operators/StreamSpec.java  | 46 -----------
 .../apache/samza/operators/StreamGraphImpl.java | 83 +++++++++-----------
 .../samza/example/KeyValueStoreExample.java     | 24 +-----
 .../samza/example/NoContextStreamExample.java   | 33 +-------
 .../samza/example/OrderShipmentJoinExample.java | 35 +--------
 .../samza/example/PageViewCounterExample.java   | 23 +-----
 .../samza/example/RepartitionExample.java       | 23 +-----
 .../samza/example/TestBroadcastExample.java     | 15 +---
 .../apache/samza/example/TestJoinExample.java   | 26 ++----
 .../apache/samza/example/TestWindowExample.java | 15 +---
 .../operators/impl/TestStreamOperatorImpl.java  |  1 +
 12 files changed, 69 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
index abc9861..30c4576 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -21,10 +21,10 @@ package org.apache.samza.operators;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.StreamSpec;
 
 import java.util.Map;
 
-
 /**
  * Job-level programming interface to create an operator DAG and run in various different runtime environments.
  */

http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
deleted file mode 100644
index c8a5e8d..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.system.SystemStream;
-
-import java.util.Properties;
-
-
-/**
- * This interface defines the specification of a {@link SystemStream}. It will be used by the {@link org.apache.samza.system.SystemAdmin}
- * to create a {@link SystemStream}
- */
-@InterfaceStability.Unstable
-public interface StreamSpec {
-  /**
-   * Get the {@link SystemStream}
-   *
-   * @return  {@link SystemStream} object
-   */
-  SystemStream getSystemStream();
-
-  /**
-   * Get the physical properties of the {@link SystemStream}
-   *
-   * @return  the properties of this stream
-   */
-  Properties getProperties();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index dca3469..353f455 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -18,12 +18,12 @@
  */
 package org.apache.samza.operators;
 
-import java.util.Properties;
 import java.util.function.Function;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
@@ -44,6 +44,9 @@ public class StreamGraphImpl implements StreamGraph {
    */
   private int opId = 0;
 
+  // TODO: SAMZA-1101: the instantiation of physical streams and the physical sink functions should be delayed
+  // after physical deployment. The input/output/intermediate stream creation should also be delegated to {@link ExecutionEnvironment}
+  // s.t. we can allow different physical instantiation of stream under different execution environment w/o code change.
   private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> {
     final StreamSpec spec;
     final Serde<K> keySerde;
@@ -83,7 +86,7 @@ public class StreamGraphImpl implements StreamGraph {
         // TODO: need to find a way to directly pass in the serde class names
         // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
         //    message.getKey(), message.getKey(), message.getMessage()));
-        mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
+        mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage()));
       };
     }
   }
@@ -112,10 +115,10 @@ public class StreamGraphImpl implements StreamGraph {
         // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
         //    message.getKey(), message.getKey(), message.getMessage()));
         if (this.parKeyFn == null) {
-          mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
+          mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage()));
         } else {
           // apply partition key function
-          mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
+          mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
         }
       };
     }
@@ -124,17 +127,17 @@ public class StreamGraphImpl implements StreamGraph {
   /**
    * Maps keeping all {@link SystemStream}s that are input and output of operators in {@link StreamGraphImpl}
    */
-  private final Map<SystemStream, MessageStream> inStreams = new HashMap<>();
-  private final Map<SystemStream, OutputStream> outStreams = new HashMap<>();
+  private final Map<String, MessageStream> inStreams = new HashMap<>();
+  private final Map<String, OutputStream> outStreams = new HashMap<>();
 
   private ContextManager contextManager = new ContextManager() { };
 
   @Override
   public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
-      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+    if (!this.inStreams.containsKey(streamSpec.getId())) {
+      this.inStreams.putIfAbsent(streamSpec.getId(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
     }
-    return this.inStreams.get(streamSpec.getSystemStream());
+    return this.inStreams.get(streamSpec.getId());
   }
 
   /**
@@ -146,10 +149,10 @@ public class StreamGraphImpl implements StreamGraph {
    */
   @Override
   public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
-      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+    if (!this.outStreams.containsKey(streamSpec.getId())) {
+      this.outStreams.putIfAbsent(streamSpec.getId(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
     }
-    return this.outStreams.get(streamSpec.getSystemStream());
+    return this.outStreams.get(streamSpec.getId());
   }
 
   /**
@@ -161,12 +164,12 @@ public class StreamGraphImpl implements StreamGraph {
    */
   @Override
   public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
-      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
+    if (!this.inStreams.containsKey(streamSpec.getId())) {
+      this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
     }
-    IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getSystemStream());
-    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
-      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+    IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getId());
+    if (!this.outStreams.containsKey(streamSpec.getId())) {
+      this.outStreams.putIfAbsent(streamSpec.getId(), intStream);
     }
     return intStream;
   }
@@ -200,12 +203,15 @@ public class StreamGraphImpl implements StreamGraph {
   /**
    * Helper method to be get the input stream via {@link SystemStream}
    *
-   * @param systemStream  the {@link SystemStream}
+   * @param sstream  the {@link SystemStream}
    * @return  a {@link MessageStreamImpl} object corresponding to the {@code systemStream}
    */
-  public MessageStreamImpl getInputStream(SystemStream systemStream) {
-    if (this.inStreams.containsKey(systemStream)) {
-      return (MessageStreamImpl) this.inStreams.get(systemStream);
+  public MessageStreamImpl getInputStream(SystemStream sstream) {
+    for(MessageStream entry: this.inStreams.values()) {
+      if (((InputStreamImpl) entry).getSpec().getSystemName() == sstream.getSystem() &&
+          ((InputStreamImpl) entry).getSpec().getPhysicalName() == sstream.getStream()) {
+        return (MessageStreamImpl) entry;
+      }
     }
     return null;
   }
@@ -217,13 +223,6 @@ public class StreamGraphImpl implements StreamGraph {
     return null;
   }
 
-  <M> MessageStream<M> getIntStream(OutputStream<M> outStream) {
-    if (this.inStreams.containsValue(outStream)) {
-      return (MessageStream<M>) outStream;
-    }
-    return null;
-  }
-
   /**
    * Method to create intermediate topics for {@link MessageStreamImpl#partitionBy(Function)} method.
    *
@@ -234,27 +233,21 @@ public class StreamGraphImpl implements StreamGraph {
    */
   <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
     // TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec}
-    StreamSpec streamSpec = new StreamSpec() {
-      @Override
-      public SystemStream getSystemStream() {
-        // TODO: should auto-generate intermedaite stream name here
-        return new SystemStream("intermediate", String.format("par-%d", StreamGraphImpl.this.opId));
-      }
+    StreamSpec streamSpec = this.createIntStreamSpec();
 
-      @Override
-      public Properties getProperties() {
-        return null;
-      }
-    };
-
-    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
-      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
+    if (!this.inStreams.containsKey(streamSpec.getId())) {
+      this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
     }
-    IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getSystemStream());
-    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
-      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+    IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getId());
+    if (!this.outStreams.containsKey(streamSpec.getId())) {
+      this.outStreams.putIfAbsent(streamSpec.getId(), intStream);
     }
     return intStream;
   }
 
+  private StreamSpec createIntStreamSpec() {
+    // TODO: placeholder to generate the intermediate stream's {@link StreamSpec} automatically
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index 85ebc6c..4a0681e 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -32,12 +32,10 @@ import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.CommandLine;
 
-import java.util.Properties;
-
 
 /**
  * Example code using {@link KeyValueStore} to implement event-time window
@@ -113,25 +111,9 @@ public class KeyValueStoreExample implements StreamGraphBuilder {
     }
   }
 
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewEvent");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
 
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewPerMember5min");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
 
   class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
     String pageId;

http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
index c6d2e6e..320680c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
@@ -28,13 +28,12 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.CommandLine;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 
 
 /**
@@ -42,35 +41,11 @@ import java.util.Properties;
  */
 public class NoContextStreamExample implements StreamGraphBuilder {
 
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "input1");
-    }
+  StreamSpec input1 = new StreamSpec("inputStreamA", "PageViewEvent", "kafka");
 
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec input2 = new StreamSpec("inputStreamB", "RumLixEvent", "kafka");
 
-  StreamSpec input2 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "input2");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "output");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec output = new StreamSpec("joinedPageViewStream", "PageViewJoinRumLix", "kafka");
 
   class MessageType {
     String joinKey;

http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 0477066..30ce7d2 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -23,17 +23,14 @@ import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
-import java.util.Properties;
-
 
 /**
  * Simple 2-way stream-to-stream join example
@@ -71,35 +68,11 @@ public class OrderShipmentJoinExample implements StreamGraphBuilder {
     standaloneEnv.run(new OrderShipmentJoinExample(), config);
   }
 
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "Orders");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  StreamSpec input2 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "Shipment");
-    }
+  StreamSpec input1 = new StreamSpec("orderStream", "OrderEvent", "kafka");
 
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec input2 = new StreamSpec("shipmentStream", "ShipmentEvent", "kafka");
 
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "FulfilledOrders");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec output = new StreamSpec("joinedOrderShipmentStream", "OrderShipmentJoinEvent", "kafka");
 
   class OrderRecord implements MessageEnvelope<String, OrderRecord> {
     String orderId;

http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index f7d8bda..fcf67a7 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -29,11 +29,10 @@ import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
-import java.util.Properties;
 
 
 /**
@@ -62,25 +61,9 @@ public class PageViewCounterExample implements StreamGraphBuilder {
     standaloneEnv.run(new PageViewCounterExample(), config);
   }
 
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewEvent");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
 
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewPerMember5min");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
 
   class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
     String pageId;

http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index 6994ac4..228668c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -27,11 +27,10 @@ import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
-import java.util.*;
 
 
 /**
@@ -73,25 +72,9 @@ public class RepartitionExample implements StreamGraphBuilder {
     standaloneEnv.run(new RepartitionExample(), config);
   }
 
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewEvent");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
 
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewPerMember5min");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
 
   class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
     String pageId;

http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
index d22324b..059afce 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
@@ -22,18 +22,16 @@ package org.apache.samza.example;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
 import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.Offset;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.time.Duration;
 import java.util.function.BiFunction;
-import java.util.Properties;
 import java.util.Set;
 
 
@@ -70,15 +68,8 @@ public class TestBroadcastExample extends TestExampleBase {
   public void init(StreamGraph graph, Config config) {
     BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
     inputs.keySet().forEach(entry -> {
-        MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
-          @Override public SystemStream getSystemStream() {
-            return entry;
-          }
-
-          @Override public Properties getProperties() {
-            return null;
-          }
-        }, null, null).map(this::getInputMessage);
+        MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(
+                new StreamSpec(entry.toString(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
 
         inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
             .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));

http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
index fe6e7e7..cc53814 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
@@ -22,19 +22,18 @@ package org.apache.samza.example;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
 import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.Offset;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 import java.util.Set;
 
 
@@ -65,16 +64,9 @@ public class TestJoinExample  extends TestExampleBase {
   public void init(StreamGraph graph, Config config) {
 
     for (SystemStream input : inputs.keySet()) {
+      StreamSpec inputStreamSpec = new StreamSpec(input.toString(), input.getStream(), input.getSystem());
       MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
-          new StreamSpec() {
-            @Override public SystemStream getSystemStream() {
-              return input;
-            }
-
-            @Override public Properties getProperties() {
-              return null;
-            }
-          }, null, null).map(this::getInputMessage);
+          inputStreamSpec, null, null).map(this::getInputMessage);
       if (joinOutput == null) {
         joinOutput = newSource;
       } else {
@@ -82,15 +74,9 @@ public class TestJoinExample  extends TestExampleBase {
       }
     }
 
-    joinOutput.sendTo(graph.createOutStream(new StreamSpec() {
-      @Override public SystemStream getSystemStream() {
-        return null;
-      }
-
-      @Override public Properties getProperties() {
-        return null;
-      }
-    }, new StringSerde("UTF-8"), new JsonSerde<>()));
+    joinOutput.sendTo(graph.createOutStream(
+            new StreamSpec("joinOutput", "JoinOutputEvent", "kafka"),
+            new StringSerde("UTF-8"), new JsonSerde<>()));
 
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
index e08ca20..73f4674 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
@@ -21,18 +21,16 @@ package org.apache.samza.example;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
 import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.data.Offset;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.time.Duration;
 import java.util.function.BiFunction;
-import java.util.Properties;
 import java.util.Set;
 
 
@@ -60,15 +58,8 @@ public class TestWindowExample extends TestExampleBase {
   @Override
   public void init(StreamGraph graph, Config config) {
     BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
-    inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
-      @Override public SystemStream getSystemStream() {
-        return source;
-      }
-
-      @Override public Properties getProperties() {
-        return null;
-      }
-    }, null, null).
+    inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(
+            new StreamSpec(source.toString(), source.getStream(), source.getSystem()), null, null).
         map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
             m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator)));
 

http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 010a210..0a873fd 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.*;
 public class TestStreamOperatorImpl {
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testSimpleOperator() {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);