Posted to commits@samza.apache.org by ni...@apache.org on 2018/05/25 16:36:53 UTC

[10/10] samza git commit: SAMZA-1659: Serializable OperatorSpec

SAMZA-1659: Serializable OperatorSpec

This change makes the user-supplied functions serializable, which in turn makes the full user-defined DAG serializable.
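The idea behind the change can be sketched in plain Java. Once every user function type extends `java.io.Serializable`, lambdas and method references written against those types become serializable, so a whole chain of them can be written out as one object graph and rebuilt elsewhere. This is a minimal sketch under stated assumptions: it uses plain Java serialization (`ObjectOutputStream`), and `SerializableMap`, `Pipeline`, `serialize` and `deserialize` are hypothetical names for illustration, not Samza's classes or its actual serialization path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class SerializableDagSketch {

  // Hypothetical stand-in for a user function interface. Like MapFunction in this
  // commit, it extends Serializable, so lambdas targeting it are serializable.
  @FunctionalInterface
  public interface SerializableMap<M, OM> extends Serializable {
    OM apply(M message);
  }

  // Hypothetical, minimal "DAG": an ordered chain of user functions.
  public static class Pipeline implements Serializable {
    private static final long serialVersionUID = 1L;
    private final List<SerializableMap<String, String>> stages = new ArrayList<>();

    public Pipeline then(SerializableMap<String, String> fn) {
      stages.add(fn);
      return this;
    }

    // Runs the input through every stage in order.
    public String run(String input) {
      String value = input;
      for (SerializableMap<String, String> stage : stages) {
        value = stage.apply(value);
      }
      return value;
    }
  }

  // Writes the whole object graph (pipeline plus all its functions) in one pass.
  public static byte[] serialize(Serializable obj) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Rebuilds the object graph; the classes that defined the lambdas must be on the classpath.
  @SuppressWarnings("unchecked")
  public static <T> T deserialize(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (T) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    String suffix = "!"; // captured values must be serializable too
    Pipeline dag = new Pipeline()
        .then(String::trim)
        .then(s -> s.toUpperCase(Locale.ROOT))
        .then(s -> s + suffix);
    Pipeline copy = deserialize(serialize(dag));
    System.out.println(copy.run("  hello ")); // HELLO!
  }
}
```

Note that anything a lambda captures (here `suffix`) is serialized along with it, so a captured non-serializable object would make the whole graph fail to serialize.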

Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Author: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>

Reviewers: Jagadish <jv...@linkedin.com>, Prateek Maheshwari <pm...@linkedin.com>

Closes #475 from nickpan47/serializable-opspec-only-Jan-24-18 and squashes the following commits:

db0dea73 [Yi Pan (Data Infrastructure)] SAMZA-1659: fix intermittent TestZkLocalApplicationRunner failure due to StreamProcessor#stop()
34716d42 [Yi Pan (Data Infrastructure)] SAMZA-1659: fix a comment on OperatorSpec#isClone()
37d4e6ae [Yi Pan (Data Infrastructure)] SAMZA-1659: addressing latest round of review comments
68674a14 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
d3a7826c [Yi Pan (Data Infrastructure)] SAMZA-1659: addressing review comments
f83e8dd0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
acca418b [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
842a73d6 [Yi Pan (Data Infrastructure)] SAMZA-1659: making user-defined functions in high-level API serializable
ad85a2cb [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
c1567116 [Yi Pan (Data Infrastructure)] SAMZA-1659: Before re-merge with master. Still need to fix unit tests (moving OperatorSpec clone tests to OperatorSpecGraph.clone)
f2563f8e [Yi Pan (Data Infrastructure)] SAMZA-1659: serialize the whole DAG instead of each individual OperatorSpec.
24d33496 [Yi Pan (Data Infrastructure)] SAMZA-1659: updated according to review comments. Need to merge again with master.
3f643f8b [Yi Pan (Data Infrastructure)] SAMZA-1659: serialiable OperatorSpec
ed7d8c0e [Yi Pan (Data Infrastructure)] Fixed some javadoc and test files
94de218b [Yi Pan (Data Infrastructure)] Remove public access from StreamGraphImpl#getIntermediateStream(String, Serde)
8f4e9dd4 [Yi Pan (Data Infrastructure)] Serialization of StreamGraph in a wrapper class SerializedStreamGraph
f3bb1958 [Yi Pan (Data Infrastructure)] Fix some comments
c15246f5 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
e981967d [Yi Pan (Data Infrastructure)] WIP: fixing unit test for SamzaSQL translators w/ serialization of operator functions
40583051 [Yi Pan (Data Infrastructure)] WIP: update the serialization of user functions after the merge
18ba924f [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
93951c5f [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
54a28801 [Yi Pan (Data Infrastructure)] WIP: broadcast, sendtotable, and streamtotablejoin serialization and unit tests
45eb1fb0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
7c8d1591 [Yi Pan (Data Infrastructure)] WIP: working on unit tests for trigger, broadcast, join, table, and SQL UDF function serialization
b973b105 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
aca42308 [Yi Pan (Data Infrastructure)] WIP: Serialize OperatorSpec only w/o StreamApplication interface change. Passed all build and tests.
0ebebfc3 [Yi Pan (Data Infrastructure)] WIP: serialization only change
1670aff0 [Yi Pan (Data Infrastructure)] WIP: class-loading of user program logic and main() method based user program logic are both included in ThreadJobFactory/ProcessJobFactory/YarnJobFactory. ThreadJobFactory test suite to be fixed.
4102aa8c [Yi Pan (Data Infrastructure)] WIP: continued working on potential offspring integration
dc7da87e [Yi Pan (Data Infrastructure)] WIP: unit tests for serialization
475a46bc [Yi Pan (Data Infrastructure)] WIP: fixed TestZkLocalApplicationRunner. Debugging issues w/ TestRepartitionWindowApp (i.e. missing changelog creation step when directly running LocalApplicationRunner)
6a14b2af [Yi Pan (Data Infrastructure)] WIP: fixed unit test failure for Windows
d4640329 [Yi Pan (Data Infrastructure)] WIP: fixing unit tests after merge
bf1ce907 [Yi Pan (Data Infrastructure)] WIP: removing StreamDescriptor first
50201728 [Yi Pan (Data Infrastructure)] Merge branch 'experiment-new-api-v2' into new-api-v2-0.14
dde1ab14 [Yi Pan (Data Infrastructure)] WIP: first end-to-end test
d7df6ed0 [Yi Pan (Data Infrastructure)] WIP: added all unit test for OperatorSpec#copy methods.
6fc6d4c0 [Yi Pan (Data Infrastructure)] WIP: experiment code to implement an end-to-end working example for new APIs
525d8bc1 [Yi Pan (Data Infrastructure)] Merge branch '0.14.0' into new-api-v2
e6fb96e5 [Yi Pan (Data Infrastructure)] WIP: merged all application types into StreamApplications
f227380f [Yi Pan (Data Infrastructure)] WIP: update the app runner classes
256155ad [Yi Pan (Data Infrastructure)] WIP: new API user code examples
4a6a58dc [Yi Pan (Data Infrastructure)] WIP: updated w/ low-level task API and global var ingestion/metrics reporter
3c50629e [Yi Pan (Data Infrastructure)] WIP: adding support for low-level task APIs
51541e13 [Yi Pan (Data Infrastructure)] WIP: cleanup StreamDescriptor
0bc7ee7b [Yi Pan (Data Infrastructure)] WIP: update the user code example on new APIs
cd528c1c [Yi Pan (Data Infrastructure)] WIP: updated spec and user DAG API
b898e6c0 [Yi Pan (Data Infrastructure)] WIP: new-api-v2
91f364f1 [Yi Pan (Data Infrastructure)] WIP: proto-type of input/output stream/system specs
ae3dc6ff [Yi Pan (Data Infrastructure)] WIP: new api revision
8bb97520 [Yi Pan (Data Infrastructure)] WIP: proto-type of input/output stream/system specs
5573a069 [Yi Pan (Data Infrastructure)] WIP: new api revision
aeb45730 [Xinyu Liu] SAMZA-1321: Propagate end-of-stream and watermark messages


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/53d7f262
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/53d7f262
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/53d7f262

Branch: refs/heads/master
Commit: 53d7f2625145f560eb6ccc49d48dc176f244f9b3
Parents: bc4a0c2
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Fri May 25 09:37:55 2018 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Committed: Fri May 25 09:37:55 2018 -0700

----------------------------------------------------------------------
 build.gradle                                    |   5 +-
 .../samza/application/StreamApplication.java    |   7 +-
 .../java/org/apache/samza/config/MapConfig.java |   7 +-
 .../apache/samza/operators/MessageStream.java   |  19 +-
 .../operators/functions/ClosableFunction.java   |   3 +
 .../operators/functions/FilterFunction.java     |   3 +-
 .../operators/functions/FlatMapFunction.java    |   3 +-
 .../operators/functions/FoldLeftFunction.java   |  16 +-
 .../samza/operators/functions/JoinFunction.java |   3 +-
 .../samza/operators/functions/MapFunction.java  |   3 +-
 .../samza/operators/functions/SinkFunction.java |   3 +-
 .../functions/StreamTableJoinFunction.java      |   3 +-
 .../operators/functions/SupplierFunction.java   |  38 ++
 .../samza/operators/triggers/AnyTrigger.java    |  10 +-
 .../samza/operators/triggers/Trigger.java       |   3 +-
 .../apache/samza/operators/windows/Window.java  |   3 +-
 .../apache/samza/operators/windows/Windows.java |  53 +-
 .../windows/internal/WindowInternal.java        |  32 +-
 .../samza/serializers/SerializableSerde.java    |   2 +-
 .../org/apache/samza/system/StreamSpec.java     |   3 +-
 .../samza/system/SystemStreamPartition.java     |   4 +-
 .../java/org/apache/samza/table/TableSpec.java  |  12 +-
 .../samza/operators/windows/TestWindowPane.java |   2 +-
 .../samza/execution/ExecutionPlanner.java       |  28 +-
 .../org/apache/samza/execution/JobGraph.java    |  14 +-
 .../samza/execution/JobGraphJsonGenerator.java  |   4 +-
 .../org/apache/samza/execution/JobNode.java     |  26 +-
 .../samza/operators/MessageStreamImpl.java      |  79 ++-
 .../samza/operators/OperatorSpecGraph.java      | 132 ++++
 .../apache/samza/operators/StreamGraphImpl.java | 328 ----------
 .../apache/samza/operators/StreamGraphSpec.java | 299 +++++++++
 .../org/apache/samza/operators/TableImpl.java   |   3 +-
 .../operators/impl/BroadcastOperatorImpl.java   |   2 +-
 .../samza/operators/impl/OperatorImpl.java      |   2 +-
 .../samza/operators/impl/OperatorImplGraph.java |  73 ++-
 .../operators/impl/OutputOperatorImpl.java      |   5 +-
 .../operators/impl/PartitionByOperatorImpl.java |  16 +-
 .../operators/impl/StreamOperatorImpl.java      |   3 +-
 .../operators/impl/WindowOperatorImpl.java      |  23 +-
 .../operators/spec/FilterOperatorSpec.java      |  74 +++
 .../operators/spec/FlatMapOperatorSpec.java     |  47 ++
 .../samza/operators/spec/InputOperatorSpec.java |  13 +-
 .../samza/operators/spec/JoinOperatorSpec.java  |  17 +-
 .../samza/operators/spec/MapOperatorSpec.java   |  77 +++
 .../samza/operators/spec/MergeOperatorSpec.java |  51 ++
 .../samza/operators/spec/OperatorSpec.java      |  23 +-
 .../samza/operators/spec/OperatorSpecs.java     |  73 +--
 .../samza/operators/spec/OutputStreamImpl.java  |  17 +-
 .../operators/spec/PartitionByOperatorSpec.java |  23 +-
 .../operators/spec/SendToTableOperatorSpec.java |   9 +-
 .../operators/spec/StreamOperatorSpec.java      |  23 +-
 .../operators/spec/WindowOperatorSpec.java      |  11 +
 .../stream/IntermediateMessageStreamImpl.java   |   4 +-
 .../samza/operators/triggers/Cancellable.java   |   2 +-
 .../samza/operators/triggers/TriggerImpl.java   |   6 +-
 .../runtime/AbstractApplicationRunner.java      |  21 +-
 .../samza/runtime/LocalApplicationRunner.java   |  28 +-
 .../samza/runtime/LocalContainerRunner.java     |  11 +-
 .../apache/samza/task/StreamOperatorTask.java   |  48 +-
 .../org/apache/samza/task/TaskFactoryUtil.java  |  26 +-
 .../apache/samza/container/TaskInstance.scala   |   1 -
 .../samza/job/local/ThreadJobFactory.scala      |  11 +-
 .../apache/samza/example/BroadcastExample.java  |  71 ---
 .../samza/example/KeyValueStoreExample.java     | 131 ----
 .../org/apache/samza/example/MergeExample.java  |  60 --
 .../samza/example/OrderShipmentJoinExample.java | 115 ----
 .../samza/example/PageViewCounterExample.java   |  95 ---
 .../samza/example/RepartitionExample.java       |  90 ---
 .../org/apache/samza/example/WindowExample.java |  81 ---
 .../samza/execution/TestExecutionPlanner.java   | 117 ++--
 .../apache/samza/execution/TestJobGraph.java    |  68 +--
 .../execution/TestJobGraphJsonGenerator.java    |  32 +-
 .../org/apache/samza/execution/TestJobNode.java |  14 +-
 .../samza/operators/TestJoinOperator.java       | 152 ++---
 .../samza/operators/TestMessageStreamImpl.java  |  55 +-
 .../samza/operators/TestOperatorSpecGraph.java  | 185 ++++++
 .../samza/operators/TestStreamGraphImpl.java    | 601 -------------------
 .../samza/operators/TestStreamGraphSpec.java    | 601 +++++++++++++++++++
 .../data/TestOutputMessageEnvelope.java         |  14 +
 .../operators/impl/TestOperatorImplGraph.java   | 298 ++++++---
 .../operators/impl/TestStreamOperatorImpl.java  |   4 +-
 .../operators/impl/TestWindowOperator.java      | 263 ++++----
 .../operators/spec/OperatorSpecTestUtils.java   | 141 +++++
 .../samza/operators/spec/TestOperatorSpec.java  | 465 ++++++++++++++
 .../spec/TestPartitionByOperatorSpec.java       | 165 +++++
 .../operators/spec/TestWindowOperatorSpec.java  | 306 +++++++++-
 .../runtime/TestAbstractApplicationRunner.java  |  36 +-
 .../runtime/TestLocalApplicationRunner.java     |  21 +-
 .../apache/samza/task/IdentityStreamTask.java   |  55 ++
 .../apache/samza/task/TestTaskFactoryUtil.java  |  64 +-
 .../testUtils/InvalidStreamApplication.java     |  25 -
 .../samza/system/kafka/TestKafkaStreamSpec.java |   3 +-
 .../samza/sql/data/SamzaSqlCompositeKey.java    |   1 +
 .../sql/data/SamzaSqlExecutionContext.java      |  20 +-
 .../samza/sql/translator/FilterTranslator.java  |  47 +-
 .../translator/LogicalAggregateTranslator.java  |  24 +-
 .../samza/sql/translator/ProjectTranslator.java |  60 +-
 .../samza/sql/translator/QueryTranslator.java   |  46 +-
 .../SamzaSqlRelMessageJoinFunction.java         |  12 +-
 .../samza/sql/translator/ScanTranslator.java    |  28 +-
 .../samza/sql/translator/TranslatorContext.java |  79 ++-
 .../apache/samza/sql/TestQueryTranslator.java   | 510 ----------------
 .../sql/TestSamzaSqlApplicationConfig.java      |  95 ---
 .../sql/TestSamzaSqlApplicationRunner.java      |  56 --
 .../samza/sql/TestSamzaSqlFileParser.java       |  58 --
 .../samza/sql/TestSamzaSqlQueryParser.java      |  76 ---
 .../samza/sql/TestSamzaSqlRelMessage.java       |  46 --
 .../sql/TestSamzaSqlRelMessageJoinFunction.java | 119 ----
 .../samza/sql/data/TestSamzaSqlRelMessage.java  |  46 ++
 .../runner/TestSamzaSqlApplicationConfig.java   |  95 +++
 .../runner/TestSamzaSqlApplicationRunner.java   |  56 ++
 .../sql/testutil/TestSamzaSqlFileParser.java    |  58 ++
 .../sql/testutil/TestSamzaSqlQueryParser.java   |  75 +++
 .../sql/translator/TestFilterTranslator.java    | 136 +++++
 .../sql/translator/TestJoinTranslator.java      | 191 ++++++
 .../sql/translator/TestProjectTranslator.java   | 289 +++++++++
 .../sql/translator/TestQueryTranslator.java     | 596 ++++++++++++++++++
 .../TestSamzaSqlRelMessageJoinFunction.java     | 118 ++++
 .../sql/translator/TranslatorTestBase.java      |  72 +++
 .../example/AppWithGlobalConfigExample.java     |  86 +++
 .../apache/samza/example/BroadcastExample.java  |  70 +++
 .../samza/example/KeyValueStoreExample.java     | 138 +++++
 .../org/apache/samza/example/MergeExample.java  |  62 ++
 .../samza/example/OrderShipmentJoinExample.java | 121 ++++
 .../samza/example/PageViewCounterExample.java   | 100 +++
 .../samza/example/RepartitionExample.java       |  96 +++
 .../org/apache/samza/example/WindowExample.java |  86 +++
 .../samza/test/framework/StreamAssert.java      |  14 +
 .../EndOfStreamIntegrationTest.java             |   8 +-
 .../WatermarkIntegrationTest.java               |   7 +-
 .../test/operator/RepartitionJoinWindowApp.java |  54 +-
 .../test/operator/RepartitionWindowApp.java     |  72 +++
 .../samza/test/operator/SessionWindowApp.java   |  21 +-
 .../operator/TestRepartitionJoinWindowApp.java  |   2 +-
 .../test/operator/TestRepartitionWindowApp.java |  90 +++
 .../samza/test/operator/TumblingWindowApp.java  |  20 +-
 .../samza/test/operator/data/PageView.java      |   3 +-
 .../test/processor/SharedContextFactories.java  | 117 ++++
 .../test/processor/TestStreamApplication.java   | 148 +++++
 .../processor/TestZkLocalApplicationRunner.java | 211 +++----
 .../apache/samza/test/table/TestLocalTable.java | 243 +++++---
 .../samza/test/table/TestRemoteTable.java       |  37 +-
 .../apache/samza/test/timer/TestTimerApp.java   |   5 +-
 143 files changed, 7227 insertions(+), 3811 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6872354..a94fcfa 100644
--- a/build.gradle
+++ b/build.gradle
@@ -325,6 +325,9 @@ project(':samza-sql') {
 
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
+    testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"
+    testCompile "org.powermock:powermock-core:$powerMockVersion"
+    testCompile "org.powermock:powermock-module-junit4:$powerMockVersion"
 
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }
@@ -756,10 +759,10 @@ project(":samza-test_$scalaVersion") {
     compile project(":samza-kv-inmemory_$scalaVersion")
     compile project(":samza-kv-rocksdb_$scalaVersion")
     compile project(":samza-core_$scalaVersion")
+    compile project(":samza-kafka_$scalaVersion")
     compile project(":samza-sql")
     runtime project(":samza-log4j")
     runtime project(":samza-yarn_$scalaVersion")
-    runtime project(":samza-kafka_$scalaVersion")
     runtime project(":samza-hdfs_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
index f615207..0b2142b 100644
--- a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
@@ -61,9 +61,10 @@ import org.apache.samza.task.TaskContext;
  *
  * <p>
  * Implementation Notes: Currently StreamApplications are wrapped in a {@link StreamTask} during execution.
- * A new StreamApplication instance will be created and initialized when planning the execution, as well as for each
- * {@link StreamTask} instance used for processing incoming messages. Execution is synchronous and thread-safe within
- * each {@link StreamTask}.
+ * A new StreamApplication instance will be created and initialized with a user-defined {@link StreamGraph}
+ * when planning the execution. The {@link StreamGraph} and the functions implemented for transforms are required to
+ * be serializable. The execution planner will generate a serialized DAG which will be deserialized in each {@link StreamTask}
+ * instance used for processing incoming messages. Execution is synchronous and thread-safe within each {@link StreamTask}.
  *
  * <p>
  * Functions implemented for transforms in StreamApplications ({@link org.apache.samza.operators.functions.MapFunction},

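The updated Javadoc says the planner produces a serialized DAG that is deserialized in each `StreamTask` instance. One consequence worth illustrating is that every deserialization yields an independent copy, so per-instance state in a user function is not shared between tasks. This is a minimal sketch assuming plain Java serialization; `SerializableMap`, `Numberer`, `serialize` and `deserialize` are hypothetical names, not Samza APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class PerTaskCopySketch {

  // Hypothetical stand-in for a Serializable user function interface such as MapFunction.
  @FunctionalInterface
  public interface SerializableMap<M, OM> extends Serializable {
    OM apply(M message);
  }

  // A stateful user function: prefixes each message with how many messages it has seen.
  public static class Numberer implements SerializableMap<String, String> {
    private static final long serialVersionUID = 1L;
    private int seen = 0;

    @Override
    public String apply(String message) {
      seen++;
      return seen + ":" + message;
    }
  }

  // "Planner" side: turn the user-defined function into bytes once.
  public static byte[] serialize(Serializable obj) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // "Task" side: every call produces a fresh, independent object graph.
  @SuppressWarnings("unchecked")
  public static <T> T deserialize(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (T) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    byte[] plan = serialize(new Numberer());
    SerializableMap<String, String> task0 = deserialize(plan);
    SerializableMap<String, String> task1 = deserialize(plan);
    System.out.println(task0.apply("a")); // 1:a
    System.out.println(task0.apply("b")); // 2:b
    System.out.println(task1.apply("c")); // 1:c (task1 does not see task0's state)
  }
}
```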
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
index 0b1ed98..5af2535 100644
--- a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
+++ b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
@@ -43,8 +43,11 @@ public class MapConfig extends Config {
 
   public MapConfig(List<Map<String, String>> maps) {
     this.map = new HashMap<>();
-    for (Map<String, String> m: maps)
-      this.map.putAll(m);
+    for (Map<String, String> m: maps) {
+      if (m != null) {
+        this.map.putAll(m);
+      }
+    }
   }
 
   public MapConfig(Map<String, String>... maps) {

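The `MapConfig(List<Map<String, String>>)` change above skips `null` entries instead of passing them to `putAll`, which throws `NullPointerException` on a null argument. The standalone sketch below reproduces only that loop so the behavior can be checked without Samza on the classpath; `NullSafeMergeSketch` and `merge` are hypothetical names, and the config keys are illustrative values.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NullSafeMergeSketch {

  // Mirrors the patched constructor loop: null maps are skipped, and for duplicate
  // keys the map that appears later in the list wins (HashMap.putAll overwrites).
  public static Map<String, String> merge(List<Map<String, String>> maps) {
    Map<String, String> merged = new HashMap<>();
    for (Map<String, String> m : maps) {
      if (m != null) {
        merged.putAll(m);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> defaults = new HashMap<>();
    defaults.put("job.name", "default-job");
    defaults.put("task.window.ms", "1000");

    Map<String, String> overrides = new HashMap<>();
    overrides.put("job.name", "my-job");

    // Without the null check, the null entry would make putAll throw NullPointerException.
    Map<String, String> merged = merge(Arrays.asList(defaults, null, overrides));
    System.out.println(merged.get("job.name"));       // my-job
    System.out.println(merged.get("task.window.ms")); // 1000
  }
}
```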
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 98f0784..7797f9a 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -21,7 +21,6 @@ package org.apache.samza.operators;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.function.Function;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.functions.FilterFunction;
@@ -237,34 +236,34 @@ public interface MessageStream<M> {
    * <p>
    * Unlike {@link #sendTo}, messages with a null key are all sent to partition 0.
    *
-   * @param keyExtractor the {@link Function} to extract the message and partition key from the input message.
+   * @param keyExtractor the {@link MapFunction} to extract the message and partition key from the input message.
    *                     Messages with a null key are all sent to partition 0.
-   * @param valueExtractor the {@link Function} to extract the value from the input message
+   * @param valueExtractor the {@link MapFunction} to extract the value from the input message
    * @param serde the {@link KVSerde} to use for (de)serializing the key and value.
    * @param id the unique id of this operator in this application
    * @param <K> the type of output key
    * @param <V> the type of output value
    * @return the repartitioned {@link MessageStream}
    */
-  <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
-      Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String id);
+  <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
+      MapFunction<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String id);
 
   /**
-   * Same as calling {@link #partitionBy(Function, Function, KVSerde, String)} with a null KVSerde.
+   * Same as calling {@link #partitionBy(MapFunction, MapFunction, KVSerde, String)} with a null KVSerde.
    * <p>
    * Uses the default serde provided via {@link StreamGraph#setDefaultSerde}, which must be a KVSerde. If the default
    * serde is not a {@link KVSerde}, a runtime exception will be thrown. If no default serde has been provided
    * <b>before</b> calling this method, a {@code KVSerde<NoOpSerde, NoOpSerde>} is used.
    *
-   * @param keyExtractor the {@link Function} to extract the message and partition key from the input message
-   * @param valueExtractor the {@link Function} to extract the value from the input message
+   * @param keyExtractor the {@link MapFunction} to extract the message and partition key from the input message
+   * @param valueExtractor the {@link MapFunction} to extract the value from the input message
    * @param id the unique id of this operator in this application
    * @param <K> the type of output key
    * @param <V> the type of output value
    * @return the repartitioned {@link MessageStream}
    */
-  <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
-      Function<? super M, ? extends V> valueExtractor, String id);
+  <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
+      MapFunction<? super M, ? extends V> valueExtractor, String id);
 
   /**
    * Sends messages in this {@link MessageStream} to a {@link Table}. The type of input message is expected

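The `partitionBy` signatures switch from `java.util.function.Function` to `MapFunction`. A likely reason, consistent with the rest of this commit, is that `Function` does not extend `Serializable`, so a lambda typed as `Function` cannot go through Java serialization, while the same lambda typed as a `Serializable` functional interface can. The sketch below demonstrates that difference; `SerializableMap` is a hypothetical stand-in for `MapFunction`, and the check assumes plain Java serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

public class KeyExtractorSerializability {

  // Hypothetical stand-in for MapFunction: same shape as Function, but Serializable.
  @FunctionalInterface
  public interface SerializableMap<M, OM> extends Serializable {
    OM apply(M message);
  }

  // Returns true if Java serialization accepts the object, false if it is rejected
  // with NotSerializableException.
  public static boolean isJavaSerializable(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // A key extractor typed as java.util.function.Function (the old partitionBy style).
    Function<String, String> plainKey = s -> s.split(",")[0];
    // The same lambda typed as a Serializable functional interface (the new style).
    SerializableMap<String, String> serializableKey = s -> s.split(",")[0];

    System.out.println(isJavaSerializable(plainKey));        // false
    System.out.println(isJavaSerializable(serializableKey)); // true
  }
}
```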
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
index ea83ba4..faf9fc5 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
@@ -33,5 +33,8 @@ import org.apache.samza.annotation.InterfaceStability;
  */
 @InterfaceStability.Unstable
 public interface ClosableFunction {
+  /**
+   * Frees any resource acquired by the operators in {@link InitableFunction}
+   */
   default void close() {}
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
index 31bbbd8..ce68e0f 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 
 
@@ -28,7 +29,7 @@ import org.apache.samza.annotation.InterfaceStability;
  */
 @InterfaceStability.Unstable
 @FunctionalInterface
-public interface FilterFunction<M> extends InitableFunction, ClosableFunction {
+public interface FilterFunction<M> extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Returns a boolean indicating whether this message should be retained or filtered out.

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
index 7e9253e..63d7061 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 
 import java.util.Collection;
@@ -31,7 +32,7 @@ import java.util.Collection;
  */
 @InterfaceStability.Unstable
 @FunctionalInterface
-public interface FlatMapFunction<M, OM>  extends InitableFunction, ClosableFunction {
+public interface FlatMapFunction<M, OM>  extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Transforms the provided message into a collection of 0 or more messages.

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
index 78250e3..d6ba205 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
@@ -19,16 +19,22 @@
 
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
+import org.apache.samza.annotation.InterfaceStability;
+
+
 /**
- * Incrementally updates the window value as messages are added to the window.
+ * Incrementally updates the aggregated value as messages are added. Main usage is in {@link org.apache.samza.operators.windows.Window} operator.
  */
-public interface FoldLeftFunction<M, WV> extends InitableFunction, ClosableFunction {
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface FoldLeftFunction<M, WV> extends InitableFunction, ClosableFunction, Serializable {
 
   /**
-   * Incrementally updates the window value as messages are added to the window.
+   * Incrementally updates the aggregated value as messages are added.
    *
-   * @param message the message being added to the window
-   * @param oldValue the previous value associated with the window
+   * @param message the message being added to the aggregated value
+   * @param oldValue the previous value
    * @return the new value
    */
   WV apply(M message, WV oldValue);

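With `@FunctionalInterface` added, a `FoldLeftFunction` can be supplied as a lambda of the form `(message, oldValue) -> newValue`. The sketch below shows that aggregation contract applied message by message, starting from an initial value. It is an illustration under assumptions: `SerializableFold` mirrors only the `apply(M message, WV oldValue)` shape shown above, and `SerializableSupplier` is a hypothetical initial-value supplier standing in for the `SupplierFunction` this commit adds, whose definition is not visible in this excerpt.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class FoldLeftSketch {

  // Hypothetical mirror of FoldLeftFunction's apply(M message, WV oldValue) shape.
  @FunctionalInterface
  public interface SerializableFold<M, WV> extends Serializable {
    WV apply(M message, WV oldValue);
  }

  // Hypothetical initial-value supplier (assumed shape, not Samza's SupplierFunction).
  @FunctionalInterface
  public interface SerializableSupplier<T> extends Serializable {
    T get();
  }

  // Folds messages into an aggregate one at a time, starting from the supplied value.
  public static <M, WV> WV foldAll(List<M> messages, SerializableSupplier<WV> initial,
      SerializableFold<M, WV> fold) {
    WV value = initial.get();
    for (M message : messages) {
      value = fold.apply(message, value);
    }
    return value;
  }

  public static void main(String[] args) {
    List<String> pageViews = Arrays.asList("home", "cart", "home");

    // Counting: the new value is the old value plus one.
    Integer count = foldAll(pageViews, () -> 0, (msg, old) -> old + 1);

    // A fold can also build a non-numeric aggregate.
    String joined = foldAll(pageViews, () -> "",
        (msg, old) -> old.isEmpty() ? msg : old + "," + msg);

    System.out.println(count);  // 3
    System.out.println(joined); // home,cart,home
  }
}
```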
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
index 954083d..94a998d 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 
 
@@ -30,7 +31,7 @@ import org.apache.samza.annotation.InterfaceStability;
  * @param <RM>  type of the joined message
  */
 @InterfaceStability.Unstable
-public interface JoinFunction<K, M, JM, RM>  extends InitableFunction, ClosableFunction {
+public interface JoinFunction<K, M, JM, RM>  extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Joins the provided messages and returns the joined message.

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
index a8c139f..fad9cf8 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 
 
@@ -29,7 +30,7 @@ import org.apache.samza.annotation.InterfaceStability;
  */
 @InterfaceStability.Unstable
 @FunctionalInterface
-public interface MapFunction<M, OM>  extends InitableFunction, ClosableFunction {
+public interface MapFunction<M, OM>  extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Transforms the provided message into another message.

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
index e290d7d..2b44125 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
@@ -30,7 +31,7 @@ import org.apache.samza.task.TaskCoordinator;
  */
 @InterfaceStability.Unstable
 @FunctionalInterface
-public interface SinkFunction<M>  extends InitableFunction, ClosableFunction {
+public interface SinkFunction<M>  extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Allows sending the provided message to an output {@link org.apache.samza.system.SystemStream} using

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
index 6afcf67..356e07f 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 
 
@@ -30,7 +31,7 @@ import org.apache.samza.annotation.InterfaceStability;
  * @param <JM> type of join results
  */
 @InterfaceStability.Unstable
-public interface StreamTableJoinFunction<K, M, R, JM> extends InitableFunction, ClosableFunction {
+public interface StreamTableJoinFunction<K, M, R, JM> extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Joins the provided messages and table record, returns the joined message.

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/SupplierFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SupplierFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SupplierFunction.java
new file mode 100644
index 0000000..155fb0e
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SupplierFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import java.io.Serializable;
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * A supplier that returns a new value on each invocation.
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface SupplierFunction<T> extends InitableFunction, ClosableFunction, Serializable {
+
+  /**
+   * Returns a value of type T.
+   *
+   * @return a value of type T
+   */
+  T get();
+}

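SupplierFunction takes the place of java.util.function.Supplier in the window APIs because a lambda targeting the plain JDK interface cannot be serialized. Java also accepts an intersection cast such as `(Supplier<T> & Serializable)`, but that cast would have to be repeated at every call site. A dedicated interface like SupplierFunction plausibly avoids that. The sketch below compares the three options; SerializableSupplier is a hypothetical stand-in for SupplierFunction without its Samza parent interfaces.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical stand-in shaped like SupplierFunction (minus InitableFunction/ClosableFunction).
@FunctionalInterface
interface SerializableSupplier<T> extends Serializable {
  T get();
}

class SupplierSerializability {
  // Returns true if Java serialization accepts the object, false if it is not serializable.
  static boolean isSerializable(Object obj) throws IOException {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    Supplier<Integer> plain = () -> 0;                                   // JDK Supplier only
    Supplier<Integer> cast = (Supplier<Integer> & Serializable) () -> 0; // intersection cast
    SerializableSupplier<Integer> dedicated = () -> 0;                   // dedicated interface
    System.out.println(isSerializable(plain));     // false
    System.out.println(isSerializable(cast));      // true
    System.out.println(isSerializable(dedicated)); // true
  }
}
```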
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
index f52b57b..6bdf406 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
@@ -18,20 +18,24 @@
 */
 package org.apache.samza.operators.triggers;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
+
 /**
  * A {@link Trigger} fires as soon as any of its individual triggers has fired.
  */
 public class AnyTrigger<M> implements Trigger<M> {
 
-  private final List<Trigger<M>> triggers;
+  private final ArrayList<Trigger<M>> triggers;
 
   AnyTrigger(List<Trigger<M>> triggers) {
-    this.triggers = triggers;
+    this.triggers = new ArrayList<>();
+    this.triggers.addAll(triggers);
   }
 
   public List<Trigger<M>> getTriggers() {
-    return triggers;
+    return Collections.unmodifiableList(triggers);
   }
 }

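AnyTrigger now copies the caller's triggers into its own ArrayList and exposes only an unmodifiable view. One likely reason is that the declared type `List` does not guarantee serializability. For example, `ArrayList.subList` views do not implement Serializable. The sketch below uses hypothetical holder classes, not Samza types, to contrast storing the caller's list with copying it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical holder that keeps the caller's List as-is (like the old AnyTrigger).
class AliasingHolder implements Serializable {
  private final List<String> items;

  AliasingHolder(List<String> items) {
    this.items = items;
  }
}

// Hypothetical holder that copies into an ArrayList and returns a read-only view (like the new AnyTrigger).
class CopyingHolder implements Serializable {
  private final ArrayList<String> items;

  CopyingHolder(List<String> items) {
    this.items = new ArrayList<>(items);
  }

  List<String> getItems() {
    return Collections.unmodifiableList(items);
  }
}

class DefensiveCopyDemo {
  // Returns true if Java serialization accepts the object graph.
  static boolean isSerializable(Object obj) throws IOException {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    List<String> all = new ArrayList<>(Arrays.asList("a", "b", "c"));
    List<String> view = all.subList(0, 2); // sub-list views do not implement Serializable
    System.out.println(isSerializable(new AliasingHolder(view))); // false
    System.out.println(isSerializable(new CopyingHolder(view)));  // true
  }
}
```

The copy also decouples the holder from later changes the caller makes to its own list.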
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
index be0a877..f224fa2 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
@@ -20,6 +20,7 @@
 package org.apache.samza.operators.triggers;
 
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 
 /**
@@ -30,6 +31,6 @@ import org.apache.samza.annotation.InterfaceStability;
  * @param <M> the type of the incoming message
  */
 @InterfaceStability.Unstable
-public interface Trigger<M> {
+public interface Trigger<M> extends Serializable {
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
index 1c0fa53..7534fca 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.windows;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.triggers.Trigger;
 
@@ -70,7 +71,7 @@ import org.apache.samza.operators.triggers.Trigger;
  * @param <WV> the type of the value in the window
  */
 @InterfaceStability.Unstable
-public interface Window<M, K, WV> {
+public interface Window<M, K, WV> extends Serializable {
 
   /**
    * Set the early triggers for this {@link Window}.

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
index 50391ff..4805a0e 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
@@ -21,6 +21,8 @@ package org.apache.samza.operators.windows;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
 import org.apache.samza.operators.triggers.TimeTrigger;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.triggers.Triggers;
@@ -30,8 +32,6 @@ import org.apache.samza.serializers.Serde;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.function.Function;
-import java.util.function.Supplier;
 
 /**
  * APIs for creating different types of {@link Window}s.
@@ -84,7 +84,7 @@ import java.util.function.Supplier;
  * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants
  * of the window types above.
  *
- * <p> The value for the window can be updated incrementally by providing an {@code initialValue} {@link Supplier}
+ * <p> The value for the window can be updated incrementally by providing an {@code initialValue} {@link SupplierFunction}
  * and an aggregating {@link FoldLeftFunction}. The initial value supplier is invoked every time a new window is
  * created. The aggregating function is invoked for each incoming message for the window. If these are not provided,
  * the emitted {@link WindowPane} will contain a collection of messages in the window.
@@ -105,8 +105,8 @@ public final class Windows {
    *
    * <pre> {@code
    *    MessageStream<UserClick> stream = ...;
-   *    Function<UserClick, String> keyFn = ...;
-   *    Supplier<Integer> initialValue = () -> 0;
+   *    MapFunction<UserClick, String> keyFn = ...;
+   *    SupplierFunction<Integer> initialValue = () -> 0;
    *    FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
    *    MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
    *        Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator));
@@ -125,16 +125,15 @@ public final class Windows {
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function.
    */
-  public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval,
-      Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator, Serde<K> keySerde,
+  public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(MapFunction<? super M, ? extends K> keyFn, Duration interval,
+      SupplierFunction<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator, Serde<K> keySerde,
       Serde<WV> windowValueSerde) {
 
     Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
-    return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
-        (Function<M, K>) keyFn, null, WindowType.TUMBLING, keySerde, windowValueSerde, null);
+    return new WindowInternal<>(defaultTrigger, (SupplierFunction<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
+        (MapFunction<M, K>) keyFn, null, WindowType.TUMBLING, keySerde, windowValueSerde, null);
   }
 
-
   /**
    * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping
    * processing time based windows using the provided keyFn.
@@ -157,12 +156,12 @@ public final class Windows {
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function
    */
-  public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M, K> keyFn, Duration interval,
+  public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(MapFunction<M, K> keyFn, Duration interval,
       Serde<K> keySerde, Serde<M> msgSerde) {
 
     Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
-    return new WindowInternal<>(defaultTrigger, null, null, keyFn, null,
-        WindowType.TUMBLING, keySerde, null, msgSerde);
+    return new WindowInternal<>(defaultTrigger, null, null, keyFn, null, WindowType.TUMBLING,
+        keySerde, null, msgSerde);
   }
 
   /**
@@ -173,7 +172,7 @@ public final class Windows {
    *
    * <pre> {@code
    *    MessageStream<String> stream = ...;
-   *    Supplier<Integer> initialValue = () -> 0;
+   *    SupplierFunction<Integer> initialValue = () -> 0;
    *    FoldLeftFunction<String, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
    *    MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window(
    *        Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
@@ -189,10 +188,10 @@ public final class Windows {
    * @param <WV> the type of the {@link WindowPane} output value
    * @return the created {@link Window} function
    */
-  public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration interval, Supplier<? extends WV> initialValue,
+  public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration interval, SupplierFunction<? extends WV> initialValue,
       FoldLeftFunction<? super M, WV> aggregator, Serde<WV> windowValueSerde) {
     Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
-    return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
+    return new WindowInternal<>(defaultTrigger, (SupplierFunction<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
         null, null, WindowType.TUMBLING, null, windowValueSerde, null);
   }
 
@@ -221,9 +220,8 @@ public final class Windows {
    */
   public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration, Serde<M> msgSerde) {
     Trigger<M> defaultTrigger = new TimeTrigger<>(duration);
-
-    return new WindowInternal<>(defaultTrigger, null, null, null,
-       null, WindowType.TUMBLING, null, null, msgSerde);
+    return new WindowInternal<>(defaultTrigger, null, null, null, null,
+        WindowType.TUMBLING, null, null, msgSerde);
   }
 
   /**
@@ -238,7 +236,7 @@ public final class Windows {
    *
    * <pre> {@code
    *    MessageStream<UserClick> stream = ...;
-   *    Supplier<Integer> initialValue = () -> 0;
+   *    SupplierFunction<Integer> initialValue = () -> 0;
    *    FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
    *    Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
    *    MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
@@ -258,12 +256,12 @@ public final class Windows {
    * @param <WV> the type of the output value in the {@link WindowPane}
    * @return the created {@link Window} function
    */
-  public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<? super M, ? extends K> keyFn,
-      Duration sessionGap, Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator,
+  public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(MapFunction<? super M, ? extends K> keyFn,
+      Duration sessionGap, SupplierFunction<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator,
       Serde<K> keySerde, Serde<WV> windowValueSerde) {
     Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
-    return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
-        (Function<M, K>) keyFn, null, WindowType.SESSION, keySerde, windowValueSerde, null);
+    return new WindowInternal<>(defaultTrigger, (SupplierFunction<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
+        (MapFunction<M, K>) keyFn, null, WindowType.SESSION, keySerde, windowValueSerde, null);
   }
 
   /**
@@ -278,7 +276,7 @@ public final class Windows {
    *
    * <pre> {@code
    *    MessageStream<UserClick> stream = ...;
-   *    Supplier<Integer> initialValue = () -> 0;
+   *    SupplierFunction<Integer> initialValue = () -> 0;
    *    FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c);
    *    Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
    *    MessageStream<WindowPane<String>, Collection<M>> windowedStream = stream.window(
@@ -294,11 +292,10 @@ public final class Windows {
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function
    */
-  public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<? super M, ? extends K> keyFn,
+  public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(MapFunction<? super M, ? extends K> keyFn,
       Duration sessionGap, Serde<K> keySerde, Serde<M> msgSerde) {
-
     Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
-    return new WindowInternal<>(defaultTrigger, null, null, (Function<M, K>) keyFn,
+    return new WindowInternal<>(defaultTrigger, null, null, (MapFunction<M, K>) keyFn,
         null, WindowType.SESSION, keySerde, null, msgSerde);
   }
 }

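The Windows Javadoc describes incremental aggregation. A key function assigns each message to a pane, the initial value supplier is invoked when a new pane is created, and the fold-left function is applied to each incoming message. The sketch below illustrates only that contract. It ignores time, triggers and serdes, and KeyFn, InitFn and FoldFn are hypothetical stand-ins for MapFunction, SupplierFunction and FoldLeftFunction. It is not how WindowOperatorImpl is implemented.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical serializable stand-ins for the three user functions a keyed window takes.
@FunctionalInterface interface KeyFn<M, K> extends Serializable { K apply(M message); }
@FunctionalInterface interface InitFn<WV> extends Serializable { WV get(); }
@FunctionalInterface interface FoldFn<M, WV> extends Serializable { WV apply(M message, WV oldValue); }

class KeyedFoldSketch {
  // Folds one batch of messages into a value per key, as if they all fell into a single window.
  static <M, K, WV> Map<K, WV> foldByKey(List<M> messages, KeyFn<M, K> keyFn, InitFn<WV> init,
      FoldFn<M, WV> fold) {
    Map<K, WV> panes = new LinkedHashMap<>();
    for (M m : messages) {
      K key = keyFn.apply(m);
      // The initial value is supplied only when a pane for this key is first created.
      WV current = panes.containsKey(key) ? panes.get(key) : init.get();
      panes.put(key, fold.apply(m, current));
    }
    return panes;
  }

  public static void main(String[] args) {
    List<String> clicks = Arrays.asList("alice:3", "bob:7", "alice:9", "bob:2");
    Map<String, Integer> maxPerUser = foldByKey(
        clicks,
        c -> c.split(":")[0],
        () -> 0,
        (c, max) -> Math.max(Integer.parseInt(c.split(":")[1]), max));
    System.out.println(maxPerUser); // {alice=9, bob=7}
  }
}
```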
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
index bc71872..ff19aba 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
@@ -18,14 +18,14 @@
  */
 package org.apache.samza.operators.windows.internal;
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
 import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.serializers.Serde;
 
-import java.util.function.Function;
-import java.util.function.Supplier;
 
 /**
  *  Internal representation of a {@link Window}. This specifies default, early and late triggers for the {@link Window}
@@ -45,7 +45,7 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> {
   /**
    * The supplier of initial value to be used for windowed aggregations
    */
-  private final Supplier<WV> initializer;
+  private final SupplierFunction<WV> initializer;
 
   /*
    * The function that is applied each time a {@link MessageEnvelope} is added to this window.
@@ -55,28 +55,32 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> {
   /*
    * The function that extracts the key from a {@link MessageEnvelope}
    */
-  private final Function<M, WK> keyExtractor;
+  private final MapFunction<M, WK> keyExtractor;
 
   /*
    * The function that extracts the event time from a {@link MessageEnvelope}
    */
-  private final Function<M, Long> eventTimeExtractor;
+  private final MapFunction<M, Long> eventTimeExtractor;
 
   /**
    * The type of this window. Tumbling and Session windows are supported for now.
    */
   private final WindowType windowType;
 
-  private final Serde<WK> keySerde;
-  private final Serde<WV> windowValSerde;
-  private final Serde<M> msgSerde;
-
   private Trigger<M> earlyTrigger;
   private Trigger<M> lateTrigger;
   private AccumulationMode mode;
 
-  public WindowInternal(Trigger<M> defaultTrigger, Supplier<WV> initializer, FoldLeftFunction<M, WV> foldLeftFunction,
-      Function<M, WK> keyExtractor, Function<M, Long> eventTimeExtractor, WindowType windowType, Serde<WK> keySerde,
+  /**
+   * The following {@link Serde}s are serialized by the ExecutionPlanner when generating the store configs, and deserialized
+   * once during startup in SamzaContainer. They don't need to be deserialized here on a per-task basis.
+   */
+  private transient final Serde<WK> keySerde;
+  private transient final Serde<WV> windowValSerde;
+  private transient final Serde<M> msgSerde;
+
+  public WindowInternal(Trigger<M> defaultTrigger, SupplierFunction<WV> initializer, FoldLeftFunction<M, WV> foldLeftFunction,
+      MapFunction<M, WK> keyExtractor, MapFunction<M, Long> eventTimeExtractor, WindowType windowType, Serde<WK> keySerde,
       Serde<WV> windowValueSerde, Serde<M> msgSerde) {
     this.defaultTrigger = defaultTrigger;
     this.initializer = initializer;
@@ -121,7 +125,7 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> {
     return lateTrigger;
   }
 
-  public Supplier<WV> getInitializer() {
+  public SupplierFunction<WV> getInitializer() {
     return initializer;
   }
 
@@ -129,11 +133,11 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> {
     return foldLeftFunction;
   }
 
-  public Function<M, WK> getKeyExtractor() {
+  public MapFunction<M, WK> getKeyExtractor() {
     return keyExtractor;
   }
 
-  public Function<M, Long> getEventTimeExtractor() {
+  public MapFunction<M, Long> getEventTimeExtractor() {
     return eventTimeExtractor;
   }
 

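In WindowInternal, keySerde, windowValSerde and msgSerde are now `transient`. Java serialization skips these fields, so they do not need to be serializable themselves, and they come back as null on a deserialized copy. The sketch below demonstrates that behavior with a hypothetical WindowLike class. It does not model Samza's own classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical class mirroring WindowInternal's split between serialized and transient fields.
class WindowLike implements Serializable {
  private final String windowType;         // written with the object
  private transient final Object keySerde; // skipped; here deliberately a non-Serializable object

  WindowLike(String windowType, Object keySerde) {
    this.windowType = windowType;
    this.keySerde = keySerde;
  }

  String getWindowType() { return windowType; }
  Object getKeySerde() { return keySerde; }
}

class TransientFieldDemo {
  // Writes the object with Java serialization and reads it back as a new instance.
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    WindowLike copy = roundTrip(new WindowLike("TUMBLING", new Object()));
    System.out.println(copy.getWindowType()); // TUMBLING
    System.out.println(copy.getKeySerde());   // null
  }
}
```

Serialization succeeds even though the serde object is not Serializable, which is consistent with the comment that the serdes reach the container through the generated configs instead.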
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java
index d70746c..d49518c 100644
--- a/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java
+++ b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java
@@ -68,7 +68,7 @@ public class SerializableSerde<T extends Serializable> implements Serde<T> {
         ois = new ObjectInputStream(bis);
         return (T) ois.readObject();
       } catch (IOException | ClassNotFoundException e) {
-        throw new SamzaException("Error reading from input stream.");
+        throw new SamzaException("Error reading from input stream.", e);
       } finally {
         try {
           if (ois != null) {

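The SerializableSerde fix passes the caught exception as the cause of the SamzaException, so the underlying IOException or ClassNotFoundException is no longer lost. The sketch below follows the same pattern. It assumes a plain RuntimeException in place of SamzaException, and fromBytes here is a standalone illustration, not the Samza method.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

class CauseChainingDemo {
  // Illustrative deserializer; RuntimeException stands in for SamzaException.
  static Object fromBytes(byte[] bytes) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      // Passing e as the cause keeps the original message and stack trace.
      throw new RuntimeException("Error reading from input stream.", e);
    }
  }

  public static void main(String[] args) {
    try {
      fromBytes(new byte[] {1, 2, 3, 4}); // not a valid Java serialization stream header
    } catch (RuntimeException e) {
      System.out.println(e.getCause() instanceof IOException); // true
      System.out.println(e.getCause().getMessage());
    }
  }
}
```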
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index ce67d8d..cd86426 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.system;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -33,7 +34,7 @@ import java.util.Map;
  *
  * It is immutable by design.
  */
-public class StreamSpec {
+public class StreamSpec implements Serializable {
 
   private static final int DEFAULT_PARTITION_COUNT = 1;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
index 95cc266..e9ca9f7 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
@@ -60,7 +60,7 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
   public Partition getPartition() {
     return partition;
   }
-  
+
   public SystemStream getSystemStream() {
     return new SystemStream(system, stream);
   }
@@ -69,7 +69,7 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
   public int hashCode() {
     return hash;
   }
-  
+
   private int computeHashCode() {
     final int prime = 31;
     int result = super.hashCode();

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
index 68043f9..ba57c2f 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.table;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -41,12 +42,17 @@ import org.apache.samza.serializers.KVSerde;
  * It is immutable by design.
  */
 @InterfaceStability.Unstable
-public class TableSpec {
+public class TableSpec implements Serializable {
 
   private final String id;
-  private final KVSerde serde;
   private final String tableProviderFactoryClassName;
-  private final Map<String, String> config = new HashMap<>();
+
+  /**
+   * The following fields are serialized by the ExecutionPlanner when generating the configs for a table, and deserialized
+   * once during startup in SamzaContainer. They don't need to be deserialized here on a per-task basis.
+   */
+  private transient final KVSerde serde;
+  private transient final Map<String, String> config = new HashMap<>();
 
   /**
    * Default constructor

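TableSpec now declares `config` as `transient final` with a field initializer. In standard Java serialization, the constructors and field initializers of a Serializable class do not run during deserialization. A transient field therefore comes back as null, not as an empty map. This is consistent with the comment that these fields are consumed from the generated configs, but any code that reads them from a deserialized instance would see null. The sketch below demonstrates the behavior with a hypothetical SpecLike class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical class mirroring TableSpec's transient final map with an initializer.
class SpecLike implements Serializable {
  private final String id;
  private transient final Map<String, String> config = new HashMap<>();

  SpecLike(String id, Map<String, String> config) {
    this.id = id;
    this.config.putAll(config);
  }

  String getId() { return id; }
  Map<String, String> getConfig() { return config; }
}

class TransientInitializerDemo {
  // Writes the object with Java serialization and reads it back as a new instance.
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    SpecLike copy = roundTrip(new SpecLike("table-1", Collections.singletonMap("k", "v")));
    System.out.println(copy.getId());     // table-1
    System.out.println(copy.getConfig()); // null: the field initializer is not re-run
  }
}
```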
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
index 4184c9d..19cce6f 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
@@ -27,7 +27,7 @@ import static org.junit.Assert.assertEquals;
 public class TestWindowPane {
   @Test
   public void testConstructor() {
-    WindowPane<String, Integer> wndOutput = new WindowPane(new WindowKey("testMsg", null), 10, AccumulationMode.DISCARDING, FiringType.EARLY);
+    WindowPane<String, Integer> wndOutput = new WindowPane<>(new WindowKey<>("testMsg", null), 10, AccumulationMode.DISCARDING, FiringType.EARLY);
     assertEquals(wndOutput.getKey().getKey(), "testMsg");
     assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index e2c122a..9d8bd5f 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -34,7 +34,7 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.system.StreamSpec;
@@ -61,18 +61,18 @@ public class ExecutionPlanner {
     this.streamManager = streamManager;
   }
 
-  public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
+  public ExecutionPlan plan(OperatorSpecGraph specGraph) throws Exception {
     validateConfig();
 
     // create physical job graph based on stream graph
-    JobGraph jobGraph = createJobGraph(streamGraph);
+    JobGraph jobGraph = createJobGraph(specGraph);
 
     // fetch the external streams partition info
     updateExistingPartitions(jobGraph, streamManager);
 
     if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
       // figure out the partitions for internal streams
-      calculatePartitions(streamGraph, jobGraph);
+      calculatePartitions(jobGraph);
     }
 
     return jobGraph;
@@ -91,12 +91,12 @@ public class ExecutionPlanner {
   /**
    * Create the physical graph from StreamGraph
    */
-  /* package private */ JobGraph createJobGraph(StreamGraphImpl streamGraph) {
-    JobGraph jobGraph = new JobGraph(config);
-    Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputOperators().keySet());
-    Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutputStreams().keySet());
+  /* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) {
+    JobGraph jobGraph = new JobGraph(config, specGraph);
+    Set<StreamSpec> sourceStreams = new HashSet<>(specGraph.getInputOperators().keySet());
+    Set<StreamSpec> sinkStreams = new HashSet<>(specGraph.getOutputStreams().keySet());
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
-    Set<TableSpec> tables = new HashSet<>(streamGraph.getTables().keySet());
+    Set<TableSpec> tables = new HashSet<>(specGraph.getTables().keySet());
     intStreams.retainAll(sinkStreams);
     sourceStreams.removeAll(intStreams);
     sinkStreams.removeAll(intStreams);
@@ -104,7 +104,7 @@ public class ExecutionPlanner {
     // For this phase, we have a single job node for the whole dag
     String jobName = config.get(JobConfig.JOB_NAME());
     String jobId = config.get(JobConfig.JOB_ID(), "1");
-    JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId, streamGraph);
+    JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId);
 
     // add sources
     sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));
@@ -126,9 +126,9 @@ public class ExecutionPlanner {
   /**
    * Figure out the number of partitions of all streams
    */
-  /* package private */ void calculatePartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) {
+  /* package private */ void calculatePartitions(JobGraph jobGraph) {
     // calculate the partitions for the input streams of join operators
-    calculateJoinInputPartitions(streamGraph, jobGraph);
+    calculateJoinInputPartitions(jobGraph);
 
     // calculate the partitions for the rest of intermediate streams
     calculateIntStreamPartitions(jobGraph, config);
@@ -172,7 +172,7 @@ public class ExecutionPlanner {
   /**
    * Calculate the partitions for the input streams of join operators
    */
-  /* package private */ static void calculateJoinInputPartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) {
+  /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph) {
     // mapping from a source stream to all join specs reachable from it
     Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
     // reverse mapping of the above
@@ -182,7 +182,7 @@ public class ExecutionPlanner {
     // The visited set keeps track of the join specs that have been already inserted in the queue before
     Set<OperatorSpec> visited = new HashSet<>();
 
-    streamGraph.getInputOperators().entrySet().forEach(entry -> {
+    jobGraph.getSpecGraph().getInputOperators().entrySet().forEach(entry -> {
         StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(entry.getKey());
         // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
         findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,

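After this change, createJobGraph reads the input and output streams from the OperatorSpecGraph, but the way it classifies them is unchanged. Intermediate streams are those that are both read and written, and they are removed from the source and sink sets. The sketch below reproduces that set arithmetic. For brevity it assumes plain String ids in place of StreamSpec, and StreamClassifier is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class StreamClassifier {
  // Mirrors the retainAll/removeAll steps in ExecutionPlanner#createJobGraph.
  static Map<String, Set<String>> classify(Set<String> inputIds, Set<String> outputIds) {
    Set<String> sources = new HashSet<>(inputIds);
    Set<String> sinks = new HashSet<>(outputIds);
    Set<String> intermediates = new HashSet<>(sources);
    intermediates.retainAll(sinks);   // streams the DAG both reads and writes
    sources.removeAll(intermediates); // pure inputs
    sinks.removeAll(intermediates);   // pure outputs

    Map<String, Set<String>> result = new HashMap<>();
    result.put("sources", new TreeSet<>(sources)); // TreeSet for deterministic printing
    result.put("intermediates", new TreeSet<>(intermediates));
    result.put("sinks", new TreeSet<>(sinks));
    return result;
  }

  public static void main(String[] args) {
    Set<String> inputs = new HashSet<>(Arrays.asList("page-views", "repartitioned"));
    Set<String> outputs = new HashSet<>(Arrays.asList("repartitioned", "counts"));
    Map<String, Set<String>> c = classify(inputs, outputs);
    System.out.println(c.get("sources"));       // [page-views]
    System.out.println(c.get("intermediates")); // [repartitioned]
    System.out.println(c.get("sinks"));         // [counts]
  }
}
```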
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index abd3ce7..843db85 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -34,7 +34,7 @@ import java.util.stream.Collectors;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
@@ -60,13 +60,15 @@ import org.slf4j.LoggerFactory;
   private final Set<TableSpec> tables = new HashSet<>();
   private final Config config;
   private final JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator();
+  private final OperatorSpecGraph specGraph;
 
   /**
    * The JobGraph is only constructed by the {@link ExecutionPlanner}.
    * @param config Config
    */
-  JobGraph(Config config) {
+  JobGraph(Config config, OperatorSpecGraph specGraph) {
     this.config = config;
+    this.specGraph = specGraph;
   }
 
   @Override
@@ -107,6 +109,10 @@ import org.slf4j.LoggerFactory;
     return new ApplicationConfig(config);
   }
 
+  public OperatorSpecGraph getSpecGraph() {
+    return specGraph;
+  }
+
   /**
    * Add a source stream to a {@link JobNode}
    * @param input source stream
@@ -152,11 +158,11 @@ import org.slf4j.LoggerFactory;
    * @param jobId id of the job
    * @return
    */
-  JobNode getOrCreateJobNode(String jobName, String jobId, StreamGraphImpl streamGraph) {
+  JobNode getOrCreateJobNode(String jobName, String jobId) {
     String nodeId = JobNode.createId(jobName, jobId);
     JobNode node = nodes.get(nodeId);
     if (node == null) {
-      node = new JobNode(jobName, jobId, streamGraph, config);
+      node = new JobNode(jobName, jobId, specGraph, config);
       nodes.put(nodeId, node);
     }
     return node;

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 48d2219..298042b 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -170,7 +170,7 @@ import org.codehaus.jackson.map.ObjectMapper;
   private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
     OperatorGraphJson opGraph = new OperatorGraphJson();
     opGraph.inputStreams = new ArrayList<>();
-    jobNode.getStreamGraph().getInputOperators().forEach((streamSpec, operatorSpec) -> {
+    jobNode.getSpecGraph().getInputOperators().forEach((streamSpec, operatorSpec) -> {
         StreamJson inputJson = new StreamJson();
         opGraph.inputStreams.add(inputJson);
         inputJson.streamId = streamSpec.getId();
@@ -181,7 +181,7 @@ import org.codehaus.jackson.map.ObjectMapper;
       });
 
     opGraph.outputStreams = new ArrayList<>();
-    jobNode.getStreamGraph().getOutputStreams().keySet().forEach(streamSpec -> {
+    jobNode.getSpecGraph().getOutputStreams().keySet().forEach(streamSpec -> {
         StreamJson outputJson = new StreamJson();
         outputJson.streamId = streamSpec.getId();
         opGraph.outputStreams.add(outputJson);

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 8abd463..db44d9f 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -39,7 +39,7 @@ import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.config.TaskConfigJava;
-import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -73,22 +73,22 @@ public class JobNode {
   private final String jobName;
   private final String jobId;
   private final String id;
-  private final StreamGraphImpl streamGraph;
+  private final OperatorSpecGraph specGraph;
   private final List<StreamEdge> inEdges = new ArrayList<>();
   private final List<StreamEdge> outEdges = new ArrayList<>();
   private final List<TableSpec> tables = new ArrayList<>();
   private final Config config;
 
-  JobNode(String jobName, String jobId, StreamGraphImpl streamGraph, Config config) {
+  JobNode(String jobName, String jobId, OperatorSpecGraph specGraph, Config config) {
     this.jobName = jobName;
     this.jobId = jobId;
     this.id = createId(jobName, jobId);
-    this.streamGraph = streamGraph;
+    this.specGraph = specGraph;
     this.config = config;
   }
 
-  public StreamGraphImpl getStreamGraph() {
-    return streamGraph;
+  public OperatorSpecGraph getSpecGraph() {
+    return this.specGraph;
   }
 
   public  String getId() {
@@ -154,7 +154,7 @@ public class JobNode {
     }
 
     // set triggering interval if a window or join is defined
-    if (streamGraph.hasWindowOrJoins()) {
+    if (specGraph.hasWindowOrJoins()) {
       if ("-1".equals(config.get(TaskConfig.WINDOW_MS(), "-1"))) {
         long triggerInterval = computeTriggerInterval();
         log.info("Using triggering interval: {} for jobName: {}", triggerInterval, jobName);
@@ -163,7 +163,7 @@ public class JobNode {
       }
     }
 
-    streamGraph.getAllOperatorSpecs().forEach(opSpec -> {
+    specGraph.getAllOperatorSpecs().forEach(opSpec -> {
         if (opSpec instanceof StatefulOperatorSpec) {
           ((StatefulOperatorSpec) opSpec).getStoreDescriptors()
               .forEach(sd -> configs.putAll(sd.getStorageConfigs()));
@@ -228,14 +228,14 @@ public class JobNode {
     // collect all key and msg serde instances for streams
     Map<String, Serde> streamKeySerdes = new HashMap<>();
     Map<String, Serde> streamMsgSerdes = new HashMap<>();
-    Map<StreamSpec, InputOperatorSpec> inputOperators = streamGraph.getInputOperators();
+    Map<StreamSpec, InputOperatorSpec> inputOperators = specGraph.getInputOperators();
     inEdges.forEach(edge -> {
         String streamId = edge.getStreamSpec().getId();
         InputOperatorSpec inputOperatorSpec = inputOperators.get(edge.getStreamSpec());
         streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde());
         streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde());
       });
-    Map<StreamSpec, OutputStreamImpl> outputStreams = streamGraph.getOutputStreams();
+    Map<StreamSpec, OutputStreamImpl> outputStreams = specGraph.getOutputStreams();
     outEdges.forEach(edge -> {
         String streamId = edge.getStreamSpec().getId();
         OutputStreamImpl outputStream = outputStreams.get(edge.getStreamSpec());
@@ -246,7 +246,7 @@ public class JobNode {
     // collect all key and msg serde instances for stores
     Map<String, Serde> storeKeySerdes = new HashMap<>();
     Map<String, Serde> storeMsgSerdes = new HashMap<>();
-    streamGraph.getAllOperatorSpecs().forEach(opSpec -> {
+    specGraph.getAllOperatorSpecs().forEach(opSpec -> {
         if (opSpec instanceof StatefulOperatorSpec) {
           ((StatefulOperatorSpec) opSpec).getStoreDescriptors().forEach(storeDescriptor -> {
               storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde());
@@ -320,8 +320,8 @@ public class JobNode {
    * Computes the triggering interval to use during the execution of this {@link JobNode}
    */
   private long computeTriggerInterval() {
-    // Obtain the operator specs from the streamGraph
-    Collection<OperatorSpec> operatorSpecs = streamGraph.getAllOperatorSpecs();
+    // Obtain the operator specs from the specGraph
+    Collection<OperatorSpec> operatorSpecs = specGraph.getAllOperatorSpecs();
 
     // Filter out window operators, and obtain a list of their triggering interval values
     List<Long> windowTimerIntervals = operatorSpecs.stream()
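With this change JobNode no longer walks the operator graph itself. It reads getAllOperatorSpecs() and hasWindowOrJoins() from the shared OperatorSpecGraph, which computes both once in its constructor with a depth-first walk from the input operators. The sketch below shows that precomputation. It uses a simplified, hypothetical Node/OpCode model in place of Samza's OperatorSpec and is not the Samza implementation. There is one deliberate difference. doGetOperatorSpecs in the diff recurses into every registered spec, even one it has already collected. This sketch stops at nodes it has already seen, so an operator reachable through several paths (for example a join or merge) is expanded only once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class SpecGraphTraversal {
  // Hypothetical stand-in for OperatorSpec.OpCode; only the values needed here.
  enum OpCode { INPUT, MAP, FILTER, MERGE, JOIN, WINDOW, SEND_TO }

  // Hypothetical stand-in for OperatorSpec: an op code plus its downstream specs.
  static final class Node {
    final String id;
    final OpCode opCode;
    final List<Node> next = new ArrayList<>();

    Node(String id, OpCode opCode) {
      this.id = id;
      this.opCode = opCode;
    }

    // Registers n as a downstream node and returns it, so calls can be chained.
    Node then(Node n) {
      next.add(n);
      return n;
    }
  }

  // Collects every node reachable from the inputs, expanding each node once.
  static Set<Node> findAll(List<Node> inputs) {
    Set<Node> seen = new LinkedHashSet<>();
    for (Node input : inputs) {
      visit(input, seen);
    }
    return Collections.unmodifiableSet(seen);
  }

  private static void visit(Node node, Set<Node> seen) {
    if (!seen.add(node)) {
      return; // already expanded via another path (e.g. the other side of a join)
    }
    for (Node n : node.next) {
      visit(n, seen);
    }
  }

  // Same predicate as checkWindowOrJoins(), expressed with anyMatch.
  static boolean hasWindowOrJoins(Set<Node> all) {
    return all.stream().anyMatch(n -> n.opCode == OpCode.WINDOW || n.opCode == OpCode.JOIN);
  }

  public static void main(String[] args) {
    Node in1 = new Node("in1", OpCode.INPUT);
    Node in2 = new Node("in2", OpCode.INPUT);
    Node join = new Node("join", OpCode.JOIN);
    in1.then(new Node("map", OpCode.MAP)).then(join);
    in2.then(join);
    join.then(new Node("out", OpCode.SEND_TO));

    Set<Node> all = findAll(Arrays.asList(in1, in2));
    System.out.println(all.size());            // 5
    System.out.println(hasWindowOrJoins(all)); // true
  }
}
```

Computing both values once and storing them in final fields also lets the deserialized per-task copies reuse them without walking the graph again.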

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 1681f30..6922c76 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -21,7 +21,6 @@ package org.apache.samza.operators;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.function.Function;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.operators.functions.FilterFunction;
@@ -64,16 +63,16 @@ import org.apache.samza.table.TableSpec;
  */
 public class MessageStreamImpl<M> implements MessageStream<M> {
   /**
-   * The {@link StreamGraphImpl} that contains this {@link MessageStreamImpl}
+   * The {@link StreamGraphSpec} that contains this {@link MessageStreamImpl}
    */
-  private final StreamGraphImpl graph;
+  private final StreamGraphSpec graph;
 
   /**
    * The {@link OperatorSpec} associated with this {@link MessageStreamImpl}
    */
   private final OperatorSpec operatorSpec;
 
-  public MessageStreamImpl(StreamGraphImpl graph, OperatorSpec<?, M> operatorSpec) {
+  public MessageStreamImpl(StreamGraphSpec graph, OperatorSpec<?, M> operatorSpec) {
     this.graph = graph;
     this.operatorSpec = operatorSpec;
   }
@@ -81,7 +80,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   @Override
   public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) {
     String opId = this.graph.getNextOpId(OpCode.MAP);
-    OperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, opId);
+    StreamOperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
     return new MessageStreamImpl<>(this.graph, op);
   }
@@ -89,7 +88,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   @Override
   public MessageStream<M> filter(FilterFunction<? super M> filterFn) {
     String opId = this.graph.getNextOpId(OpCode.FILTER);
-    OperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, opId);
+    StreamOperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
     return new MessageStreamImpl<>(this.graph, op);
   }
@@ -97,7 +96,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   @Override
   public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) {
     String opId = this.graph.getNextOpId(OpCode.FLAT_MAP);
-    OperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, opId);
+    StreamOperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
     return new MessageStreamImpl<>(this.graph, op);
   }
@@ -112,15 +111,15 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   @Override
   public void sendTo(OutputStream<M> outputStream) {
     String opId = this.graph.getNextOpId(OpCode.SEND_TO);
-    OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec((OutputStreamImpl<M>) outputStream, opId);
+    OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
+        (OutputStreamImpl<M>) outputStream, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
   }
 
   @Override
   public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String userDefinedId) {
     String opId = this.graph.getNextOpId(OpCode.WINDOW, userDefinedId);
-    OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec(
-        (WindowInternal<M, K, WV>) window, opId);
+    OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
     return new MessageStreamImpl<>(this.graph, op);
   }
@@ -131,24 +130,24 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
       Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
       Duration ttl, String userDefinedId) {
     if (otherStream.equals(this)) throw new SamzaException("Cannot join a MessageStream with itself.");
-    OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
     String opId = this.graph.getNextOpId(OpCode.JOIN, userDefinedId);
-    JoinOperatorSpec<K, M, OM, JM> joinOpSpec =
-        OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn,
-            keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), opId);
-
-    this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
-    otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec);
+    OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
+    JoinOperatorSpec<K, M, OM, JM> op =
+        OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn, keySerde,
+            messageSerde, otherMessageSerde, ttl.toMillis(), opId);
+    this.operatorSpec.registerNextOperatorSpec(op);
+    otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) op);
 
-    return new MessageStreamImpl<>(this.graph, joinOpSpec);
+    return new MessageStreamImpl<>(this.graph, op);
   }
 
   @Override
   public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
       StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn) {
+    String opId = this.graph.getNextOpId(OpCode.JOIN);
     TableSpec tableSpec = ((TableImpl) table).getTableSpec();
     StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec = OperatorSpecs.createStreamTableJoinOperatorSpec(
-        tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, this.graph.getNextOpId(OpCode.JOIN));
+        tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, opId);
     this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
     return new MessageStreamImpl<>(this.graph, joinOpSpec);
   }
@@ -157,46 +156,38 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
     if (otherStreams.isEmpty()) return this;
     String opId = this.graph.getNextOpId(OpCode.MERGE);
-    StreamOperatorSpec<M, M> opSpec = OperatorSpecs.createMergeOperatorSpec(opId);
-    this.operatorSpec.registerNextOperatorSpec(opSpec);
-    otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(opSpec));
-    return new MessageStreamImpl<>(this.graph, opSpec);
+    StreamOperatorSpec<M, M> op = OperatorSpecs.createMergeOperatorSpec(opId);
+    this.operatorSpec.registerNextOperatorSpec(op);
+    otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(op));
+    return new MessageStreamImpl<>(this.graph, op);
   }
 
   @Override
-  public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
-      Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) {
+  public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
+      MapFunction<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) {
     String opId = this.graph.getNextOpId(OpCode.PARTITION_BY, userDefinedId);
     IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opId, serde);
     if (!intermediateStream.isKeyed()) {
       // this can only happen when the default serde partitionBy variant is being used
       throw new SamzaException("partitionBy can not be used with a default serde that is not a KVSerde.");
     }
-    PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec =
-        OperatorSpecs.createPartitionByOperatorSpec(
-            intermediateStream.getOutputStream(), keyExtractor, valueExtractor, opId);
+    PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec = OperatorSpecs.createPartitionByOperatorSpec(
+        intermediateStream.getOutputStream(), keyExtractor, valueExtractor, opId);
     this.operatorSpec.registerNextOperatorSpec(partitionByOperatorSpec);
     return intermediateStream;
   }
 
   @Override
-  public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
-      Function<? super M, ? extends V> valueExtractor, String userDefinedId) {
+  public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
+      MapFunction<? super M, ? extends V> valueExtractor, String userDefinedId) {
     return partitionBy(keyExtractor, valueExtractor, null, userDefinedId);
   }
 
-  /**
-   * Get the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
-   * @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
-   */
-  protected OperatorSpec<?, M> getOperatorSpec() {
-    return this.operatorSpec;
-  }
-
   @Override
   public <K, V> void sendTo(Table<KV<K, V>> table) {
-    SendToTableOperatorSpec<K, V> op = OperatorSpecs.createSendToTableOperatorSpec(
-        this.operatorSpec, ((TableImpl) table).getTableSpec(), this.graph.getNextOpId(OpCode.SEND_TO));
+    String opId = this.graph.getNextOpId(OpCode.SEND_TO);
+    SendToTableOperatorSpec<K, V> op =
+        OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableSpec(), opId);
     this.operatorSpec.registerNextOperatorSpec(op);
   }
 
@@ -215,4 +206,12 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
     return broadcast(null, userDefinedId);
   }
 
+  /**
+   * Get the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
+   * @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
+   */
+  protected OperatorSpec<?, M> getOperatorSpec() {
+    return this.operatorSpec;
+  }
+
 }
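The partitionBy signatures above change the key and value extractors from java.util.function.Function to MapFunction. The likely reason is that a Java lambda or method reference can only be serialized when its target interface extends Serializable, and this commit needs the whole user-defined DAG to be serializable. The sketch below demonstrates that JVM behavior. It uses a hypothetical SerializableMapFn interface as a stand-in for MapFunction and does not touch any Samza classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaSerializationDemo {
  // Hypothetical stand-in for a serializable user-function interface such as MapFunction.
  @FunctionalInterface
  interface SerializableMapFn<M, OM> extends Serializable {
    OM apply(M message);
  }

  static byte[] toBytes(Object o) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(o);
    }
    return bos.toByteArray();
  }

  @SuppressWarnings("unchecked")
  static <T> T fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (T) ois.readObject();
    }
  }

  // Returns true if the object can be written with Java serialization.
  static boolean isSerializable(Object o) {
    try {
      toBytes(o);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) throws Exception {
    Function<String, Integer> plain = String::length;              // target type is not Serializable
    SerializableMapFn<String, Integer> keyExtractor = String::length; // target type is Serializable

    System.out.println(isSerializable(plain));        // false
    System.out.println(isSerializable(keyExtractor)); // true

    // Round-trip the serializable function and use the copy.
    SerializableMapFn<String, Integer> copy = fromBytes(toBytes(keyExtractor));
    System.out.println(copy.apply("samza"));          // 5
  }
}
```

The same method reference fails through the plain Function type but round-trips through the Serializable one. For the same reason, any state a user function captures must itself be serializable.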

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
new file mode 100644
index 0000000..ba51c7c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Defines the serialized format of {@link StreamGraphSpec}. This class encapsulates all getter methods to get the {@link OperatorSpec}
+ * initialized in the {@link StreamGraphSpec} and constructs the corresponding serialized instances of {@link OperatorSpec}.
+ * The {@link StreamGraphSpec} and {@link OperatorSpec} instances included in this class are considered immutable and read-only.
+ * The instance of {@link OperatorSpecGraph} should only be used at runtime to construct {@link org.apache.samza.task.StreamOperatorTask}.
+ */
+public class OperatorSpecGraph implements Serializable {
+  // We use a LinkedHashMap (LHM) for deterministic order in initializing and closing operators.
+  private final Map<StreamSpec, InputOperatorSpec> inputOperators;
+  private final Map<StreamSpec, OutputStreamImpl> outputStreams;
+  private final Map<TableSpec, TableImpl> tables;
+  private final Set<OperatorSpec> allOpSpecs;
+  private final boolean hasWindowOrJoins;
+
+  // The following objects are transient since they are recreatable.
+  private transient final SerializableSerde<OperatorSpecGraph> opSpecGraphSerde = new SerializableSerde<>();
+  private transient final byte[] serializedOpSpecGraph;
+
+  OperatorSpecGraph(StreamGraphSpec graphSpec) {
+    this.inputOperators = graphSpec.getInputOperators();
+    this.outputStreams = graphSpec.getOutputStreams();
+    this.tables = graphSpec.getTables();
+    this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs());
+    this.hasWindowOrJoins = checkWindowOrJoins();
+    this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this);
+  }
+
+  public Map<StreamSpec, InputOperatorSpec> getInputOperators() {
+    return inputOperators;
+  }
+
+  public Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
+    return outputStreams;
+  }
+
+  public Map<TableSpec, TableImpl> getTables() {
+    return tables;
+  }
+
+  /**
+   * Get all {@link OperatorSpec}s available in this {@link OperatorSpecGraph}.
+   *
+   * @return all available {@link OperatorSpec}s
+   */
+  public Collection<OperatorSpec> getAllOperatorSpecs() {
+    return allOpSpecs;
+  }
+
+  /**
+   * Returns <tt>true</tt> iff this {@link OperatorSpecGraph} contains a join or a window operator.
+   *
+   * @return  <tt>true</tt> iff this {@link OperatorSpecGraph} contains a join or a window operator
+   */
+  public boolean hasWindowOrJoins() {
+    return hasWindowOrJoins;
+  }
+
+  /**
+   * Returns a deserialized {@link OperatorSpecGraph} as a copy of this instance of {@link OperatorSpecGraph}.
+   * This is used to create per-task instances of {@link OperatorSpecGraph} when instantiating task instances.
+   *
+   * @return a copy of this {@link OperatorSpecGraph} object via deserialization
+   */
+  public OperatorSpecGraph clone() {
+    if (opSpecGraphSerde == null) {
+      throw new IllegalStateException("Cannot clone from an already deserialized OperatorSpecGraph.");
+    }
+    return opSpecGraphSerde.fromBytes(serializedOpSpecGraph);
+  }
+
+  private HashSet<OperatorSpec> findAllOperatorSpecs() {
+    Collection<InputOperatorSpec> inputOperatorSpecs = this.inputOperators.values();
+    HashSet<OperatorSpec> operatorSpecs = new HashSet<>();
+    for (InputOperatorSpec inputOperatorSpec : inputOperatorSpecs) {
+      operatorSpecs.add(inputOperatorSpec);
+      doGetOperatorSpecs(inputOperatorSpec, operatorSpecs);
+    }
+    return operatorSpecs;
+  }
+
+  private void doGetOperatorSpecs(OperatorSpec operatorSpec, Set<OperatorSpec> specs) {
+    Collection<OperatorSpec> registeredOperatorSpecs = operatorSpec.getRegisteredOperatorSpecs();
+    for (OperatorSpec registeredOperatorSpec : registeredOperatorSpecs) {
+      specs.add(registeredOperatorSpec);
+      doGetOperatorSpecs(registeredOperatorSpec, specs);
+    }
+  }
+
+  private boolean checkWindowOrJoins() {
+    Set<OperatorSpec> windowOrJoinSpecs = allOpSpecs.stream()
+        .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW || spec.getOpCode() == OperatorSpec.OpCode.JOIN)
+        .collect(Collectors.toSet());
+
+    return windowOrJoinSpecs.size() != 0;
+  }
+
+}
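OperatorSpecGraph.clone() follows a serialize-once, deserialize-many pattern. The constructor stores the object's own serialized bytes in a transient field, and each clone() call deserializes a fresh copy, one per task. Java deserialization does not run the constructor or field initializers of a Serializable class. As a result, both transient fields are null in every copy, and the IllegalStateException guard relies on exactly that. The sketch below reproduces the pattern in self-contained form. It uses a hypothetical SpecSnapshot class, with plain ObjectOutputStream/ObjectInputStream standing in for SerializableSerde.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CloneViaSerialization {
  // Hypothetical, simplified analogue of OperatorSpecGraph: immutable state plus a
  // transient snapshot of its own serialized form, taken once in the constructor.
  static final class SpecSnapshot implements Serializable {
    private static final long serialVersionUID = 1L;

    private final List<String> opIds;
    // Transient: never written out, so it is null in every deserialized copy.
    private final transient byte[] serialized;

    SpecSnapshot(List<String> opIds) {
      this.opIds = Collections.unmodifiableList(new ArrayList<>(opIds));
      this.serialized = toBytes(this); // taken after all non-transient state is set
    }

    List<String> getOpIds() {
      return opIds;
    }

    // Each call returns a fresh, independent copy (e.g. one per task).
    SpecSnapshot copy() {
      if (serialized == null) {
        throw new IllegalStateException("Cannot copy from an already deserialized instance.");
      }
      return fromBytes(serialized);
    }
  }

  static byte[] toBytes(Object o) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(o);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bos.toByteArray();
  }

  @SuppressWarnings("unchecked")
  static <T> T fromBytes(byte[] bytes) {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (T) ois.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    SpecSnapshot original = new SpecSnapshot(Arrays.asList("input-1", "map-2", "send_to-3"));
    SpecSnapshot taskCopy1 = original.copy();
    SpecSnapshot taskCopy2 = original.copy();

    System.out.println(taskCopy1 != taskCopy2);                           // true
    System.out.println(taskCopy1.getOpIds().equals(original.getOpIds())); // true
    try {
      taskCopy1.copy();
    } catch (IllegalStateException e) {
      System.out.println("copy of copy rejected");
    }
  }
}
```

Because the bytes are captured at construction time, the snapshot reflects only the state that was set before toBytes(this) runs. That is why the call comes last in the constructor, both here and in the diff.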