You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/09 10:22:47 UTC

[4/4] samza git commit: SAMZA-1073: top-level fluent API `

SAMZA-1073: top-level fluent API
`


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/373048aa
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/373048aa
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/373048aa

Branch: refs/heads/samza-fluent-api-v1
Commit: 373048aa0a68221af5f6b5589bbe161c972b11a9
Parents: 38b1dc3
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Thu Feb 9 01:56:10 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu Feb 9 01:57:05 2017 -0800

----------------------------------------------------------------------
 .../apache/samza/operators/ContextManager.java  |  47 ++++
 .../apache/samza/operators/MessageStream.java   |  90 ++++---
 .../apache/samza/operators/OutputStream.java    |  41 +++
 .../org/apache/samza/operators/StreamGraph.java | 118 +++++++++
 .../samza/operators/StreamGraphFactory.java     |  38 +++
 .../samza/operators/StreamOperatorTask.java     |  51 ----
 .../org/apache/samza/operators/StreamSpec.java  |  46 ++++
 .../data/IncomingSystemMessageEnvelope.java     |  63 -----
 .../operators/data/InputMessageEnvelope.java    |  63 +++++
 .../samza/operators/data/MessageEnvelope.java   |   2 +-
 .../operators/functions/FilterFunction.java     |  23 +-
 .../operators/functions/FlatMapFunction.java    |  25 +-
 .../samza/operators/functions/InitFunction.java |  38 +++
 .../samza/operators/functions/JoinFunction.java |  48 +++-
 .../functions/KeyValueJoinFunction.java         |  44 ++++
 .../samza/operators/functions/MapFunction.java  |  25 +-
 .../samza/operators/functions/SinkFunction.java |  23 +-
 .../samza/operators/triggers/AnyTrigger.java    |   3 +-
 .../samza/operators/triggers/CountTrigger.java  |   4 +-
 .../operators/triggers/RepeatingTrigger.java    |   4 +-
 .../triggers/TimeSinceFirstMessageTrigger.java  |   3 +-
 .../triggers/TimeSinceLastMessageTrigger.java   |   4 +-
 .../samza/operators/triggers/TimeTrigger.java   |   4 +-
 .../samza/operators/triggers/Trigger.java       |   7 +-
 .../samza/operators/triggers/Triggers.java      |  41 +--
 .../apache/samza/operators/windows/Window.java  |  20 +-
 .../samza/operators/windows/WindowKey.java      |  19 +-
 .../samza/operators/windows/WindowPane.java     |   9 +-
 .../apache/samza/operators/windows/Windows.java | 136 +++++-----
 .../windows/internal/WindowInternal.java        |  14 +-
 .../samza/system/ExecutionEnvironment.java      |  73 ++++++
 .../java/org/apache/samza/task/TaskContext.java |  10 +
 .../data/TestIncomingSystemMessage.java         |   2 +-
 .../operators/windows/TestWindowOutput.java     |  35 ---
 .../samza/operators/windows/TestWindowPane.java |  33 +++
 .../samza/operators/MessageStreamImpl.java      | 151 +++++++----
 .../apache/samza/operators/StreamGraphImpl.java | 260 +++++++++++++++++++
 .../operators/StreamOperatorAdaptorTask.java    | 105 --------
 .../functions/PartialJoinFunction.java          |  65 +++++
 .../samza/operators/impl/OperatorGraph.java     | 164 ++++++++++++
 .../samza/operators/impl/OperatorImpl.java      |  22 +-
 .../samza/operators/impl/OperatorImpls.java     | 124 ---------
 .../operators/impl/PartialJoinOperatorImpl.java |  15 +-
 .../samza/operators/impl/RootOperatorImpl.java  |   7 +-
 .../impl/SessionWindowOperatorImpl.java         |  52 ++++
 .../samza/operators/impl/SinkOperatorImpl.java  |   7 +-
 .../operators/impl/StreamOperatorImpl.java      |  14 +-
 .../operators/impl/WindowOperatorImpl.java      |  11 +-
 .../samza/operators/spec/OperatorSpec.java      |  39 ++-
 .../samza/operators/spec/OperatorSpecs.java     | 161 +++++++++---
 .../operators/spec/PartialJoinOperatorSpec.java |  58 +++--
 .../samza/operators/spec/SinkOperatorSpec.java  |  70 ++++-
 .../operators/spec/StreamOperatorSpec.java      |  58 +++--
 .../operators/spec/WindowOperatorSpec.java      |  41 ++-
 .../samza/operators/spec/WindowState.java       |  16 +-
 .../system/SingleJobExecutionEnvironment.java   |  37 +++
 .../system/StandaloneExecutionEnvironment.java  |  41 +++
 .../apache/samza/task/StreamOperatorTask.java   | 108 ++++++++
 .../apache/samza/example/BroadcastGraph.java    | 121 +++++++++
 .../org/apache/samza/example/JoinGraph.java     | 118 +++++++++
 .../samza/example/KeyValueStoreExample.java     | 184 +++++++++++++
 .../samza/example/NoContextStreamExample.java   | 156 +++++++++++
 .../samza/example/OrderShipmentJoinExample.java | 190 ++++++++++++++
 .../samza/example/PageViewCounterExample.java   | 133 ++++++++++
 .../samza/example/RepartitionExample.java       | 145 +++++++++++
 .../samza/example/TestFluentStreamTasks.java    |  99 +++++++
 .../org/apache/samza/example/WindowGraph.java   |  87 +++++++
 .../apache/samza/operators/BroadcastTask.java   |  96 -------
 .../org/apache/samza/operators/JoinTask.java    |  77 ------
 .../operators/TestFluentStreamAdaptorTask.java  |  85 ------
 .../samza/operators/TestFluentStreamTasks.java  | 112 --------
 .../samza/operators/TestMessageStreamImpl.java  |  52 ++--
 .../operators/TestMessageStreamImplUtil.java    |  26 ++
 .../org/apache/samza/operators/WindowTask.java  |  63 -----
 .../samza/operators/impl/TestOperatorImpls.java |  94 +++++--
 .../operators/impl/TestSinkOperatorImpl.java    |  11 +-
 .../operators/impl/TestStreamOperatorImpl.java  |  20 +-
 .../samza/operators/spec/TestOperatorSpecs.java |  65 +++--
 78 files changed, 3455 insertions(+), 1311 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
