You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/10/22 04:41:24 UTC

samza git commit: SAMZA-915: implementation of operator classes

Repository: samza
Updated Branches:
  refs/heads/samza-sql fbdd76daa -> adcd26678


SAMZA-915: implementation of operator classes


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/adcd2667
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/adcd2667
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/adcd2667

Branch: refs/heads/samza-sql
Commit: adcd26678d3fb6ea632de7117bc40a9c4f343d59
Parents: fbdd76d
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Fri Oct 21 21:27:28 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Oct 21 21:31:47 2016 -0700

----------------------------------------------------------------------
 .../samza/operators/api/MessageStream.java      |  50 ++++--
 .../samza/operators/api/internal/Operators.java |   2 +-
 .../samza/operators/impl/ChainedOperators.java  |  52 +++++-
 .../samza/operators/impl/OperatorFactory.java   |  86 +++++++++
 .../samza/operators/impl/OperatorImpl.java      |  15 +-
 .../operators/impl/join/PartialJoinOpImpl.java  |  44 +++++
 .../impl/window/SessionWindowImpl.java          |  17 +-
 .../samza/operators/api/TestMessageStream.java  | 180 +++++++++++++++++++
 .../samza/operators/api/TestOutputMessage.java  |  47 +++++
 .../operators/api/internal/TestOperators.java   |   1 -
 .../operators/impl/TestChainedOperators.java    | 129 +++++++++++++
 .../operators/impl/TestOperatorFactory.java     |  95 ++++++++++
 .../samza/operators/impl/TestOperatorImpl.java  |   1 +
 .../samza/operators/impl/TestOutputMessage.java |  47 -----
 .../operators/impl/TestSimpleOperatorImpl.java  |   1 +
 .../operators/impl/TestSinkOperatorImpl.java    |   1 +
 .../impl/data/serializers/SqlAvroSerdeTest.java |   1 +
 .../impl/window/TestSessionWindowImpl.java      |  53 ++----
 .../samza/task/BroadcastOperatorTask.java       |  41 +++--
 .../samza/task/InputJsonSystemMessage.java      |   2 +-
 20 files changed, 729 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
index a01cee9..b5e1028 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
@@ -19,16 +19,20 @@
 
 package org.apache.samza.operators.api;
 
+import org.apache.samza.operators.api.Windows.Window;
 import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.WindowOutput;
 import org.apache.samza.operators.api.internal.Operators;
-import org.apache.samza.operators.api.Windows.Window;
+import org.apache.samza.operators.api.internal.Operators.Operator;
+import org.apache.samza.operators.api.internal.WindowOutput;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
@@ -41,6 +45,19 @@ import java.util.function.Function;
  */
 public class MessageStream<M extends Message> {
 
+  private final Set<Operator> subscribers = new HashSet<>();
+
+  /**
+   * Helper method to get the corresponding list of subscribers to a specific {@link MessageStream}.
+   *
+   * NOTE: This should only be used by the implementation of {@link org.apache.samza.operators.impl.ChainedOperators}, not directly by programmers.
+   *
+   * @return An unmodifiable set containing all {@link Operator}s that subscribe to this {@link MessageStream} object
+   */
+  public Collection<Operator> getSubscribers() {
+    return Collections.unmodifiableSet(this.subscribers);
+  }
+
   /**
    * Public API methods start here
    */
@@ -65,12 +82,14 @@ public class MessageStream<M extends Message> {
    * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
    */
   public <OM extends Message> MessageStream<OM> map(Function<M, OM> mapper) {
-    return Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() {{
+    Operator<OM> op = Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() {{
       OM r = mapper.apply(m);
       if (r != null) {
         this.add(r);
       }
-    }}).getOutputStream();
+    }});
+    this.subscribers.add(op);
+    return op.getOutputStream();
   }
 
   /**
@@ -81,7 +100,9 @@ public class MessageStream<M extends Message> {
    * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
    */
   public <OM extends Message> MessageStream<OM> flatMap(Function<M, Collection<OM>> flatMapper) {
-    return Operators.getStreamOperator(flatMapper).getOutputStream();
+    Operator<OM> op = Operators.getStreamOperator(flatMapper);
+    this.subscribers.add(op);
+    return op.getOutputStream();
   }
 
   /**
@@ -91,11 +112,13 @@ public class MessageStream<M extends Message> {
    * @return the output {@link MessageStream} after applying the filter function on the input {@link MessageStream}
    */
   public MessageStream<M> filter(Function<M, Boolean> filter) {
-    return Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() {{
+    Operator<M> op = Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() {{
       if (filter.apply(t)) {
         this.add(t);
       }
-    }}).getOutputStream();
+    }});
+    this.subscribers.add(op);
+    return op.getOutputStream();
   }
 
   /**
@@ -105,7 +128,7 @@ public class MessageStream<M extends Message> {
    * @param sink  the user-defined sink function to send the input {@link Message}s to the external output systems
    */
   public void sink(VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
-    Operators.getSinkOperator(sink);
+    this.subscribers.add(Operators.getSinkOperator(sink));
   }
 
   /**
@@ -119,7 +142,9 @@ public class MessageStream<M extends Message> {
    * @return the output {@link MessageStream} after applying the window function on the input {@link MessageStream}
    */
   public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(Window<M, WK, WV, WM> window) {
-    return Operators.getWindowOperator(Windows.getInternalWindowFn(window)).getOutputStream();
+    Operator<WM> wndOp = Operators.getWindowOperator(Windows.getInternalWindowFn(window));
+    this.subscribers.add(wndOp);
+    return wndOp.getOutputStream();
   }
 
   /**
@@ -141,8 +166,8 @@ public class MessageStream<M extends Message> {
 
     // TODO: need to add default store functions for the two partial join functions
 
-    Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream);
-    Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream);
+    other.subscribers.add(Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream));
+    this.subscribers.add(Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream));
     return outputStream;
   }
 
@@ -156,7 +181,8 @@ public class MessageStream<M extends Message> {
     MessageStream<M> outputStream = new MessageStream<>();
 
     others.add(this);
-    others.forEach(other -> Operators.getMergeOperator(outputStream));
+    others.forEach(other -> other.subscribers.add(Operators.getMergeOperator(outputStream)));
     return outputStream;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
index f220285..e9bfe0b 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
@@ -55,7 +55,7 @@ public class Operators {
    * Private interface for stream operator functions. The interface class defines the output of the stream operator function.
    *
    */