new file mode 100644
index 0000000..c3b1cf3
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Interface class defining methods to initialize and finalize the context used by the transformation functions.
+ */
+@InterfaceStability.Unstable
+public interface ContextManager {
+  /**
+   * The initialization method to create shared context for the whole task in Samza. Default to NO-OP
+   *
+   * @param config  the configuration object for the task
+   * @param context  the {@link TaskContext} object
+   * @return  User-defined task-wide context object
+   */
+  default TaskContext initTaskContext(Config config, TaskContext context) {
+    return context;
+  }
+
+  /**
+   * The finalize method to allow users to close resource initialized in {@link #initTaskContext} method. Default to NO-OP.
+   *
+   */
+  default void finalizeTaskContext() { }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 6a2f95b..87a9fd3 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -19,7 +19,6 @@
 package org.apache.samza.operators;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
@@ -29,73 +28,93 @@ import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 
 import java.util.Collection;
+import java.util.function.Function;
 
 
 /**
- * Represents a stream of {@link MessageEnvelope}s.
+ * Represents a stream of messages.
  * <p>
  * A {@link MessageStream} can be transformed into another {@link MessageStream} by applying the transforms in this API.
  *
- * @param <M>  type of {@link MessageEnvelope}s in this stream
+ * @param <M>  type of messages in this stream
  */
 @InterfaceStability.Unstable
-public interface MessageStream<M extends MessageEnvelope> {
+public interface MessageStream<M> {
 
   /**
-   * Applies the provided 1:1 {@link MapFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the
+   * Applies the provided 1:1 {@link Function} to messages in this {@link MessageStream} and returns the
    * transformed {@link MessageStream}.
    *
-   * @param mapFn the function to transform a {@link MessageEnvelope} to another {@link MessageEnvelope}
-   * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
+   * @param mapFn the function to transform a message to another message
+   * @param <TM> the type of messages in the transformed {@link MessageStream}
    * @return the transformed {@link MessageStream}
    */
-  <TM extends MessageEnvelope> MessageStream<TM> map(MapFunction<M, TM> mapFn);
+  <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn);
 
   /**
-   * Applies the provided 1:n {@link FlatMapFunction} to transform a {@link MessageEnvelope} in this {@link MessageStream}
-   * to n {@link MessageEnvelope}s in the transformed {@link MessageStream}
+   * Applies the provided 1:n {@link Function} to transform a message in this {@link MessageStream}
+   * to n messages in the transformed {@link MessageStream}
    *
-   * @param flatMapFn the function to transform a {@link MessageEnvelope} to zero or more {@link MessageEnvelope}s
-   * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
+   * @param flatMapFn the function to transform a message to zero or more messages
+   * @param <TM> the type of messages in the transformed {@link MessageStream}
    * @return the transformed {@link MessageStream}
    */
-  <TM extends MessageEnvelope> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn);
+  <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn);
 
   /**
-   * Applies the provided {@link FilterFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the
+   * Applies the provided {@link Function} to messages in this {@link MessageStream} and returns the
    * transformed {@link MessageStream}.
    * <p>
-   * The {@link FilterFunction} is a predicate which determines whether a {@link MessageEnvelope} in this {@link MessageStream}
+   * The {@link Function} is a predicate which determines whether a message in this {@link MessageStream}
    * should be retained in the transformed {@link MessageStream}.
    *
-   * @param filterFn the predicate to filter {@link MessageEnvelope}s from this {@link MessageStream}
+   * @param filterFn the predicate to filter messages from this {@link MessageStream}
    * @return the transformed {@link MessageStream}
    */
   MessageStream<M> filter(FilterFunction<M> filterFn);
 
   /**
-   * Allows sending {@link MessageEnvelope}s in this {@link MessageStream} to an output
-   * {@link org.apache.samza.system.SystemStream} using the provided {@link SinkFunction}.
+   * Allows sending messages in this {@link MessageStream} to an output using the provided {@link SinkFunction}.
    *
-   * @param sinkFn the function to send {@link MessageEnvelope}s in this stream to output systems
+   * NOTE: the output may not be a {@link org.apache.samza.system.SystemStream}. It can be an external database, etc.
+   *
+   * @param sinkFn  the function to send messages in this stream to output
    */
   void sink(SinkFunction<M> sinkFn);
 
   /**
-   * Groups and processes the {@link MessageEnvelope}s in this {@link MessageStream} according to the provided {@link Window}
+   * Allows sending messages in this {@link MessageStream} to an output {@link MessageStream}.
+   *
+   * NOTE: the {@code stream} has to be a {@link MessageStream}.
+   *
+   * @param stream  the output {@link MessageStream}
+   */
+  void sendTo(OutputStream<M> stream);
+
+  /**
+   * Allows sending messages to an intermediate {@link MessageStream}.
+   *
+   * NOTE: the {@code stream} has to be a {@link MessageStream}.
+   *
+   * @param stream  the intermediate {@link MessageStream} to send the message to
+   * @return  the intermediate {@link MessageStream} to consume the messages sent
+   */
+  MessageStream<M> sendThrough(OutputStream<M> stream);
+
+  /**
+   * Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics
    * (e.g. tumbling, sliding or session windows) and returns the transformed {@link MessageStream} of
    * {@link WindowPane}s.
    * <p>
    * Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
    *
-   * @param window the window to group and process {@link MessageEnvelope}s from this {@link MessageStream}
-   * @param <K> the type of key in the {@link MessageEnvelope} in this {@link MessageStream}. If a key is specified,
+   * @param window the window to group and process messages from this {@link MessageStream}
+   * @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified,
    *            panes are emitted per-key.
    * @param <WV> the type of value in the {@link WindowPane} in the transformed {@link MessageStream}
-   * @param <WM> the type of {@link WindowPane} in the transformed {@link MessageStream}
    * @return the transformed {@link MessageStream}
    */