-  private interface Operator<OM extends Message> {
+  public interface Operator<OM extends Message> {
     MessageStream<OM> getOutputStream();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
index 49cfdeb..59de16b 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
@@ -21,10 +21,16 @@ package org.apache.samza.operators.impl;
 
 import org.apache.samza.operators.api.MessageStream;
 import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Operators.Operator;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
 
 /**
  * Implementation class for a chain of operators from the single input {@code source}
@@ -33,6 +39,8 @@ import org.apache.samza.task.TaskCoordinator;
  */
 public class ChainedOperators<M extends Message> {
 
+  private final Set<OperatorImpl> subscribers = new HashSet<>();
+
   /**
    * Private constructor
    *
@@ -41,7 +49,38 @@ public class ChainedOperators<M extends Message> {
    */
   private ChainedOperators(MessageStream<M> source, TaskContext context) {
     // create the pipeline/topology starting from source
-    // pass in the context s.t. stateful stream operators can initialize their stores
+    source.getSubscribers().forEach(sub -> {
+      // pass in the context s.t. stateful stream operators can initialize their stores
+      OperatorImpl subImpl = this.createAndSubscribe(sub, source, context);
+      this.subscribers.add(subImpl);
+    });
+  }
+
+  /**
+   * Private function to recursively instantiate the implementation of operators and the chains
+   *
+   * @param operator  the operator that subscribes to {@code source}
+   * @param source  the source {@link MessageStream}
+   * @param context  the context of the task
+   * @return  the implementation object of the corresponding {@code operator}
+   */
+  private OperatorImpl<M, ? extends Message> createAndSubscribe(Operator operator, MessageStream source,
+      TaskContext context) {
+    Entry<OperatorImpl<M, ? extends Message>, Boolean> factoryEntry = OperatorFactory.getOperator(operator);
+    if (factoryEntry.getValue()) {
+      // The operator has already been instantiated and we do not need to traverse and create the subscribers any more.
+      return factoryEntry.getKey();
+    }
+    OperatorImpl<M, ? extends Message> opImpl = factoryEntry.getKey();
+    MessageStream outStream = operator.getOutputStream();
+    Collection<Operator> subs = outStream.getSubscribers();
+    subs.forEach(sub -> {
+      OperatorImpl subImpl = this.createAndSubscribe(sub, operator.getOutputStream(), context);
+      opImpl.subscribe(subImpl);
+    });
+    // initialize the operator's state store
+    opImpl.init(source, context);
+    return opImpl;
   }
 
   /**
@@ -64,10 +103,17 @@ public class ChainedOperators<M extends Message> {
    * @param coordinator  the {@link TaskCoordinator} object within the process context
    */
   public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
-    // TODO: add implementation of onNext() that actually triggers the process pipeline
+    this.subscribers.forEach(sub -> sub.onNext(message, collector, coordinator));
   }
 
+  /**
+   * Method to handle timer events
+   *
+   * @param collector  the {@link MessageCollector} object within the process context
+   * @param coordinator  the {@link TaskCoordinator} object within the process context
+   */
   public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
-    // TODO: add implementation of onTimer() that actually calls the corresponding window operator's onTimer() methods
+    long nanoTime = System.nanoTime();
+    this.subscribers.forEach(sub -> sub.onTimer(nanoTime, collector, coordinator));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
new file mode 100644
index 0000000..f16cbc6
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.commons.collections.keyvalue.AbstractMapEntry;
+import org.apache.samza.operators.api.WindowState;
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Operators.*;
+import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
+import org.apache.samza.operators.impl.window.SessionWindowImpl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map.Entry;
+
+
+/**
+ * The factory class that instantiates all {@link OperatorImpl} implementations.
+ */
+public class OperatorFactory {
+
+  /**
+   * the static operatorMap that includes all operator implementation instances
+   */
+  private static final Map<Operator, OperatorImpl<? extends Message, ? extends Message>> operatorMap = new ConcurrentHashMap<>();
+
+  /**
+   * The method to actually create the implementation instances of operators
+   *
+   * @param operator  the immutable definition of {@link Operator}
+   * @param <M>  type of input {@link Message}
+   * @param <RM>  type of output {@link Message}
+   * @return  the implementation object of {@link OperatorImpl}
+   */
+  private static <M extends Message, RM extends Message> OperatorImpl<M, ? extends Message> createOperator(Operator<RM> operator) {
+    if (operator instanceof StreamOperator) {
+      return new SimpleOperatorImpl<>((StreamOperator<M, RM>) operator);
+    } else if (operator instanceof SinkOperator) {
+      return new SinkOperatorImpl<>((SinkOperator<M>) operator);
+    } else if (operator instanceof WindowOperator) {
+      return new SessionWindowImpl<>((WindowOperator<M, ?, ? extends WindowState, ? extends WindowOutput>) operator);
+    } else if (operator instanceof PartialJoinOperator) {
+      return new PartialJoinOpImpl<>((PartialJoinOperator) operator);
+    }
+    throw new IllegalArgumentException(
+        String.format("The type of operator is not supported. Operator class name: %s", operator.getClass().getName()));
+  }
+
+  /**
+   * The method to get the unique implementation instance of {@link Operator}
+   *
+   * @param operator  the {@link Operator} to instantiate
+   * @param <M>  type of input {@link Message}
+   * @param <RM>  type of output {@link Message}
+   * @return  An entry that pairs the unique implementation instance for the {@code operator} with a boolean value indicating whether
+   *          that instance had already been created. True means the operator instance already existed before this call, false means
+   *          the operator instance was created by this call.
+   */
+  public static <M extends Message, RM extends Message> Entry<OperatorImpl<M, ? extends Message>, Boolean> getOperator(Operator<RM> operator) {
+    if (!operatorMap.containsKey(operator)) {
+      OperatorImpl<M, ? extends Message> operatorImpl = OperatorFactory.createOperator(operator);
+      if( operatorMap.putIfAbsent(operator, operatorImpl) == null ) {
+        return new AbstractMapEntry(operatorImpl, false) {};
+      }
+    }
+    return new AbstractMapEntry((OperatorImpl<M, ? extends Message>) operatorMap.get(operator), true) {};
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 81a7ede..3ca8bde 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.MessageStream;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
@@ -61,6 +62,17 @@ public abstract class OperatorImpl<M extends Message, RM extends Message>
   }
 
   /**
+   * Default method for timer event
+   *
+   * @param nanoTime  the system time, in nanoseconds, at which the timer event is triggered
+   * @param collector  the {@link MessageCollector} in the context
+   * @param coordinator  the {@link TaskCoordinator} in the context
+   */
+  public void onTimer(long nanoTime, MessageCollector collector, TaskCoordinator coordinator) {
+    this.subscribers.forEach(sub -> ((OperatorImpl)sub).onTimer(nanoTime, collector, coordinator));
+  }
+
+  /**
    * Each sub-class will implement this method to actually perform the transformation and call the downstream subscribers.
    *
    * @param message  the input {@link Message}
@@ -72,9 +84,10 @@ public abstract class OperatorImpl<M extends Message, RM extends Message>
   /**
    * Stateful operators will need to override this method to initialize the operators
    *
+   * @param source  the source that this {@link OperatorImpl} object subscribes to
    * @param context  the task context to initialize the operators within
    */
-  protected void init(TaskContext context) {};
+  protected void init(MessageStream<M> source, TaskContext context) {};
 
   /**
    * Method to trigger all downstream operators that consumes the output {@link org.apache.samza.operators.api.MessageStream}

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
new file mode 100644
index 0000000..bbe08a4
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.join;
+
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Operators.PartialJoinOperator;
+import org.apache.samza.operators.impl.OperatorImpl;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation of a {@link org.apache.samza.operators.api.internal.Operators.PartialJoinOperator}. This class implements the function
+ * that takes in only one input stream among all inputs to the join and generates the join output.
+ *
+ * @param <M>  Type of input stream {@link org.apache.samza.operators.api.data.Message}
+ * @param <RM>  Type of join output stream {@link org.apache.samza.operators.api.data.Message}
+ */
+public class PartialJoinOpImpl<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> extends OperatorImpl<M, RM> {
+
+  public PartialJoinOpImpl(PartialJoinOperator<M, K, JM, RM> joinOp) {
+    // TODO: implement PartialJoinOpImpl constructor
+  }
+
+  @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    // TODO: implement PartialJoinOpImpl processing logic
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
index 2de53aa..59e2dec 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
@@ -30,29 +30,27 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
-import java.util.function.BiFunction;
-
 
 /**
  * Default implementation class of a {@link WindowOperator} for a session window.
  *
  * @param <M>  the type of input {@link Message}
  * @param <RK>  the type of window key
+ * @param <WS>  the type of window state
  * @param <RM>  the type of aggregated value of the window
  */
 public class SessionWindowImpl<M extends Message, RK, WS extends WindowState, RM extends WindowOutput<RK, ?>> extends
     OperatorImpl<M, RM> {
-  private final BiFunction<M, Entry<RK, WS>, RM> txfmFunction;
-  private final StateStoreImpl<M, RK, WS> wndStore;
+  private final WindowOperator<M, RK, WS, RM> sessWnd;
+  private StateStoreImpl<M, RK, WS> wndStore = null;
 
-  SessionWindowImpl(WindowOperator<M, RK, WS, RM> sessWnd, MessageStream<M> input) {
-    this.txfmFunction = sessWnd.getFunction();
-    this.wndStore = new StateStoreImpl<>(sessWnd.getStoreFunctions(), sessWnd.getStoreName(input));
+  public SessionWindowImpl(WindowOperator<M, RK, WS, RM> sessWnd) {
+    this.sessWnd = sessWnd;
   }
 
   @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
     Entry<RK, WS> state = this.wndStore.getState(message);
-    this.nextProcessors(this.txfmFunction.apply(message, state), collector, coordinator);
+    this.nextProcessors(this.sessWnd.getFunction().apply(message, state), collector, coordinator);
     this.wndStore.updateState(message, state);
   }
 
@@ -60,7 +58,8 @@ public class SessionWindowImpl<M extends Message, RK, WS extends WindowState, RM
     // This is to periodically check the timeout triggers to get the list of window states to be updated
   }
 
-  @Override protected void init(TaskContext context) {
+  @Override protected void init(MessageStream<M> source, TaskContext context) {
+    this.wndStore = new StateStoreImpl<>(this.sessWnd.getStoreFunctions(), sessWnd.getStoreName(source));
     this.wndStore.init(context);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
new file mode 100644
index 0000000..9f9ad6b
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api;
+
+import org.apache.samza.operators.api.internal.Operators.*;
+import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestMessageStream {
+
+  @Test public void testMap() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    Function<TestMessage, TestOutputMessage> xMap = m -> new TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 2);
+    MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestOutputMessage> mapOp = subs.iterator().next();
+    assertTrue(mapOp instanceof StreamOperator);
+    assertEquals(mapOp.getOutputStream(), outputStream);
+    // assert that the transformation function is what we defined above
+    TestMessage xTestMsg = mock(TestMessage.class);
+    when(xTestMsg.getKey()).thenReturn("test-msg-key");
+    when(xTestMsg.getMessage()).thenReturn("123456789");
+    when(xTestMsg.getTimestamp()).thenReturn(12345L);
+    Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, TestOutputMessage>) mapOp).getFunction().apply(xTestMsg);
+    assertEquals(cOutputMsg.size(), 1);
+    TestOutputMessage outputMessage = cOutputMsg.iterator().next();
+    assertEquals(outputMessage.getKey(), xTestMsg.getKey());
+    assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().length() + 1));
+    assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2);
+  }
+
+  @Test public void testFlatMap() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() {{
+      this.add(mock(TestOutputMessage.class));
+      this.add(mock(TestOutputMessage.class));
+      this.add(mock(TestOutputMessage.class));
+    }};
+    Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> flatOuts;
+    MessageStream<TestOutputMessage> outputStream = inputStream.flatMap(xFlatMap);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestOutputMessage> flatMapOp = subs.iterator().next();
+    assertTrue(flatMapOp instanceof StreamOperator);
+    assertEquals(flatMapOp.getOutputStream(), outputStream);
+    // assert that the transformation function is what we defined above
+    assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) flatMapOp).getFunction(), xFlatMap);
+  }
+
+  @Test public void testFilter() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L;
+    MessageStream<TestMessage> outputStream = inputStream.filter(xFilter);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> filterOp = subs.iterator().next();
+    assertTrue(filterOp instanceof StreamOperator);
+    assertEquals(filterOp.getOutputStream(), outputStream);
+    // assert that the transformation function is what we defined above
+    Function<TestMessage, Collection<TestMessage>> txfmFn = ((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction();
+    TestMessage mockMsg = mock(TestMessage.class);
+    when(mockMsg.getTimestamp()).thenReturn(11111L);
+    Collection<TestMessage> output = txfmFn.apply(mockMsg);
+    assertTrue(output.isEmpty());
+    when(mockMsg.getTimestamp()).thenReturn(999999L);
+    output = txfmFn.apply(mockMsg);
+    assertEquals(output.size(), 1);
+    assertEquals(output.iterator().next(), mockMsg);
+  }
+
+  @Test public void testSink() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> xSink = (m, mc, tc) -> {
+      mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
+      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+    };
+    inputStream.sink(xSink);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> sinkOp = subs.iterator().next();
+    assertTrue(sinkOp instanceof SinkOperator);
+    assertEquals(((SinkOperator) sinkOp).getFunction(), xSink);
+    assertNull(((SinkOperator) sinkOp).getOutputStream());
+  }
+
+  @Test public void testWindow() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    Windows.SessionWindow<TestMessage, String, Integer> window = mock(Windows.SessionWindow.class);
+    MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> wndOp = subs.iterator().next();
+    assertTrue(wndOp instanceof WindowOperator);
+    assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream);
+  }
+
+  @Test public void testJoin() {
+    MessageStream<TestMessage> source1 = new MessageStream<>();
+    MessageStream<TestMessage> source2 = new MessageStream<>();
+    BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp());
+    MessageStream<TestOutputMessage> joinOutput = source1.join(source2, joiner);
+    Collection<Operator> subs = source1.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> joinOp1 = subs.iterator().next();
+    assertTrue(joinOp1 instanceof PartialJoinOperator);
+    assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), joinOutput);
+    subs = source2.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> joinOp2 = subs.iterator().next();
+    assertTrue(joinOp2 instanceof PartialJoinOperator);
+    assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), joinOutput);
+    TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 11111L);
+    TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 22222L);
+    TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp1).getFunction().apply(joinMsg1, joinMsg2);
+    assertEquals(xOut.getKey(), "test-join-1");
+    assertEquals(xOut.getMessage(), Integer.valueOf(24));
+    assertEquals(xOut.getTimestamp(), 11111L);
+    xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp2).getFunction().apply(joinMsg2, joinMsg1);
+    assertEquals(xOut.getKey(), "test-join-1");
+    assertEquals(xOut.getMessage(), Integer.valueOf(24));
+    assertEquals(xOut.getTimestamp(), 11111L);
+  }
+
+  @Test public void testMerge() {
+    MessageStream<TestMessage> merge1 = new MessageStream<>();
+    Collection<MessageStream<TestMessage>> others = new ArrayList<MessageStream<TestMessage>>(){{
+      this.add(new MessageStream<>());
+      this.add(new MessageStream<>());
+    }};
+    MessageStream<TestMessage> mergeOutput = merge1.merge(others);
+    validateMergeOperator(merge1, mergeOutput);
+
+    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+  }
+
+  private void validateMergeOperator(MessageStream<TestMessage> mergeSource, MessageStream<TestMessage> mergeOutput) {
+    Collection<Operator> subs = mergeSource.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> mergeOp = subs.iterator().next();
+    assertTrue(mergeOp instanceof StreamOperator);
+    assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput);
+    TestMessage mockMsg = mock(TestMessage.class);
+    Collection<TestMessage> outputs = ((StreamOperator<TestMessage, TestMessage>) mergeOp).getFunction().apply(mockMsg);
+    assertEquals(outputs.size(), 1);
+    assertEquals(outputs.iterator().next(), mockMsg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
new file mode 100644
index 0000000..225e77f
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api;
+
+import org.apache.samza.operators.api.data.Message;
+
+
+/**
+ * Immutable {@link Message} for unit tests: a {@code String} key, an {@code Integer} body and a
+ * timestamp, all fixed at construction time.
+ */
+public class TestOutputMessage implements Message<String, Integer> {
+  private final String key;
+  private final Integer value;
+  private final long timestamp;
+
+  /**
+   * @param key the value returned by {@link #getKey()}
+   * @param value the value returned by {@link #getMessage()}
+   * @param timestamp the value returned by {@link #getTimestamp()}
+   */
+  public TestOutputMessage(String key, Integer value, long timestamp) {
+    this.key = key;
+    this.value = value;
+    this.timestamp = timestamp;
+  }
+
+  /** Returns the key given to the constructor. */
+  @Override
+  public String getKey() {
+    return key;
+  }
+
+  /** Returns the message body given to the constructor. */
+  @Override
+  public Integer getMessage() {
+    return value;
+  }
+
+  /** Returns the timestamp given to the constructor. */
+  @Override
+  public long getTimestamp() {
+    return timestamp;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
index 65c37e9..6dc77e5 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
@@ -117,7 +117,6 @@ public class TestOperators {
 
   @Test public void testGetMergeOperator() {
     MessageStream<TestMessage> output = new MessageStream<>();
-
     Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output);
     Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() {{
       this.add(t);

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
new file mode 100644
index 0000000..d4d2378
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.api.TestOutputMessage;
+import org.apache.samza.operators.api.Windows;
+import org.apache.samza.task.TaskContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.reactivestreams.Subscriber;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Tests that {@code ChainedOperators.create()} wires operator implementations together according to
+ * the operators declared on the input {@code MessageStream}. The {@code subscribers} fields of
+ * {@code ChainedOperators} and {@code OperatorImpl} are read through reflection to inspect the wiring.
+ */
+public class TestChainedOperators {
+  // reflection handle for the "subscribers" field declared in ChainedOperators
+  Field subsField = null;
+  // reflection handle for the "subscribers" field declared in OperatorImpl
+  Field opSubsField = null;
+
+  /**
+   * Looks up both {@code subscribers} fields and makes them accessible before each test.
+   */
+  @Before public void prep() throws NoSuchFieldException {
+    subsField = ChainedOperators.class.getDeclaredField("subscribers");
+    subsField.setAccessible(true);
+    opSubsField = OperatorImpl.class.getDeclaredField("subscribers");
+    opSubsField.setAccessible(true);
+  }
+
+  /**
+   * A chain can be created from an input stream that has no operators attached.
+   */
+  @Test public void testCreate() {
+    // test creation of empty chain
+    MessageStream<TestMessage> testStream = new MessageStream<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testStream, mockContext);
+    assertNotNull(operatorChain);
+  }
+
+  /**
+   * map followed by window: the chain has one first-level subscriber, which has exactly one
+   * subscriber of its own, which in turn has none.
+   */
+  @Test public void testLinearChain() throws IllegalAccessException {
+    // test creation of linear chain
+    MessageStream<TestMessage> testInput = new MessageStream<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    testInput.map(m -> m).window(Windows.intoSessionCounter(TestMessage::getKey));
+    ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testInput, mockContext);
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(operatorChain);
+    // assertEquals takes (expected, actual)
+    assertEquals(1, subsSet.size());
+    OperatorImpl<TestMessage, TestMessage> firstOpImpl = subsSet.iterator().next();
+    Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(firstOpImpl);
+    assertEquals(1, subsOps.size());
+    Subscriber<? super ProcessorContext<TestMessage>> wndOpImpl = subsOps.iterator().next();
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(wndOpImpl);
+    assertEquals(0, subsOps.size());
+  }
+
+  /**
+   * Two filters on the same input, each followed by one more operator: the chain has two
+   * first-level subscribers, and each of them leads to a single terminal subscriber.
+   */
+  @Test public void testBroadcastChain() throws IllegalAccessException {
+    // test creation of broadcast chain
+    MessageStream<TestMessage> testInput = new MessageStream<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    testInput.filter(m -> m.getTimestamp() > 123456L).flatMap(m -> new ArrayList<TestMessage>() {{ this.add(m); this.add(m); }});
+    testInput.filter(m -> m.getTimestamp() < 123456L).map(m -> m);
+    ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testInput, mockContext);
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(operatorChain);
+    assertEquals(2, subsSet.size());
+    Iterator<OperatorImpl> iter = subsSet.iterator();
+    // check the first branch w/ flatMap
+    OperatorImpl<TestMessage, TestMessage> opImpl = iter.next();
+    Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(opImpl);
+    assertEquals(1, subsOps.size());
+    Subscriber<? super ProcessorContext<TestMessage>> flatMapImpl = subsOps.iterator().next();
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(flatMapImpl);
+    assertEquals(0, subsOps.size());
+    // check the second branch w/ map
+    opImpl = iter.next();
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(opImpl);
+    assertEquals(1, subsOps.size());
+    Subscriber<? super ProcessorContext<TestMessage>> mapImpl = subsOps.iterator().next();
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(mapImpl);
+    assertEquals(0, subsOps.size());
+  }
+
+  /**
+   * A join of two inputs followed by map: the chains built from each input start with different
+   * join operator implementations, and both of those have the same single downstream subscriber.
+   */
+  @Test public void testJoinChain() throws IllegalAccessException {
+    // test creation of join chain
+    MessageStream<TestMessage> input1 = new MessageStream<>();
+    MessageStream<TestMessage> input2 = new MessageStream<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    input1.join(input2, (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp())).map(m -> m);
+    // now, we create chained operators from each input source
+    ChainedOperators<TestMessage> chain1 = ChainedOperators.create(input1, mockContext);
+    ChainedOperators<TestMessage> chain2 = ChainedOperators.create(input2, mockContext);
+    // check that those two chains will merge at map operator
+    // first branch of the join
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(chain1);
+    assertEquals(1, subsSet.size());
+    OperatorImpl<TestMessage, TestOutputMessage> joinOp1 = subsSet.iterator().next();
+    Set<Subscriber<? super ProcessorContext<TestOutputMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) opSubsField.get(joinOp1);
+    assertEquals(1, subsOps.size());
+    // the map operator consumes the common join output, where two branches merge
+    Subscriber<? super ProcessorContext<TestOutputMessage>> mapImpl = subsOps.iterator().next();
+    // second branch of the join
+    subsSet = (Set<OperatorImpl>) subsField.get(chain2);
+    assertEquals(1, subsSet.size());
+    OperatorImpl<TestMessage, TestOutputMessage> joinOp2 = subsSet.iterator().next();
+    assertNotSame(joinOp1, joinOp2);
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) opSubsField.get(joinOp2);
+    assertEquals(1, subsOps.size());
+    // make sure that the map operator is the same
+    assertEquals(mapImpl, subsOps.iterator().next());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
new file mode 100644
index 0000000..d228784
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.api.TestOutputMessage;
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Operators.PartialJoinOperator;
+import org.apache.samza.operators.api.internal.Operators.SinkOperator;
+import org.apache.samza.operators.api.internal.Operators.StreamOperator;
+import org.apache.samza.operators.api.internal.Operators.WindowOperator;
+import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
+import org.apache.samza.operators.impl.window.SessionWindowImpl;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests that {@code OperatorFactory.getOperator()} returns the expected {@code OperatorImpl}
+ * subclass for each kind of operator, and that the implementation holds on to the objects taken
+ * from the operator it was created from (checked through reflection).
+ */
+public class TestOperatorFactory {
+
+  /**
+   * Requests an implementation for a window, a simple (stream), a sink and a partial join operator
+   * in turn and checks the concrete type returned for each.
+   */
+  @Test public void testGetOperator() throws NoSuchFieldException, IllegalAccessException {
+    // get window operator
+    WindowOperator mockWnd = mock(WindowOperator.class);
+    Map.Entry<OperatorImpl<TestMessage, ? extends Message>, Boolean>
+        factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(mockWnd);
+    // NOTE(review): the meaning of the Boolean in the entry is not visible in this test; only its value is pinned
+    assertFalse(factoryEntry.getValue());
+    OperatorImpl<TestMessage, TestOutputMessage> opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+    assertTrue(opImpl instanceof SessionWindowImpl);
+    Field sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd");
+    sessWndField.setAccessible(true);
+    WindowOperator sessWnd = (WindowOperator) sessWndField.get(opImpl);
+    // expected value first, matching the other assertEquals calls in this method
+    assertEquals(mockWnd, sessWnd);
+
+    // get simple operator
+    StreamOperator<TestMessage, TestOutputMessage> mockSimpleOp = mock(StreamOperator.class);
+    Function<TestMessage, Collection<TestOutputMessage>>  mockTxfmFn = mock(Function.class);
+    when(mockSimpleOp.getFunction()).thenReturn(mockTxfmFn);
+    factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(mockSimpleOp);
+    opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+    assertTrue(opImpl instanceof SimpleOperatorImpl);
+    Field txfmFnField = SimpleOperatorImpl.class.getDeclaredField("transformFn");
+    txfmFnField.setAccessible(true);
+    assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
+
+    // get sink operator
+    MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, mc, tc) -> {};
+    SinkOperator<TestMessage> sinkOp = mock(SinkOperator.class);
+    when(sinkOp.getFunction()).thenReturn(sinkFn);
+    factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(sinkOp);
+    opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+    assertTrue(opImpl instanceof SinkOperatorImpl);
+    Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFunc");
+    sinkFnField.setAccessible(true);
+    assertEquals(sinkFn, sinkFnField.get(opImpl));
+
+    // get join operator
+    PartialJoinOperator<TestMessage, String, TestMessage, TestOutputMessage> joinOp = mock(PartialJoinOperator.class);
+    TestOutputMessage mockOutput = mock(TestOutputMessage.class);
+    BiFunction<TestMessage, TestMessage, TestOutputMessage> joinFn = (m1, m2) -> mockOutput;
+    when(joinOp.getFunction()).thenReturn(joinFn);
+    factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(joinOp);
+    opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+    assertTrue(opImpl instanceof PartialJoinOpImpl);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 9445f3a..d296111 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.api.TestOutputMessage;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java
deleted file mode 100644
index 4bcf767..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.api.data.Message;
-
-
-class TestOutputMessage implements Message<String, Integer> {
-  private final String key;
-  private final Integer value;
-  private final long timestamp;
-
-  TestOutputMessage(String key, Integer value, long timestamp) {
-    this.key = key;
-    this.value = value;
-    this.timestamp = timestamp;
-  }
-
-  @Override public Integer getMessage() {
-    return this.value;
-  }
-
-  @Override public String getKey() {
-    return this.key;
-  }
-
-  @Override public long getTimestamp() {
-    return this.timestamp;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
index 50154f0..c8c4944 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.api.TestOutputMessage;
 import org.apache.samza.operators.api.internal.Operators.StreamOperator;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index eb8a23a..e711bc5 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.TestOutputMessage;
 import org.apache.samza.operators.api.internal.Operators.SinkOperator;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
index 10ee2c7..5aa28bb 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
@@ -64,6 +64,7 @@ public class SqlAvroSerdeTest {
   @Test
   public void testSqlAvroSerialization() throws IOException {
     AvroData decodedDatumOriginal = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
+    @SuppressWarnings("unchecked")
     byte[] encodedDatum = serde.toBytes(decodedDatumOriginal);
 
     AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum);

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
index 6947464..75cb00c 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
@@ -24,7 +24,6 @@ import org.apache.samza.operators.api.WindowState;
 import org.apache.samza.operators.api.internal.Operators.StoreFunctions;
 import org.apache.samza.operators.api.internal.Operators.WindowOperator;
 import org.apache.samza.operators.api.internal.WindowOutput;
-import org.apache.samza.operators.impl.StateStoreImpl;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;
@@ -44,67 +43,41 @@ import static org.mockito.Mockito.*;
 
 public class TestSessionWindowImpl {
   Field wndStoreField = null;
-  Field txfmFnField = null;
+  Field sessWndField = null;
 
   @Before public void prep() throws NoSuchFieldException {
     wndStoreField = SessionWindowImpl.class.getDeclaredField("wndStore");
-    txfmFnField = SessionWindowImpl.class.getDeclaredField("txfmFunction");
+    sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd");
     wndStoreField.setAccessible(true);
-    txfmFnField.setAccessible(true);
+    sessWndField.setAccessible(true);
   }
 
   @Test public void testConstructor() throws IllegalAccessException, NoSuchFieldException {
     // test constructing a SessionWindowImpl w/ expected mock functions
-    MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class);
     WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class);
-    StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class);
-    when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns);
-    when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store");
-    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class);
-    when(wndOp.getFunction()).thenReturn(mockTxfmFn);
-    SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp, mockInputStrm);
-    BiFunction<TestMessage, WindowState<Integer>, WindowOutput<String, Integer>> txfmFn =
-        (BiFunction<TestMessage, WindowState<Integer>, WindowOutput<String, Integer>>) txfmFnField.get(sessWnd);
-    assertEquals(mockTxfmFn, txfmFn);
-    StateStoreImpl<TestMessage, String, WindowState<Integer>> storeImpl =
-        (StateStoreImpl<TestMessage, String, WindowState<Integer>>) wndStoreField.get(sessWnd);
-
-    // test init() and make sure the wndStore is initialized as expected
-    TestMessage mockMsg = mock(TestMessage.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class);
-    when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore);
-    Function<TestMessage, String> wndKeyFn = m -> "test-msg-key";
-    when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn);
-    WindowState<Integer> mockState = mock(WindowState.class);
-    when(mockKvStore.get("test-msg-key")).thenReturn(mockState);
-    storeImpl.init(mockContext);
-    Entry<String, WindowState<Integer>> stateEntry = storeImpl.getState(mockMsg);
-    verify(mockStoreFns, times(1)).getStoreKeyFinder();
-    verify(mockKvStore, times(1)).get("test-msg-key");
-    assertEquals(stateEntry.getKey(), "test-msg-key");
-    assertEquals(stateEntry.getValue(), mockState);
+    SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp);
+    assertEquals(wndOp, sessWndField.get(sessWnd));
   }
 