-  <K, WV, WM extends WindowPane<K, WV>> MessageStream<WM> window(Window<M, K, WV, WM> window);
+  <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window);
 
   /**
    * Joins this {@link MessageStream} with another {@link MessageStream} using the provided pairwise {@link JoinFunction}.
@@ -103,23 +122,32 @@ public interface MessageStream<M extends MessageEnvelope> {
    * We currently only support 2-way joins.
    *
    * @param otherStream the other {@link MessageStream} to be joined with
-   * @param joinFn the function to join {@link MessageEnvelope}s from this and the other {@link MessageStream}
+   * @param joinFn the function to join messages from this and the other {@link MessageStream}
    * @param <K> the type of join key
-   * @param <OM> the type of {@link MessageEnvelope}s in the other stream
-   * @param <RM> the type of {@link MessageEnvelope}s resulting from the {@code joinFn}
+   * @param <OM> the type of messages in the other stream
+   * @param <RM> the type of messages resulting from the {@code joinFn}
    * @return the joined {@link MessageStream}
    */
-  <K, OM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(MessageStream<OM> otherStream,
-      JoinFunction<M, OM, RM> joinFn);
+  <K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn);
 
   /**
    * Merge all {@code otherStreams} with this {@link MessageStream}.
    * <p>
-   * The merging streams must have the same {@link MessageEnvelope} type {@code M}.
+   * The merging streams must have the same messages of type {@code M}.
    *
    * @param otherStreams  other {@link MessageStream}s to be merged with this {@link MessageStream}
    * @return  the merged {@link MessageStream}
    */
   MessageStream<M> merge(Collection<MessageStream<M>> otherStreams);