-  @Test public void testInitAndProcess() {
-    MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class);
+  @Test public void testInitAndProcess() throws IllegalAccessException {
     WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class);
+    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class);
+    SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp);
+
+    // construct and init the SessionWindowImpl object
+    MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class);
     StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class);
     Function<TestMessage, String> wndKeyFn = m -> "test-msg-key";
     when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn);
     when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns);
     when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store");
-    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class);
     when(wndOp.getFunction()).thenReturn(mockTxfmFn);
-
-    // construct and init the SessionWindowImpl object
-    SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp, mockInputStrm);
     TaskContext mockContext = mock(TaskContext.class);
     KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class);
     when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore);
-    sessWnd.init(mockContext);
+    sessWnd.init(mockInputStrm, mockContext);
 
-    // test onNext() method. Make sure the right methods are invoked.
+    // test onNext() method. Make sure the transformation function and the state update functions are invoked.
     TestMessage mockMsg = mock(TestMessage.class);
     MessageCollector mockCollector = mock(MessageCollector.class);
     TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
index 91b0074..724bbba 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
@@ -56,27 +56,26 @@ public class BroadcastOperatorTask implements StreamOperatorTask {
 
   @Override public void initOperators(Collection<SystemMessageStream> sources) {
     sources.forEach(source -> {
-          MessageStream<JsonMessage> inputStream = source.map(this::getInputMessage);
-
-          inputStream.filter(this::myFilter1).
-              window(Windows.<JsonMessage, String>intoSessionCounter(
-                  m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)).
-                  setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100).
-                      addLateTriggerOnSizeLimit(10).
-                      addTimeoutSinceLastMessage(30000)));
-
-          inputStream.filter(this::myFilter2).
-              window(Windows.<JsonMessage, String>intoSessions(
-                  m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4)).
-                  setTriggers(TriggerBuilder.<JsonMessage, Collection<JsonMessage>>earlyTriggerWhenExceedWndLen(100).
-                      addTimeoutSinceLastMessage(30000)));
-
-          inputStream.filter(this::myFilter3).
-              window(Windows.<JsonMessage, String, MessageType>intoSessions(
-                  m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4), m -> m.getMessage()).
-                  setTriggers(TriggerBuilder
-                      .<JsonMessage, Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getTimestamp(), 30000).
-                          addTimeoutSinceFirstMessage(60000)));
+      MessageStream<JsonMessage> inputStream = source.map(this::getInputMessage);
+
+      inputStream.filter(this::myFilter1).
+        window(Windows.<JsonMessage, String>intoSessionCounter(
+            m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)).
+          setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100).
+            addLateTriggerOnSizeLimit(10).
+            addTimeoutSinceLastMessage(30000)));
+
+      inputStream.filter(this::myFilter2).
+        window(Windows.<JsonMessage, String>intoSessions(
+            m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4)).
+          setTriggers(TriggerBuilder.<JsonMessage, Collection<JsonMessage>>earlyTriggerWhenExceedWndLen(100).
+            addTimeoutSinceLastMessage(30000)));
+
+      inputStream.filter(this::myFilter3).
+        window(Windows.<JsonMessage, String, MessageType>intoSessions(
+            m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4), m -> m.getMessage()).
+          setTriggers(TriggerBuilder.<JsonMessage, Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getTimestamp(), 30000).
+            addTimeoutSinceFirstMessage(60000)));
     }
     );
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
index 5e710b2..33ae9c9 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
@@ -25,7 +25,7 @@ import org.apache.samza.system.SystemStreamPartition;
 
 
 /**
- * Example input message w/ avro message body and string as the key.
+ * Example input message w/ Json message body and string as the key.
  */
 
 public class InputJsonSystemMessage<T> implements Message<String, T>, InputSystemMessage<Offset> {