-  
+
+  /**
+   * Send the input message to an output {@link org.apache.samza.system.SystemStream} and consume it as input {@link MessageStream} again.
+   *
+   * Note: this is an transform function only used in logic DAG. In a physical DAG, this is either translated to a NOOP function, or a {@code MessageStream#sendThrough} function.
+   *
+   * @param parKeyExtractor  a {@link Function} that extract the partition key from a message in this {@link MessageStream}
+   * @param <K>  the type of partition key
+   * @return  a {@link MessageStream} object after the re-partition
+   */
+  <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
new file mode 100644
index 0000000..179f0e7
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.SinkFunction;
+
+
+/**
+ * The interface class defining the specific {@link SinkFunction} for a system {@link OutputStream}.
+ *
+ * @param <M>  The type of message to be send to this output stream
+ */
+@InterfaceStability.Unstable
+public interface OutputStream<M> {
+
+  /**
+   * Returns the specific {@link SinkFunction} for this output stream. The {@link OutputStream} is created
+   * via {@link StreamGraph#createOutStream(StreamSpec, Serde, Serde)} or {@link StreamGraph#createIntStream(StreamSpec, Serde, Serde)}.
+   * Hence, the proper types of serdes for key and value are instantiated and are used in the {@link SinkFunction} returned.
+   *
+   * @return  The pre-defined {@link SinkFunction} to apply proper serdes before sending the message to the output stream.
+   */
+  SinkFunction<M> getSinkFunction();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
new file mode 100644
index 0000000..9e6644b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.serializers.Serde;
+
+import java.util.Map;
+
+
+/**
+ * Job-level programming interface to create an operator DAG and run in various different runtime environments.
+ */
+@InterfaceStability.Unstable
+public interface StreamGraph {
+  /**
+   * Method to add an input {@link MessageStream} from the system
+   *
+   * @param streamSpec  the {@link StreamSpec} describing the physical characteristics of the input {@link MessageStream}
+   * @param keySerde  the serde used to serialize/deserialize the message key from the input {@link MessageStream}
+   * @param msgSerde  the serde used to serialize/deserialize the message body from the input {@link MessageStream}
+   * @param <K>  the type of key in the input message
+   * @param <V>  the type of message in the input message
+   * @param <M>  the type of {@link MessageEnvelope} in the input {@link MessageStream}
+   * @return   the input {@link MessageStream} object
+   */
+  <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
+
+  /**
+   * Method to add an output {@link MessageStream} from the system
+   *
+   * @param streamSpec  the {@link StreamSpec} describing the physical characteristics of the output {@link MessageStream}
+   * @param keySerde  the serde used to serialize/deserialize the message key from the output {@link MessageStream}
+   * @param msgSerde  the serde used to serialize/deserialize the message body from the output {@link MessageStream}
+   * @param <K>  the type of key in the output message
+   * @param <V>  the type of message in the output message
+   * @param <M>  the type of {@link MessageEnvelope} in the output {@link MessageStream}
+   * @return   the output {@link MessageStream} object
+   */
+  <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
+
+  /**
+   * Method to add an intermediate {@link MessageStream} from the system
+   *
+   * @param streamSpec  the {@link StreamSpec} describing the physical characteristics of the intermediate {@link MessageStream}
+   * @param keySerde  the serde used to serialize/deserialize the message key from the intermediate {@link MessageStream}
+   * @param msgSerde  the serde used to serialize/deserialize the message body from the intermediate {@link MessageStream}
+   * @param <K>  the type of key in the intermediate message
+   * @param <V>  the type of message in the intermediate message
+   * @param <M>  the type of {@link MessageEnvelope} in the intermediate {@link MessageStream}
+   * @return   the intermediate {@link MessageStream} object
+   */
+  <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
+
+  /**
+   * Method to get the input {@link MessageStream}s
+   *
+   * @return the input {@link MessageStream}
+   */
+  Map<StreamSpec, MessageStream> getInStreams();
+
+  /**
+   * Method to get the {@link OutputStream}s
+   *
+   * @return  the map of all {@link OutputStream}s
+   */
+  Map<StreamSpec, OutputStream> getOutStreams();
+
+  /**
+   * Method to set the {@link ContextManager} for this {@link StreamGraph}
+   *
+   * @param manager  the {@link ContextManager} object
+   * @return  this {@link StreamGraph} object
+   */
+  StreamGraph withContextManager(ContextManager manager);
+
+  String GRAPH_CONFIG = "job.stream.graph.impl.class";
+  String DEFAULT_GRAPH_IMPL_CLASS = "org.apache.samza.operators.StreamGraphImpl";
+
+  /**
+   * Static method to instantiate the implementation class of {@link StreamGraph}.
+   *
+   * @param config  the {@link Config} object for this job
+   * @return  the {@link StreamGraph} object created
+   */
+  static StreamGraph fromConfig(Config config) {
+
+    try {
+      if (StreamGraph.class.isAssignableFrom(Class.forName(config.get(GRAPH_CONFIG, DEFAULT_GRAPH_IMPL_CLASS)))) {
+        return (StreamGraph) Class.forName(config.get(GRAPH_CONFIG, DEFAULT_GRAPH_IMPL_CLASS)).newInstance();
+      }
+    } catch (Exception e) {
+      throw new ConfigException(String.format("Problem in loading StreamGraphImpl class %s", config.get(GRAPH_CONFIG)), e);
+    }
+    throw new ConfigException(String.format(
+        "Class %s does not implement interface StreamGraphBuilder properly",
+        config.get(GRAPH_CONFIG)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java
new file mode 100644
index 0000000..c292363
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+
+
+/**
+ * This interface defines a factory class that user will implement to create user-defined operator DAG in a {@link StreamGraph} object.
+ */
+@InterfaceStability.Unstable
+public interface StreamGraphFactory {
+  /**
+   * Users are required to implement this abstract method to initialize the processing logic of the application, in terms
+   * of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators
+   *
+   * @param config  the {@link Config} of the application
+   * @return  the {@link StreamGraph} object which contains user-defined processing logic of the application
+   */
+  StreamGraph create(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java b/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java
deleted file mode 100644
index 16cf27a..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Map;
-
-
-/**
- * A {@link StreamOperatorTask} is the basic interface to implement for processing {@link MessageStream}s.
- * Implementations can describe the transformation steps for each {@link MessageStream} in the
- * {@link #transform} method using {@link MessageStream} APIs.
- * <p>
- * Implementations may be augmented by implementing {@link org.apache.samza.task.InitableTask},
- * {@link org.apache.samza.task.WindowableTask} and {@link org.apache.samza.task.ClosableTask} interfaces,
- * but should not implement {@link org.apache.samza.task.StreamTask} or {@link org.apache.samza.task.AsyncStreamTask}
- * interfaces.
- */
-@InterfaceStability.Unstable
-public interface StreamOperatorTask {
-
-  /**
-   * Describe the transformation steps for each {@link MessageStream}s for this task using the
-   * {@link MessageStream} APIs. Each {@link MessageStream} corresponds to one {@link SystemStreamPartition}
-   * in the input system.
-   *
-   * @param messageStreams the {@link MessageStream}s that receive {@link IncomingSystemMessageEnvelope}s
-   *                       from their corresponding {@link org.apache.samza.system.SystemStreamPartition}
-   */
-  void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
new file mode 100644
index 0000000..c8a5e8d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.system.SystemStream;
+
+import java.util.Properties;
+
+
+/**
+ * This interface defines the specification of a {@link SystemStream}. It will be used by the {@link org.apache.samza.system.SystemAdmin}
+ * to create a {@link SystemStream}
+ */
+@InterfaceStability.Unstable
+public interface StreamSpec {
+  /**
+   * Get the {@link SystemStream}
+   *
+   * @return  {@link SystemStream} object
+   */
+  SystemStream getSystemStream();
+
+  /**
+   * Get the physical properties of the {@link SystemStream}
+   *
+   * @return  the properties of this stream
+   */
+  Properties getProperties();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
deleted file mode 100644
index a65809c..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * A {@link MessageEnvelope} that provides additional information about its input {@link SystemStreamPartition}
- * and its {@link Offset} within the {@link SystemStreamPartition}.
- * <p>
- * Note: the {@link Offset} is only unique and comparable within its {@link SystemStreamPartition}.
- */
-public class IncomingSystemMessageEnvelope implements MessageEnvelope<Object, Object> {
-
-  private final IncomingMessageEnvelope ime;
-
-  /**
-   * Creates an {@code IncomingSystemMessageEnvelope} from the {@link IncomingMessageEnvelope}.
-   *
-   * @param ime  the {@link IncomingMessageEnvelope} from the input system.
-   */
-  public IncomingSystemMessageEnvelope(IncomingMessageEnvelope ime) {
-    this.ime = ime;
-  }
-
-  @Override
-  public Object getKey() {
-    return this.ime.getKey();
-  }
-
-  @Override
-  public Object getMessage() {
-    return this.ime.getMessage();
-  }
-
-  public Offset getOffset() {
-    // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
-    // assuming incoming message envelope carries long value as offset (i.e. Kafka case)
-    return new LongOffset(this.ime.getOffset());
-  }
-
-  public SystemStreamPartition getSystemStreamPartition() {
-    return this.ime.getSystemStreamPartition();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
new file mode 100644
index 0000000..306145b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * A {@link MessageEnvelope} that provides additional information about its input {@link SystemStreamPartition}
+ * and its {@link Offset} within the {@link SystemStreamPartition}.
+ * <p>
+ * Note: the {@link Offset} is only unique and comparable within its {@link SystemStreamPartition}.
+ */
+public class InputMessageEnvelope implements MessageEnvelope<Object, Object> {
+
+  private final IncomingMessageEnvelope ime;
+
+  /**
+   * Creates an {@code InputMessageEnvelope} from the {@link IncomingMessageEnvelope}.
+   *
+   * @param ime  the {@link IncomingMessageEnvelope} from the input system.
+   */
+  public InputMessageEnvelope(IncomingMessageEnvelope ime) {
+    this.ime = ime;
+  }
+
+  @Override
+  public Object getKey() {
+    return this.ime.getKey();
+  }
+
+  @Override
+  public Object getMessage() {
+    return this.ime.getMessage();
+  }
+
+  public Offset getOffset() {
+    // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
+    // assuming incoming message envelope carries long value as offset (i.e. Kafka case)
+    return new LongOffset(this.ime.getOffset());
+  }
+
+  public SystemStreamPartition getSystemStreamPartition() {
+    return this.ime.getSystemStreamPartition();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
index ad64231..703a44c 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
@@ -23,7 +23,7 @@ import org.apache.samza.annotation.InterfaceStability;
 
 
 /**
- * An entry in the input/output {@link org.apache.samza.operators.MessageStream}s.
+ * An entry in the input/output {@link org.apache.samza.operators.MessageStream}s
  */
 @InterfaceStability.Unstable
 public interface MessageEnvelope<K, M> {

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
index e611cd0..73c5c9d 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
@@ -19,22 +19,29 @@
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
 
 
 /**
- * A function that specifies whether a {@link MessageEnvelope} should be retained for further processing or filtered out.
- * @param <M>  type of the input {@link MessageEnvelope}
+ * A function that specifies whether a message should be retained for further processing or filtered out.
+ * @param <M>  type of the input message
  */
 @InterfaceStability.Unstable
-@FunctionalInterface
-public interface FilterFunction<M extends MessageEnvelope> {
+public interface FilterFunction<M> extends InitFunction {
 
   /**
-   * Returns a boolean indicating whether this {@link MessageEnvelope} should be retained or filtered out.
-   * @param message  the {@link MessageEnvelope} to be checked
-   * @return  true if {@link MessageEnvelope} should be retained
+   * Returns a boolean indicating whether this message should be retained or filtered out.
+   * @param message  the input message to be checked
+   * @return  true if {@code message} should be retained
    */
   boolean apply(M message);
 
+  /**
+   * Init method to initialize the context for this {@link FilterFunction}. The default implementation is NO-OP.
+   *
+   * @param config  the {@link Config} object for this task
+   * @param context  the {@link TaskContext} object for this task
+   */
+  default void init(Config config, TaskContext context) { }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
index dbc0bd9..f8458f2 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
@@ -19,26 +19,33 @@
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
 
 import java.util.Collection;
 
 
 /**
- * A function that transforms a {@link MessageEnvelope} into a collection of 0 or more {@link MessageEnvelope}s,
+ * A function that transforms an input message into a collection of 0 or more messages,
  * possibly of a different type.
- * @param <M>  type of the input {@link MessageEnvelope}
- * @param <OM>  type of the transformed {@link MessageEnvelope}s
+ * @param <M>  type of the input message
+ * @param <OM>  type of the transformed messages
  */
 @InterfaceStability.Unstable
-@FunctionalInterface
-public interface FlatMapFunction<M extends MessageEnvelope, OM extends MessageEnvelope> {
+public interface FlatMapFunction<M, OM>  extends InitFunction {
 
   /**
-   * Transforms the provided {@link MessageEnvelope} into a collection of 0 or more {@link MessageEnvelope}s.
-   * @param message  the {@link MessageEnvelope} to be transformed
-   * @return  a collection of 0 or more transformed {@link MessageEnvelope}s
+   * Transforms the provided message into a collection of 0 or more messages.
+   * @param message  the input message to be transformed
+   * @return  a collection of 0 or more transformed messages
    */
   Collection<OM> apply(M message);
 
+  /**
+   * Init method to initialize the context for this {@link FlatMapFunction}. The default implementation is NO-OP.
+   *
+   * @param config  the {@link Config} object for this task
+   * @param context  the {@link TaskContext} object for this task
+   */
+  default void init(Config config, TaskContext context) { };
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java
new file mode 100644
index 0000000..eec56df
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * interface defined to initalize the context of message transformation functions
+ */
+@InterfaceStability.Unstable
+public interface InitFunction {
+  /**
+   * Interface method to initialize the context for a specific message transformation function.
+   *
+   * @param config  the {@link Config} object for this task
+   * @param context  the {@link TaskContext} object for this task
+   */
+  void init(Config config, TaskContext context);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
index 8cb1fce..afc92ee 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
@@ -19,26 +19,50 @@
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
 
 
 /**
- * A function that joins {@link MessageEnvelope}s from two {@link org.apache.samza.operators.MessageStream}s and produces
- * a joined {@link MessageEnvelope}.
- * @param <M>  type of the input {@link MessageEnvelope}
- * @param <JM>  type of the {@link MessageEnvelope} to join with
- * @param <RM>  type of the joined {@link MessageEnvelope}
+ * A function that joins messages from two {@link org.apache.samza.operators.MessageStream}s and produces
+ * a joined message.
+ * @param <K>  type of the join key
+ * @param <M>  type of the input message
+ * @param <JM>  type of the message to join with
+ * @param <RM>  type of the joined message
  */
 @InterfaceStability.Unstable
-@FunctionalInterface
-public interface JoinFunction<M extends MessageEnvelope, JM extends MessageEnvelope, RM extends MessageEnvelope> {
+public interface JoinFunction<K, M, JM, RM>  extends InitFunction {
 
   /**
-   * Join the provided {@link MessageEnvelope}s and produces the joined {@link MessageEnvelope}.
-   * @param message  the input {@link MessageEnvelope}
-   * @param otherMessage  the {@link MessageEnvelope} to join with
-   * @return  the joined {@link MessageEnvelope}
+   * Join the provided input messages and produces the joined messages.
+   * @param message  the input message
+   * @param otherMessage  the message to join with
+   * @return  the joined message
    */
   RM apply(M message, JM otherMessage);
 
+  /**
+   * Method to get the join key in the messages from the first input stream
+   *
+   * @param message  the input message from the first input stream
+   * @return  the join key
+   */
+  K getFirstKey(M message);
+
+  /**
+   * Method to get the join key in the messages from the second input stream
+   *
+   * @param message  the input message from the second input stream
+   * @return  the join key
+   */
+  K getSecondKey(JM message);
+
+  /**
+   * Init method to initialize the context for this {@link JoinFunction}. The default implementation is NO-OP.
+   *
+   * @param config  the {@link Config} object for this task
+   * @param context  the {@link TaskContext} object for this task
+   */
+  default void init(Config config, TaskContext context) { }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java
new file mode 100644
index 0000000..b651b3d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.MessageEnvelope;
+
+/**
+ * A specific {@link JoinFunction} that joins {@link MessageEnvelope}s from two {@link org.apache.samza.operators.MessageStream}s and produces
+ * a joined message.
+ *
+ * @param <K>  type of the join key
+ * @param <M>  type of the input {@link MessageEnvelope}
+ * @param <JM>  type of the {@link MessageEnvelope} to join with
+ * @param <RM>  type of the joined message
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface KeyValueJoinFunction<K, M extends MessageEnvelope<K, ?>, JM extends MessageEnvelope<K, ?>, RM> extends JoinFunction<K, M, JM, RM> {
+
+  default K getFirstKey(M message) {
+    return message.getKey();
+  }
+
+  default K getSecondKey(JM message) {
+    return message.getKey();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
index 04919a7..a051914 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -19,23 +19,30 @@
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
 
 
 /**
- * A function that transforms a {@link MessageEnvelope} into another {@link MessageEnvelope}, possibly of a different type.
- * @param <M>  type of the input {@link MessageEnvelope}
- * @param <OM>  type of the transformed {@link MessageEnvelope}
+ * A function that transforms an input message into another message, possibly of a different type.
+ * @param <M>  type of the input message
+ * @param <OM>  type of the transformed message
  */
 @InterfaceStability.Unstable
-@FunctionalInterface
-public interface MapFunction<M extends MessageEnvelope, OM extends MessageEnvelope> {
+public interface MapFunction<M, OM>  extends InitFunction {
 
   /**
-   * Transforms the provided {@link MessageEnvelope} into another {@link MessageEnvelope}
-   * @param message  the {@link MessageEnvelope} to be transformed
-   * @return  the transformed {@link MessageEnvelope}
+   * Transforms the provided message into another message
+   * @param message  the input message to be transformed
+   * @return  the transformed message
    */
   OM apply(M message);
 
+  /**
+   * Init method to initialize the context for this {@link MapFunction}. The default implementation is NO-OP.
+   *
+   * @param config  the {@link Config} object for this task
+   * @param context  the {@link TaskContext} object for this task
+   */
+  default void init(Config config, TaskContext context) { }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
index 505da92..1050771 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
@@ -19,28 +19,35 @@
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
 import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 
 /**
- * A function that allows sending a {@link MessageEnvelope} to an output system.
- * @param <M>  type of the input {@link MessageEnvelope}
+ * A function that allows sending a message to an output system.
+ * @param <M>  type of the input message
  */
 @InterfaceStability.Unstable
-@FunctionalInterface
-public interface SinkFunction<M extends MessageEnvelope> {
+public interface SinkFunction<M>  extends InitFunction {
 
   /**
-   * Allows sending the provided {@link MessageEnvelope} to an output {@link org.apache.samza.system.SystemStream} using
+   * Allows sending the provided message to an output {@link org.apache.samza.system.SystemStream} using
    * the provided {@link MessageCollector}. Also provides access to the {@link TaskCoordinator} to request commits
    * or shut the container down.
    *
-   * @param message  the {@link MessageEnvelope} to be sent to an output {@link org.apache.samza.system.SystemStream}
-   * @param messageCollector  the {@link MessageCollector} to use to send the {@link MessageEnvelope}
+   * @param message  the input message to be sent to an output {@link org.apache.samza.system.SystemStream}
+   * @param messageCollector  the {@link MessageCollector} to use to send the {@link org.apache.samza.operators.data.MessageEnvelope}
    * @param taskCoordinator  the {@link TaskCoordinator} to request commits or shutdown
    */
   void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator);
 
+  /**
+   * Init method to initialize the context for this {@link MapFunction}. The default implementation is NO-OP.
+   *
+   * @param config  the {@link Config} object for this task
+   * @param context  the {@link TaskContext} object for this task
+   */
+  default void init(Config config, TaskContext context) { }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
index 3ca4e9a..6e134df 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
@@ -18,13 +18,12 @@
 */
 package org.apache.samza.operators.triggers;
 
-import org.apache.samza.operators.data.MessageEnvelope;
 import java.util.List;
 
 /**
  * A {@link Trigger} fires as soon as any of its individual triggers has fired.
  */
-public class AnyTrigger<M extends MessageEnvelope> implements Trigger {
+public class AnyTrigger<M> implements Trigger {
 
   private final List<Trigger> triggers;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
index ba14928..1cf930c 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
@@ -18,13 +18,11 @@
  */
 package org.apache.samza.operators.triggers;
 
-import org.apache.samza.operators.data.MessageEnvelope;
-
 /**
  * A {@link Trigger} that fires when the number of messages in the {@link org.apache.samza.operators.windows.WindowPane}
  * reaches the specified count.
  */
-public class CountTrigger<M extends MessageEnvelope> implements Trigger {
+public class CountTrigger<M> implements Trigger {
 
   private final long count;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
index ae9564d..7f78eb8 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
@@ -18,12 +18,10 @@
  */
 package org.apache.samza.operators.triggers;
 
-import org.apache.samza.operators.data.MessageEnvelope;
-
 /**
  * A {@link Trigger} that repeats its underlying trigger forever.
  */
-class RepeatingTrigger<M extends MessageEnvelope> implements Trigger<M> {
+class RepeatingTrigger<M> implements Trigger<M> {
 
   private final Trigger<M> trigger;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
index 13fc3cd..4de60a2 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.operators.triggers;
 
-import org.apache.samza.operators.data.MessageEnvelope;
 
 import java.time.Duration;
 
@@ -27,7 +26,7 @@ import java.time.Duration;
  * A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in
  * the window pane.
  */
-public class TimeSinceFirstMessageTrigger<M extends MessageEnvelope> implements Trigger {
+public class TimeSinceFirstMessageTrigger<M> implements Trigger {
 
   private final Duration duration;
   private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
index 0150d86..6b09625 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
@@ -18,14 +18,12 @@
  */
 package org.apache.samza.operators.triggers;
 
-import org.apache.samza.operators.data.MessageEnvelope;
-
 import java.time.Duration;
 
 /*
  * A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration.
  */
-public class TimeSinceLastMessageTrigger<M extends MessageEnvelope> implements Trigger {
+public class TimeSinceLastMessageTrigger<M> implements Trigger {
 
   private final Duration duration;
   private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
index ed7fef7..c5875aa 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
@@ -18,14 +18,12 @@
  */
 package org.apache.samza.operators.triggers;
 
-import org.apache.samza.operators.data.MessageEnvelope;
-
 import java.time.Duration;
 
 /*
  * A {@link Trigger} that fires after the specified duration in processing time.
  */
-public class TimeTrigger<M extends MessageEnvelope> implements Trigger {
+public class TimeTrigger<M> implements Trigger {
 
   private final Duration duration;
   private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
index 6dc4f43..be0a877 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
@@ -20,15 +20,16 @@
 package org.apache.samza.operators.triggers;
 
 
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.annotation.InterfaceStability;
 
 /**
  * Marker interface for all triggers. The firing of a trigger indicates the completion of a window pane.
  *
  * <p> Use the {@link Triggers} APIs to create a {@link Trigger}.
  *
- * @param <M> the type of the incoming {@link MessageEnvelope}
+ * @param <M> the type of the incoming message
  */
-public interface Trigger<M extends MessageEnvelope> {
+@InterfaceStability.Unstable
+public interface Trigger<M> {
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
index f27cfd8..97fb7b7 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
@@ -19,7 +19,6 @@
 package org.apache.samza.operators.triggers;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -35,61 +34,63 @@ import java.util.List;
  * <pre> {@code
  *   MessageStream<> windowedStream = stream.window(Windows.tumblingWindow(Duration.of(10, TimeUnit.SECONDS))
  *     .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.of(4, TimeUnit.SECONDS))))))
- *     .accumulateFiredPanes());
+ *     .setAccumulationMode(AccumulationMode.ACCUMULATING));
  * }</pre>
  *
- * @param <M> the type of input {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream}
  */
 @InterfaceStability.Unstable
-public final class Triggers<M extends MessageEnvelope> {
+public final class Triggers {
 
   private Triggers() { }
 
   /**
-   * Creates a {@link Trigger} that fires when the number of {@link MessageEnvelope}s in the pane
+   * Creates a {@link Trigger} that fires when the number of messages in the pane
    * reaches the specified count.
    *
-   * @param count the number of {@link MessageEnvelope}s to fire the trigger after
+   * @param count the number of messages to fire the trigger after
+   * @param <M> the type of input message in the window
    * @return the created trigger
    */
-  public static Trigger count(long count) {
-    return new CountTrigger(count);
+  public static <M> Trigger<M> count(long count) {
+    return new CountTrigger<M>(count);
   }
 
   /**
-   * Creates a trigger that fires after the specified duration has passed since the first {@link MessageEnvelope} in
+   * Creates a trigger that fires after the specified duration has passed since the first message in
    * the pane.
    *
    * @param duration the duration since the first element
+   * @param <M> the type of input message in the window
    * @return the created trigger
    */
-  public static Trigger timeSinceFirstMessage(Duration duration) {
-    return new TimeSinceFirstMessageTrigger(duration);
+  public static <M> Trigger<M> timeSinceFirstMessage(Duration duration) {
+    return new TimeSinceFirstMessageTrigger<M>(duration);
   }
 
   /**
-   * Creates a trigger that fires when there is no new {@link MessageEnvelope} for the specified duration in the pane.
+   * Creates a trigger that fires when there is no new message for the specified duration in the pane.
    *
    * @param duration the duration since the last element
+   * @param <M> the type of input message in the window
    * @return the created trigger
    */
-  public static Trigger timeSinceLastMessage(Duration duration) {
-    return new TimeSinceLastMessageTrigger(duration);
+  public static <M> Trigger<M> timeSinceLastMessage(Duration duration) {
+    return new TimeSinceLastMessageTrigger<M>(duration);
   }
 
   /**
    * Creates a trigger that fires when any of the provided triggers fire.
    *
-   * @param <M> the type of input {@link MessageEnvelope} in the window
    * @param triggers the individual triggers
+   * @param <M> the type of input message in the window
    * @return the created trigger
    */
-  public static <M extends MessageEnvelope> Trigger any(Trigger<M>... triggers) {
-    List<Trigger> triggerList = new ArrayList<>();
+  public static <M> Trigger<M> any(Trigger<M>... triggers) {
+    List<Trigger<M>> triggerList = new ArrayList<>();
     for (Trigger trigger : triggers) {
       triggerList.add(trigger);
     }
-    return new AnyTrigger(Collections.unmodifiableList(triggerList));
+    return new AnyTrigger<M>(Collections.unmodifiableList(triggerList));
   }
 
   /**
@@ -98,11 +99,11 @@ public final class Triggers<M extends MessageEnvelope> {
    * <p>Creating a {@link RepeatingTrigger} from an {@link AnyTrigger} is equivalent to creating an {@link AnyTrigger} from
    * its individual {@link RepeatingTrigger}s.
    *
-   * @param <M> the type of input {@link MessageEnvelope} in the window
    * @param trigger the individual trigger to repeat
+   * @param <M> the type of input message in the window
    * @return the created trigger
    */
-  public static <M extends MessageEnvelope> Trigger repeat(Trigger<M> trigger) {
+  public static <M> Trigger<M> repeat(Trigger<M> trigger) {
     return new RepeatingTrigger<>(trigger);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
index 6aae940..8aa665a 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
@@ -19,18 +19,17 @@
 package org.apache.samza.operators.windows;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.triggers.Trigger;
 
 /**
- * Groups incoming {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} into finite
+ * Groups incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite
  * windows for processing.
  *
  * <p> A window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
  * that determine when results from the {@link Window} are emitted.
  *
- * <p> Each emitted result contains one or more {@link MessageEnvelope}s in the window and is called a {@link WindowPane}.
- * A pane can include all {@link MessageEnvelope}s collected for the window so far or only the new {@link MessageEnvelope}s
+ * <p> Each emitted result contains one or more messages in the window and is called a {@link WindowPane}.
+ * A pane can include all messagess collected for the window so far or only the new messages
  * since the last emitted pane. (as determined by the {@link AccumulationMode})
  *
  * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
@@ -66,13 +65,12 @@ import org.apache.samza.operators.triggers.Trigger;
  * <p> Use the {@link Windows} APIs to create various windows and the {@link org.apache.samza.operators.triggers.Triggers}
  * APIs to create triggers.
  *
- * @param <M> the type of the input {@link MessageEnvelope}
- * @param <K> the type of the key in the {@link MessageEnvelope} in this {@link org.apache.samza.operators.MessageStream}.
+ * @param <M> the type of the input message
+ * @param <K> the type of the key in the message in this {@link org.apache.samza.operators.MessageStream}.
  * @param <WV> the type of the value in the {@link WindowPane}.
- * @param <WM> the type of the output.
  */
 @InterfaceStability.Unstable
-public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<K, WV>> {
+public interface Window<M, K, WV> {
 
   /**
    * Set the early triggers for this {@link Window}.
@@ -81,7 +79,7 @@ public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<
    * @param trigger the early trigger
    * @return the {@link Window} function with the early trigger
    */
-  Window<M, K, WV, WM> setEarlyTrigger(Trigger<M> trigger);
+  Window<M, K, WV> setEarlyTrigger(Trigger<M> trigger);
 
   /**
    * Set the late triggers for this {@link Window}.
@@ -90,7 +88,7 @@ public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<
    * @param trigger the late trigger
    * @return the {@link Window} function with the late trigger
    */
-  Window<M, K, WV, WM> setLateTrigger(Trigger<M> trigger);
+  Window<M, K, WV> setLateTrigger(Trigger<M> trigger);
 
   /**
    * Specify how a {@link Window} should process its previously emitted {@link WindowPane}s.
@@ -106,6 +104,6 @@ public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<
    * @param mode the accumulation mode
    * @return the {@link Window} function with the specified {@link AccumulationMode}.
    */
-  Window<M, K, WV, WM> setAccumulationMode(AccumulationMode mode);
+  Window<M, K, WV> setAccumulationMode(AccumulationMode mode);
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
index 7edf3e1..14bd5ab 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
@@ -21,7 +21,7 @@ package org.apache.samza.operators.windows;
 /**
  * Key for a {@link WindowPane} emitted from a {@link Window}.
  *
- * @param <K> the type of the key in the incoming {@link org.apache.samza.operators.data.MessageEnvelope}.
+ * @param <K> the type of the key in the incoming message.
  *            Windows that are not keyed have a {@link Void} key type.
  *
  */
@@ -29,18 +29,27 @@ public class WindowKey<K> {
 
   private final  K key;
 
-  private final String windowId;
+  private final String paneId;
 
   public WindowKey(K key, String  windowId) {
     this.key = key;
-    this.windowId = windowId;
+    this.paneId = windowId;
   }
 
   public K getKey() {
     return key;
   }
 
-  public String getWindowId() {
-    return windowId;
+  public String getPaneId() {
+    return paneId;
+  }
+
+  @Override
+  public String toString() {
+    String wndKey = "";
+    if (!(key instanceof Void)) {
+      wndKey = String.format("%s:", key.toString());
+    }
+    return String.format("%s%s", wndKey, paneId);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
index 0388048..3b66bd1 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
@@ -18,16 +18,13 @@
  */
 package org.apache.samza.operators.windows;
 
-import org.apache.samza.operators.data.MessageEnvelope;
-
-
 /**
  * Specifies the result emitted from a {@link Window}.
  *
  * @param <K>  the type of key in the window pane
  * @param <V>  the type of value in the window pane.
  */
-public final class WindowPane<K, V> implements MessageEnvelope<WindowKey<K>, V> {
+public final class WindowPane<K, V> {
 
   private final WindowKey<K> key;
 
@@ -41,11 +38,11 @@ public final class WindowPane<K, V> implements MessageEnvelope<WindowKey<K>, V>
     this.mode = mode;
   }
 
-  @Override public V getMessage() {
+  public V getMessage() {
     return this.value;
   }
 
-  @Override public WindowKey<K> getKey() {
+  public WindowKey<K> getKey() {
     return this.key;
   }