You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/12/01 22:51:02 UTC

[2/6] samza git commit: SAMZA-1054: Refactor Operator APIs

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
new file mode 100644
index 0000000..2ad6461
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.SessionWindow;
+import org.apache.samza.operators.windows.WindowFn;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestMessageStreamImpl {
+
+  @Test
+  public void testMap() {
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+    MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap =
+        m -> new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
+    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(subs.size(), 1);
+    OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
+    assertTrue(mapOp instanceof StreamOperatorSpec);
+    assertEquals(mapOp.getOutputStream(), outputStream);
+    // assert that the transformation function is what we defined above
+    TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
+    TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(xTestMsg.getKey()).thenReturn("test-msg-key");
+    when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage);
+    when(mockInnerTestMessage.getValue()).thenReturn("123456789");
+
+    Collection<TestOutputMessageEnvelope> cOutputMsg = ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) mapOp).getTransformFn().apply(xTestMsg);
+    assertEquals(cOutputMsg.size(), 1);
+    TestOutputMessageEnvelope outputMessage = cOutputMsg.iterator().next();
+    assertEquals(outputMessage.getKey(), xTestMsg.getKey());
+    assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().getValue().length() + 1));
+  }
+
+  @Test
+  public void testFlatMap() {
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+    Set<TestOutputMessageEnvelope> flatOuts = new HashSet<TestOutputMessageEnvelope>() { {
+        this.add(mock(TestOutputMessageEnvelope.class));
+        this.add(mock(TestOutputMessageEnvelope.class));
+        this.add(mock(TestOutputMessageEnvelope.class));
+      } };
+    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = m -> flatOuts;
+    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(subs.size(), 1);
+    OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
+    assertTrue(flatMapOp instanceof StreamOperatorSpec);
+    assertEquals(flatMapOp.getOutputStream(), outputStream);
+    // assert that the transformation function is what we defined above
+    assertEquals(((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn(), xFlatMap);
+  }
+
+  @Test
+  public void testFilter() {
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+    FilterFunction<TestMessageEnvelope> xFilter = m -> m.getMessage().getEventTime() > 123456L;
+    MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(subs.size(), 1);
+    OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
+    assertTrue(filterOp instanceof StreamOperatorSpec);
+    assertEquals(filterOp.getOutputStream(), outputStream);
+    // assert that the transformation function is what we defined above
+    FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
+    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+    TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
+    when(mockInnerTestMessage.getEventTime()).thenReturn(11111L);
+    Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg);
+    assertTrue(output.isEmpty());
+    when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
+    when(mockInnerTestMessage.getEventTime()).thenReturn(999999L);
+    output = txfmFn.apply(mockMsg);
+    assertEquals(output.size(), 1);
+    assertEquals(output.iterator().next(), mockMsg);
+  }
+
+  @Test
+  public void testSink() {
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+    SinkFunction<TestMessageEnvelope> xSink = (m, mc, tc) -> {
+      mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
+      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+    };
+    inputStream.sink(xSink);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(subs.size(), 1);
+    OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
+    assertTrue(sinkOp instanceof SinkOperatorSpec);
+    assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink);
+    assertNull(((SinkOperatorSpec) sinkOp).getOutputStream());
+  }
+
+  @Test
+  public void testWindow() {
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+    SessionWindow<TestMessageEnvelope, String, Integer> window = mock(SessionWindow.class);
+    doReturn(mock(WindowFn.class)).when(window).getInternalWindowFn();
+    MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(subs.size(), 1);
+    OperatorSpec<TestMessageEnvelope> wndOp = subs.iterator().next();
+    assertTrue(wndOp instanceof WindowOperatorSpec);
+    assertEquals(((WindowOperatorSpec) wndOp).getOutputStream(), outStream);
+  }
+
+  @Test
+  public void testJoin() {
+    MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>();
+    MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>();
+    JoinFunction<TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
+        (m1, m2) -> new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+    MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner);
+    Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
+    assertEquals(subs.size(), 1);
+    OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
+    assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
+    assertEquals(((PartialJoinOperatorSpec) joinOp1).getOutputStream(), joinOutput);
+    subs = source2.getRegisteredOperatorSpecs();
+    assertEquals(subs.size(), 1);
+    OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
+    assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
+    assertEquals(((PartialJoinOperatorSpec) joinOp2).getOutputStream(), joinOutput);
+    TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
+    TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
+    TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getTransformFn().apply(joinMsg1, joinMsg2);
+    assertEquals(xOut.getKey(), "test-join-1");
+    assertEquals(xOut.getMessage(), Integer.valueOf(24));
+    xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp2).getTransformFn().apply(joinMsg2, joinMsg1);
+    assertEquals(xOut.getKey(), "test-join-1");
+    assertEquals(xOut.getMessage(), Integer.valueOf(24));
+  }
+
+  @Test
+  public void testMerge() {
+    MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>();
+    Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<MessageStream<TestMessageEnvelope>>() { {
+        this.add(new MessageStreamImpl<>());
+        this.add(new MessageStreamImpl<>());
+      } };
+    MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
+    validateMergeOperator(merge1, mergeOutput);
+
+    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+  }
+
+  private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) {
+    Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs();
+    assertEquals(subs.size(), 1);
+    OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
+    assertTrue(mergeOp instanceof StreamOperatorSpec);
+    assertEquals(((StreamOperatorSpec) mergeOp).getOutputStream(), mergeOutput);
+    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+    Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(mockMsg);
+    assertEquals(outputs.size(), 1);
+    assertEquals(outputs.iterator().next(), mockMsg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java
new file mode 100644
index 0000000..8fa7ccc
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.windows.StoreFunctions;
+import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestStateStoreImpl {
+  @Test
+  public void testStateStoreImpl() {
+    StoreFunctions<TestMessageEnvelope, String, WindowState> mockStoreFunctions = mock(StoreFunctions.class);
+    // test constructor
+    StateStoreImpl<TestMessageEnvelope, String, WindowState> storeImpl = new StateStoreImpl<>(mockStoreFunctions, "myStoreName");
+    TaskContext mockContext = mock(TaskContext.class);
+    KeyValueStore<String, WindowState> mockKvStore = mock(KeyValueStore.class);
+    when(mockContext.getStore("myStoreName")).thenReturn(mockKvStore);
+    // test init()
+    storeImpl.init(mockContext);
+    verify(mockContext, times(1)).getStore("myStoreName");
+    Function<TestMessageEnvelope, String> wndKeyFn = mock(Function.class);
+    when(mockStoreFunctions.getStoreKeyFn()).thenReturn(wndKeyFn);
+    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+    when(wndKeyFn.apply(mockMsg)).thenReturn("myKey");
+    WindowState mockState = mock(WindowState.class);
+    when(mockKvStore.get("myKey")).thenReturn(mockState);
+    // test getState()
+    Entry<String, WindowState> storeEntry = storeImpl.getState(mockMsg);
+    assertEquals(storeEntry.getKey(), "myKey");
+    assertEquals(storeEntry.getValue(), mockState);
+    verify(wndKeyFn, times(1)).apply(mockMsg);
+    verify(mockKvStore, times(1)).get("myKey");
+    Entry<String, WindowState> oldEntry = new Entry<>("myKey", mockState);
+    WindowState mockNewState = mock(WindowState.class);
+    BiFunction<TestMessageEnvelope, WindowState, WindowState> mockUpdaterFn = mock(BiFunction.class);
+    when(mockStoreFunctions.getStateUpdaterFn()).thenReturn(mockUpdaterFn);
+    when(mockUpdaterFn.apply(mockMsg, mockState)).thenReturn(mockNewState);
+    // test updateState()
+    Entry<String, WindowState> newEntry = storeImpl.updateState(mockMsg, oldEntry);
+    assertEquals(newEntry.getKey(), "myKey");
+    assertEquals(newEntry.getValue(), mockNewState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java b/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
new file mode 100644
index 0000000..f33510e
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.windows.TriggerBuilder;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+
+
+/**
+ * Example implementation of a simple user-defined tasks w/ window operators
+ *
+ */
+public class WindowTask implements StreamOperatorTask {
+  class MessageType {
+    String field1;
+    String field2;
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  @Override public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
+    messageStreams.values().forEach(source ->
+      source.map(m1 ->
+        new JsonMessageEnvelope(
+          this.myMessageKeyFunction(m1),
+          (MessageType) m1.getMessage(),
+          m1.getOffset(),
+          m1.getSystemStreamPartition())).
+        window(
+          Windows.<JsonMessageEnvelope, String>intoSessionCounter(
+              m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)).
+            setTriggers(TriggerBuilder.<JsonMessageEnvelope, Integer>earlyTriggerWhenExceedWndLen(100).
+              addTimeoutSinceLastMessage(30000)))
+    );
+  }
+
+  String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
+    return m.getKey().toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java b/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
new file mode 100644
index 0000000..9a425d1
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Example input {@link MessageEnvelope} w/ Json message and string as the key.
+ */
+
+public class JsonIncomingSystemMessageEnvelope<T> implements MessageEnvelope<String, T> {
+
+  private final String key;
+  private final T data;
+  private final Offset offset;
+  private final SystemStreamPartition partition;
+
+  public JsonIncomingSystemMessageEnvelope(String key, T data, Offset offset, SystemStreamPartition partition) {
+    this.key = key;
+    this.data = data;
+    this.offset = offset;
+    this.partition = partition;
+  }
+
+  @Override
+  public T getMessage() {
+    return this.data;
+  }
+
+  @Override
+  public String getKey() {
+    return this.key;
+  }
+
+  public Offset getOffset() {
+    return this.offset;
+  }
+
+  public SystemStreamPartition getSystemStreamPartition() {
+    return this.partition;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
deleted file mode 100644
index e3a70e8..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.operators.TestOutputMessage;
-import org.apache.samza.operators.Windows;
-import org.apache.samza.task.TaskContext;
-import org.junit.Before;
-import org.junit.Test;
-import org.reactivestreams.Subscriber;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-
-public class TestChainedOperators {
-  Field subsField = null;
-  Field opSubsField = null;
-
-  @Before public void prep() throws NoSuchFieldException {
-    subsField = ChainedOperators.class.getDeclaredField("subscribers");
-    subsField.setAccessible(true);
-    opSubsField = OperatorImpl.class.getDeclaredField("subscribers");
-    opSubsField.setAccessible(true);
-  }
-
-  @Test public void testCreate() {
-    // test creation of empty chain
-    MessageStream<TestMessage> testStream = new MessageStream<>();
-    TaskContext mockContext = mock(TaskContext.class);
-    ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testStream, mockContext);
-    assertTrue(operatorChain != null);
-  }
-
-  @Test public void testLinearChain() throws IllegalAccessException {
-    // test creation of linear chain
-    MessageStream<TestMessage> testInput = new MessageStream<>();
-    TaskContext mockContext = mock(TaskContext.class);
-    testInput.map(m -> m).window(Windows.intoSessionCounter(TestMessage::getKey));
-    ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testInput, mockContext);
-    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(operatorChain);
-    assertEquals(subsSet.size(), 1);
-    OperatorImpl<TestMessage, TestMessage> firstOpImpl = subsSet.iterator().next();
-    Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(firstOpImpl);
-    assertEquals(subsOps.size(), 1);
-    Subscriber<? super ProcessorContext<TestMessage>> wndOpImpl = subsOps.iterator().next();
-    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(wndOpImpl);
-    assertEquals(subsOps.size(), 0);
-  }
-
-  @Test public void testBroadcastChain() throws IllegalAccessException {
-    // test creation of broadcast chain
-    MessageStream<TestMessage> testInput = new MessageStream<>();
-    TaskContext mockContext = mock(TaskContext.class);
-    testInput.filter(m -> m.getTimestamp() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
-    testInput.filter(m -> m.getTimestamp() < 123456L).map(m -> m);
-    ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testInput, mockContext);
-    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(operatorChain);
-    assertEquals(subsSet.size(), 2);
-    Iterator<OperatorImpl> iter = subsSet.iterator();
-    // check the first branch w/ flatMap
-    OperatorImpl<TestMessage, TestMessage> opImpl = iter.next();
-    Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(opImpl);
-    assertEquals(subsOps.size(), 1);
-    Subscriber<? super ProcessorContext<TestMessage>> flatMapImpl = subsOps.iterator().next();
-    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(flatMapImpl);
-    assertEquals(subsOps.size(), 0);
-    // check the second branch w/ map
-    opImpl = iter.next();
-    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(opImpl);
-    assertEquals(subsOps.size(), 1);
-    Subscriber<? super ProcessorContext<TestMessage>> mapImpl = subsOps.iterator().next();
-    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(mapImpl);
-    assertEquals(subsOps.size(), 0);
-  }
-
-  @Test public void testJoinChain() throws IllegalAccessException {
-    // test creation of join chain
-    MessageStream<TestMessage> input1 = new MessageStream<>();
-    MessageStream<TestMessage> input2 = new MessageStream<>();
-    TaskContext mockContext = mock(TaskContext.class);
-    input1.join(input2, (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp())).map(m -> m);
-    // now, we create chained operators from each input sources
-    ChainedOperators<TestMessage> chain1 = ChainedOperators.create(input1, mockContext);
-    ChainedOperators<TestMessage> chain2 = ChainedOperators.create(input2, mockContext);
-    // check that those two chains will merge at map operator
-    // first branch of the join
-    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(chain1);
-    assertEquals(subsSet.size(), 1);
-    OperatorImpl<TestMessage, TestOutputMessage> joinOp1 = subsSet.iterator().next();
-    Set<Subscriber<? super ProcessorContext<TestOutputMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) opSubsField.get(joinOp1);
-    assertEquals(subsOps.size(), 1);
-    // the map operator consumes the common join output, where two branches merge
-    Subscriber<? super ProcessorContext<TestOutputMessage>> mapImpl = subsOps.iterator().next();
-    // second branch of the join
-    subsSet = (Set<OperatorImpl>) subsField.get(chain2);
-    assertEquals(subsSet.size(), 1);
-    OperatorImpl<TestMessage, TestOutputMessage> joinOp2 = subsSet.iterator().next();
-    assertNotSame(joinOp1, joinOp2);
-    subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) opSubsField.get(joinOp2);
-    assertEquals(subsOps.size(), 1);
-    // make sure that the map operator is the same
-    assertEquals(mapImpl, subsOps.iterator().next());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
deleted file mode 100644
index cb4576c..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.operators.TestOutputMessage;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
-import org.apache.samza.operators.impl.window.SessionWindowImpl;
-import org.apache.samza.operators.internal.Operators.PartialJoinOperator;
-import org.apache.samza.operators.internal.Operators.SinkOperator;
-import org.apache.samza.operators.internal.Operators.StreamOperator;
-import org.apache.samza.operators.internal.Operators.WindowOperator;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperatorFactory {
-
-  @Test public void testGetOperator() throws NoSuchFieldException, IllegalAccessException {
-    // get window operator
-    WindowOperator mockWnd = mock(WindowOperator.class);
-    Entry<OperatorImpl<TestMessage, ? extends Message>, Boolean>
-        factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(mockWnd);
-    assertFalse(factoryEntry.getValue());
-    OperatorImpl<TestMessage, TestOutputMessage> opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
-    assertTrue(opImpl instanceof SessionWindowImpl);
-    Field sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd");
-    sessWndField.setAccessible(true);
-    WindowOperator sessWnd = (WindowOperator) sessWndField.get(opImpl);
-    assertEquals(sessWnd, mockWnd);
-
-    // get simple operator
-    StreamOperator<TestMessage, TestOutputMessage> mockSimpleOp = mock(StreamOperator.class);
-    Function<TestMessage, Collection<TestOutputMessage>>  mockTxfmFn = mock(Function.class);
-    when(mockSimpleOp.getFunction()).thenReturn(mockTxfmFn);
-    factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(mockSimpleOp);
-    opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
-    assertTrue(opImpl instanceof SimpleOperatorImpl);
-    Field txfmFnField = SimpleOperatorImpl.class.getDeclaredField("transformFn");
-    txfmFnField.setAccessible(true);
-    assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
-
-    // get sink operator
-    MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, mc, tc) -> { };
-    SinkOperator<TestMessage> sinkOp = mock(SinkOperator.class);
-    when(sinkOp.getFunction()).thenReturn(sinkFn);
-    factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(sinkOp);
-    opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
-    assertTrue(opImpl instanceof SinkOperatorImpl);
-    Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFunc");
-    sinkFnField.setAccessible(true);
-    assertEquals(sinkFn, sinkFnField.get(opImpl));
-
-    // get join operator
-    PartialJoinOperator<TestMessage, String, TestMessage, TestOutputMessage> joinOp = mock(PartialJoinOperator.class);
-    TestOutputMessage mockOutput = mock(TestOutputMessage.class);
-    BiFunction<TestMessage, TestMessage, TestOutputMessage> joinFn = (m1, m2) -> mockOutput;
-    when(joinOp.getFunction()).thenReturn(joinFn);
-    factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(joinOp);
-    opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
-    assertTrue(opImpl instanceof PartialJoinOpImpl);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 4bd467d..361972e 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -18,52 +18,54 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
+import org.hamcrest.core.IsEqual;
 import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.reactivestreams.Subscriber;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 
 public class TestOperatorImpl {
 
-  TestMessage curInputMsg;
+  TestMessageEnvelope curInputMsg;
   MessageCollector curCollector;
   TaskCoordinator curCoordinator;
 
-  @Test public void testSubscribers() {
+  @Test
+  public void testSubscribers() {
     this.curInputMsg = null;
     this.curCollector = null;
     this.curCoordinator = null;
-    OperatorImpl<TestMessage, TestOutputMessage> opImpl = new OperatorImpl<TestMessage, TestOutputMessage>() {
-      @Override protected void onNext(TestMessage message, MessageCollector collector, TaskCoordinator coordinator) {
+    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = new OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope>() {
+      @Override
+      public void onNext(TestMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
         TestOperatorImpl.this.curInputMsg = message;
         TestOperatorImpl.this.curCollector = collector;
         TestOperatorImpl.this.curCoordinator = coordinator;
       }
     };
-    // verify subscribe() added the mockSub and nextProcessors() invoked the mockSub.onNext()
-    Subscriber<ProcessorContext<TestOutputMessage>> mockSub = mock(Subscriber.class);
-    opImpl.subscribe(mockSub);
-    TestOutputMessage xOutput = mock(TestOutputMessage.class);
+    // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext()
+    OperatorImpl mockSub = mock(OperatorImpl.class);
+    opImpl.registerNextOperator(mockSub);
+    TestOutputMessageEnvelope xOutput = mock(TestOutputMessageEnvelope.class);
     MessageCollector mockCollector = mock(MessageCollector.class);
     TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
-    opImpl.nextProcessors(xOutput, mockCollector, mockCoordinator);
-    verify(mockSub, times(1)).onNext(argThat(new ArgumentMatcher<ProcessorContext<TestOutputMessage>>() {
-      @Override public boolean matches(Object argument) {
-        ProcessorContext<TestOutputMessage> pCntx = (ProcessorContext<TestOutputMessage>) argument;
-        return pCntx.getMessage().equals(xOutput) && pCntx.getCoordinator().equals(mockCoordinator) && pCntx.getCollector().equals(mockCollector);
-      }
-    }));
+    opImpl.propagateResult(xOutput, mockCollector, mockCoordinator);
+    verify(mockSub, times(1)).onNext(
+        argThat(new IsEqual<>(xOutput)),
+        argThat(new IsEqual<>(mockCollector)),
+        argThat(new IsEqual<>(mockCoordinator))
+    );
     // verify onNext() is invoked correctly
-    TestMessage mockInput = mock(TestMessage.class);
-    ProcessorContext<TestMessage> inCntx = new ProcessorContext<>(mockInput, mockCollector, mockCoordinator);
-    opImpl.onNext(inCntx);
+    TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
+    opImpl.onNext(mockInput, mockCollector, mockCoordinator);
     assertEquals(mockInput, this.curInputMsg);
     assertEquals(mockCollector, this.curCollector);
     assertEquals(mockCoordinator, this.curCoordinator);

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
new file mode 100644
index 0000000..1d0d547
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.task.TaskContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestOperatorImpls {
+  Field nextOperatorsField = null;
+
+  @Before
+  public void prep() throws NoSuchFieldException {
+    nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
+    nextOperatorsField.setAccessible(true);
+  }
+  
+  @Test
+  public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException {
+    // get window operator
+    WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
+    OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = OperatorImpls.createOperatorImpl(mockWnd);
+    assertTrue(opImpl instanceof SessionWindowOperatorImpl);
+    Field sessWndField = SessionWindowOperatorImpl.class.getDeclaredField("windowSpec");
+    sessWndField.setAccessible(true);
+    WindowOperatorSpec sessWnd = (WindowOperatorSpec) sessWndField.get(opImpl);
+    assertEquals(sessWnd, mockWnd);
+
+    // get simple operator
+    StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
+    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
+    when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
+    opImpl = OperatorImpls.createOperatorImpl(mockSimpleOp);
+    assertTrue(opImpl instanceof StreamOperatorImpl);
+    Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
+    txfmFnField.setAccessible(true);
+    assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
+
+    // get sink operator
+    SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
+    SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
+    when(sinkOp.getSinkFn()).thenReturn(sinkFn);
+    opImpl = OperatorImpls.createOperatorImpl(sinkOp);
+    assertTrue(opImpl instanceof SinkOperatorImpl);
+    Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
+    sinkFnField.setAccessible(true);
+    assertEquals(sinkFn, sinkFnField.get(opImpl));
+
+    // get join operator
+    PartialJoinOperatorSpec<TestMessageEnvelope, String, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
+    TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class);
+    BiFunction<TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = (m1, m2) -> mockOutput;
+    when(joinOp.getTransformFn()).thenReturn(joinFn);
+    opImpl = OperatorImpls.createOperatorImpl(joinOp);
+    assertTrue(opImpl instanceof PartialJoinOperatorImpl);
+  }
+
+  @Test
+  public void testEmptyChain() {
+    // test creation of empty chain
+    MessageStreamImpl<TestMessageEnvelope> testStream = new MessageStreamImpl<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testStream, mockContext);
+    assertTrue(operatorChain != null);
+  }
+
+  @Test
+  public void testLinearChain() throws IllegalAccessException {
+    // test creation of linear chain
+    MessageStreamImpl<TestMessageEnvelope> testInput = new MessageStreamImpl<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    testInput.map(m -> m).window(Windows.intoSessionCounter(TestMessageEnvelope::getKey));
+    RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testInput, mockContext);
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
+    assertEquals(subsSet.size(), 1);
+    OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
+    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstOpImpl);
+    assertEquals(subsOps.size(), 1);
+    OperatorImpl wndOpImpl = subsOps.iterator().next();
+    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(wndOpImpl);
+    assertEquals(subsOps.size(), 0);
+  }
+
+  @Test
+  public void testBroadcastChain() throws IllegalAccessException {
+    // test creation of broadcast chain
+    MessageStreamImpl<TestMessageEnvelope> testInput = new MessageStreamImpl<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
+    testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
+    RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testInput, mockContext);
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
+    assertEquals(subsSet.size(), 2);
+    Iterator<OperatorImpl> iter = subsSet.iterator();
+    // check the first branch w/ flatMap
+    OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> opImpl = iter.next();
+    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
+    assertEquals(subsOps.size(), 1);
+    OperatorImpl flatMapImpl = subsOps.iterator().next();
+    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(flatMapImpl);
+    assertEquals(subsOps.size(), 0);
+    // check the second branch w/ map
+    opImpl = iter.next();
+    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
+    assertEquals(subsOps.size(), 1);
+    OperatorImpl mapImpl = subsOps.iterator().next();
+    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(mapImpl);
+    assertEquals(subsOps.size(), 0);
+  }
+
+  @Test
+  public void testJoinChain() throws IllegalAccessException {
+    // test creation of join chain
+    MessageStreamImpl<TestMessageEnvelope> input1 = new MessageStreamImpl<>();
+    MessageStreamImpl<TestMessageEnvelope> input2 = new MessageStreamImpl<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    input1
+        .join(input2, (m1, m2) ->
+            new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()))
+        .map(m -> m);
+    // now, we create chained operators from each input sources
+    RootOperatorImpl chain1 = OperatorImpls.createOperatorImpls(input1, mockContext);
+    RootOperatorImpl chain2 = OperatorImpls.createOperatorImpls(input2, mockContext);
+    // check that those two chains will merge at map operator
+    // first branch of the join
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);
+    assertEquals(subsSet.size(), 1);
+    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp1 = subsSet.iterator().next();
+    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp1);
+    assertEquals(subsOps.size(), 1);
+    // the map operator consumes the common join output, where two branches merge
+    OperatorImpl mapImpl = subsOps.iterator().next();
+    // second branch of the join
+    subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain2);
+    assertEquals(subsSet.size(), 1);
+    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp2 = subsSet.iterator().next();
+    assertNotSame(joinOp1, joinOp2);
+    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp2);
+    assertEquals(subsOps.size(), 1);
+    // make sure that the map operator is the same
+    assertEquals(mapImpl, subsOps.iterator().next());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
deleted file mode 100644
index 224245e..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-
-public class TestProcessorContext {
-  @Test public void testConstructor() {
-    TestMessage mockMsg = mock(TestMessage.class);
-    MessageCollector mockCollector = mock(MessageCollector.class);
-    TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
-    ProcessorContext<TestMessage> pCntx = new ProcessorContext<>(mockMsg, mockCollector, mockTaskCoordinator);
-    assertEquals(pCntx.getMessage(), mockMsg);
-    assertEquals(pCntx.getCollector(), mockCollector);
-    assertEquals(pCntx.getCoordinator(), mockTaskCoordinator);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSessionWindowImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSessionWindowImpl.java
new file mode 100644
index 0000000..c302df7
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSessionWindowImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.StoreFunctions;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+import java.lang.reflect.Field;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestSessionWindowImpl {
+  Field wndStoreField = null;
+  Field sessWndField = null;
+
+  @Before public void prep() throws NoSuchFieldException {
+    wndStoreField = SessionWindowOperatorImpl.class.getDeclaredField("stateStore");
+    sessWndField = SessionWindowOperatorImpl.class.getDeclaredField("windowSpec");
+    wndStoreField.setAccessible(true);
+    sessWndField.setAccessible(true);
+  }
+
+  @Test
+  public void testConstructor() throws IllegalAccessException, NoSuchFieldException {
+    // test constructing a SessionWindowOperatorImpl w/ expected mock functions
+    WindowOperatorSpec<TestMessageEnvelope, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperatorSpec.class);
+    SessionWindowOperatorImpl<TestMessageEnvelope, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowOperatorImpl<>(wndOp);
+    assertEquals(wndOp, sessWndField.get(sessWnd));
+  }
+
+  @Test
+  public void testInitAndProcess() throws IllegalAccessException {
+    WindowOperatorSpec<TestMessageEnvelope, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperatorSpec.class);
+    BiFunction<TestMessageEnvelope, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class);
+    SessionWindowOperatorImpl<TestMessageEnvelope, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowOperatorImpl<>(wndOp);
+
+    // construct and init the SessionWindowOperatorImpl object
+    MessageStreamImpl<TestMessageEnvelope> mockInputStrm = mock(MessageStreamImpl.class);
+    StoreFunctions<TestMessageEnvelope, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class);
+    Function<TestMessageEnvelope, String> wndKeyFn = m -> "test-msg-key";
+    when(mockStoreFns.getStoreKeyFn()).thenReturn(wndKeyFn);
+    when(wndOp.getStoreFns()).thenReturn(mockStoreFns);
+    when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store");
+    when(wndOp.getTransformFn()).thenReturn(mockTxfmFn);
+    TaskContext mockContext = mock(TaskContext.class);
+    KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class);
+    when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore);
+    sessWnd.init(mockInputStrm, mockContext);
+
+    // test onNext() method. Make sure the transformation function and the state update functions are invoked.
+    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    BiFunction<TestMessageEnvelope, WindowState<Integer>, WindowState<Integer>> stateUpdaterFn = mock(BiFunction.class);
+    when(mockStoreFns.getStateUpdaterFn()).thenReturn(stateUpdaterFn);
+    WindowState<Integer> mockNewState = mock(WindowState.class);
+    WindowState<Integer> oldState = mock(WindowState.class);
+    when(mockKvStore.get("test-msg-key")).thenReturn(oldState);
+    when(stateUpdaterFn.apply(mockMsg, oldState)).thenReturn(mockNewState);
+    sessWnd.onNext(mockMsg, mockCollector, mockCoordinator);
+    verify(mockTxfmFn, times(1)).apply(argThat(new ArgumentMatcher<TestMessageEnvelope>() {
+      @Override public boolean matches(Object argument) {
+        TestMessageEnvelope xIn = (TestMessageEnvelope) argument;
+        return xIn.equals(mockMsg);
+      }
+    }), argThat(new ArgumentMatcher<Entry<String, WindowState<Integer>>>() {
+      @Override public boolean matches(Object argument) {
+        Entry<String, WindowState<Integer>> xIn = (Entry<String, WindowState<Integer>>) argument;
+        return xIn.getKey().equals("test-msg-key") && xIn.getValue().equals(oldState);
+      }
+    }));
+    verify(stateUpdaterFn, times(1)).apply(mockMsg, oldState);
+    verify(mockKvStore, times(1)).put("test-msg-key", mockNewState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
deleted file mode 100644
index de029ea..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.operators.TestOutputMessage;
-import org.apache.samza.operators.internal.Operators.StreamOperator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.Function;
-
-import static org.mockito.Mockito.*;
-
-
-public class TestSimpleOperatorImpl {
-
-  @Test public void testSimpleOperator() {
-    StreamOperator<TestMessage, TestOutputMessage> mockOp = mock(StreamOperator.class);
-    Function<TestMessage, Collection<TestOutputMessage>> txfmFn = mock(Function.class);
-    when(mockOp.getFunction()).thenReturn(txfmFn);
-
-    SimpleOperatorImpl<TestMessage, TestOutputMessage> opImpl = spy(new SimpleOperatorImpl<>(mockOp));
-    TestMessage inMsg = mock(TestMessage.class);
-    TestOutputMessage outMsg = mock(TestOutputMessage.class);
-    Collection<TestOutputMessage> mockOutputs = new ArrayList() { {
-        this.add(outMsg);
-      } };
-    when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
-    MessageCollector mockCollector = mock(MessageCollector.class);
-    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
-    opImpl.onNext(inMsg, mockCollector, mockCoordinator);
-    verify(txfmFn, times(1)).apply(inMsg);
-    verify(opImpl, times(1)).nextProcessors(outMsg, mockCollector, mockCoordinator);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index cdac3fc..ba5b6f8 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -18,25 +18,28 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TestOutputMessage;
-import org.apache.samza.operators.internal.Operators.SinkOperator;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 public class TestSinkOperatorImpl {
 
-  @Test public void testSinkOperator() {
-    SinkOperator<TestOutputMessage> sinkOp = mock(SinkOperator.class);
-    MessageStream.VoidFunction3<TestOutputMessage, MessageCollector, TaskCoordinator> sinkFn = mock(
-        MessageStream.VoidFunction3.class);
-    when(sinkOp.getFunction()).thenReturn(sinkFn);
-    SinkOperatorImpl<TestOutputMessage> sinkImpl = new SinkOperatorImpl<>(sinkOp);
-    TestOutputMessage mockMsg = mock(TestOutputMessage.class);
+  @Test
+  public void testSinkOperator() {
+    SinkOperatorSpec<TestOutputMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
+    SinkFunction<TestOutputMessageEnvelope> sinkFn = mock(SinkFunction.class);
+    when(sinkOp.getSinkFn()).thenReturn(sinkFn);
+    SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp);
+    TestOutputMessageEnvelope mockMsg = mock(TestOutputMessageEnvelope.class);
     MessageCollector mockCollector = mock(MessageCollector.class);
     TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
deleted file mode 100644
index 5ede757..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.internal.Operators.StoreFunctions;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
-
-
-public class TestStateStoreImpl {
-  @Test public void testStateStoreImpl() {
-    StoreFunctions<TestMessage, String, WindowState> mockStoreFunctions = mock(StoreFunctions.class);
-    // test constructor
-    StateStoreImpl<TestMessage, String, WindowState> storeImpl = new StateStoreImpl<>(mockStoreFunctions, "myStoreName");
-    TaskContext mockContext = mock(TaskContext.class);
-    KeyValueStore<String, WindowState> mockKvStore = mock(KeyValueStore.class);
-    when(mockContext.getStore("myStoreName")).thenReturn(mockKvStore);
-    // test init()
-    storeImpl.init(mockContext);
-    verify(mockContext, times(1)).getStore("myStoreName");
-    Function<TestMessage, String> wndKeyFn = mock(Function.class);
-    when(mockStoreFunctions.getStoreKeyFinder()).thenReturn(wndKeyFn);
-    TestMessage mockMsg = mock(TestMessage.class);
-    when(wndKeyFn.apply(mockMsg)).thenReturn("myKey");
-    WindowState mockState = mock(WindowState.class);
-    when(mockKvStore.get("myKey")).thenReturn(mockState);
-    // test getState()
-    Entry<String, WindowState> storeEntry = storeImpl.getState(mockMsg);
-    assertEquals(storeEntry.getKey(), "myKey");
-    assertEquals(storeEntry.getValue(), mockState);
-    verify(wndKeyFn, times(1)).apply(mockMsg);
-    verify(mockKvStore, times(1)).get("myKey");
-    Entry<String, WindowState> oldEntry = new Entry<>("myKey", mockState);
-    WindowState mockNewState = mock(WindowState.class);
-    BiFunction<TestMessage, WindowState, WindowState> mockUpdaterFn = mock(BiFunction.class);
-    when(mockStoreFunctions.getStateUpdater()).thenReturn(mockUpdaterFn);
-    when(mockUpdaterFn.apply(mockMsg, mockState)).thenReturn(mockNewState);
-    // test updateState()
-    Entry<String, WindowState> newEntry = storeImpl.updateState(mockMsg, oldEntry);
-    assertEquals(newEntry.getKey(), "myKey");
-    assertEquals(newEntry.getValue(), mockNewState);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
new file mode 100644
index 0000000..5a3840c
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestStreamOperatorImpl {
+
+  @Test
+  public void testSimpleOperator() {
+    StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
+    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
+    when(mockOp.getTransformFn()).thenReturn(txfmFn);
+
+    StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp));
+    TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
+    TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
+    Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { {
+        this.add(outMsg);
+      } };
+    when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    opImpl.onNext(inMsg, mockCollector, mockCoordinator);
+    verify(txfmFn, times(1)).apply(inMsg);
+    verify(opImpl, times(1)).propagateResult(outMsg, mockCollector, mockCoordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
deleted file mode 100644
index 719ab99..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl.window;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.internal.Operators.StoreFunctions;
-import org.apache.samza.operators.internal.Operators.WindowOperator;
-import org.apache.samza.operators.internal.WindowOutput;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-
-import java.lang.reflect.Field;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
-
-
-public class TestSessionWindowImpl {
-  Field wndStoreField = null;
-  Field sessWndField = null;
-
-  @Before public void prep() throws NoSuchFieldException {
-    wndStoreField = SessionWindowImpl.class.getDeclaredField("wndStore");
-    sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd");
-    wndStoreField.setAccessible(true);
-    sessWndField.setAccessible(true);
-  }
-
-  @Test public void testConstructor() throws IllegalAccessException, NoSuchFieldException {
-    // test constructing a SessionWindowImpl w/ expected mock functions
-    WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class);
-    SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp);
-    assertEquals(wndOp, sessWndField.get(sessWnd));
-  }
-
-  @Test public void testInitAndProcess() throws IllegalAccessException {
-    WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class);
-    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class);
-    SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp);
-
-    // construct and init the SessionWindowImpl object
-    MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class);
-    StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class);
-    Function<TestMessage, String> wndKeyFn = m -> "test-msg-key";
-    when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn);
-    when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns);
-    when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store");
-    when(wndOp.getFunction()).thenReturn(mockTxfmFn);
-    TaskContext mockContext = mock(TaskContext.class);
-    KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class);
-    when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore);
-    sessWnd.init(mockInputStrm, mockContext);
-
-    // test onNext() method. Make sure the transformation function and the state update functions are invoked.
-    TestMessage mockMsg = mock(TestMessage.class);
-    MessageCollector mockCollector = mock(MessageCollector.class);
-    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
-    BiFunction<TestMessage, WindowState<Integer>, WindowState<Integer>> stateUpdaterFn = mock(BiFunction.class);
-    when(mockStoreFns.getStateUpdater()).thenReturn(stateUpdaterFn);
-    WindowState<Integer> mockNewState = mock(WindowState.class);
-    WindowState<Integer> oldState = mock(WindowState.class);
-    when(mockKvStore.get("test-msg-key")).thenReturn(oldState);
-    when(stateUpdaterFn.apply(mockMsg, oldState)).thenReturn(mockNewState);
-    sessWnd.onNext(mockMsg, mockCollector, mockCoordinator);
-    verify(mockTxfmFn, times(1)).apply(argThat(new ArgumentMatcher<TestMessage>() {
-      @Override public boolean matches(Object argument) {
-        TestMessage xIn = (TestMessage) argument;
-        return xIn.equals(mockMsg);
-      }
-    }), argThat(new ArgumentMatcher<Entry<String, WindowState<Integer>>>() {
-      @Override public boolean matches(Object argument) {
-        Entry<String, WindowState<Integer>> xIn = (Entry<String, WindowState<Integer>>) argument;
-        return xIn.getKey().equals("test-msg-key") && xIn.getValue().equals(oldState);
-      }
-    }));
-    verify(stateUpdaterFn, times(1)).apply(mockMsg, oldState);
-    verify(mockKvStore, times(1)).put("test-msg-key", mockNewState);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
new file mode 100644
index 0000000..028bd67
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.windows.StoreFunctions;
+import org.apache.samza.operators.windows.Trigger;
+import org.apache.samza.operators.windows.WindowFn;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.storage.kv.Entry;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestOperatorSpecs {
+  @Test
+  public void testGetStreamOperator() {
+    FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
+        this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
+      } };
+    StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperator(transformFn);
+    assertEquals(strmOp.getTransformFn(), transformFn);
+    assertTrue(strmOp.getOutputStream() instanceof MessageStreamImpl);
+  }
+
+  @Test
+  public void testGetSinkOperator() {
+    SinkFunction<TestMessageEnvelope> sinkFn = (m, c, t) -> { };
+    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperator(sinkFn);
+    assertEquals(sinkOp.getSinkFn(), sinkFn);
+    assertTrue(sinkOp.getOutputStream() == null);
+  }
+
+  @Test
+  public void testGetWindowOperator() {
+    WindowFn<TestMessageEnvelope, String, WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class);
+    BiFunction<TestMessageEnvelope, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction = (m, e) -> null;
+    StoreFunctions<TestMessageEnvelope, String, WindowState<Integer>> storeFns = mock(StoreFunctions.class);
+    Trigger<TestMessageEnvelope, WindowState<Integer>> trigger = mock(Trigger.class);
+    MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class);
+    when(windowFn.getTransformFn()).thenReturn(xFunction);
+    when(windowFn.getStoreFns()).thenReturn(storeFns);
+    when(windowFn.getTrigger()).thenReturn(trigger);
+    when(mockInput.toString()).thenReturn("mockStream1");
+
+    WindowOperatorSpec<TestMessageEnvelope, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp = OperatorSpecs
+        .createWindowOperator(windowFn);
+    assertEquals(windowOp.getTransformFn(), xFunction);
+    assertEquals(windowOp.getStoreFns(), storeFns);
+    assertEquals(windowOp.getTrigger(), trigger);
+    assertEquals(windowOp.getStoreName(mockInput), String.format("input-mockStream1-wndop-%s", windowOp.toString()));
+  }
+
+  @Test
+  public void testGetPartialJoinOperator() {
+    BiFunction<MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger =
+        (m1, m2) -> new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime());
+    MessageStreamImpl<TestMessageEnvelope> joinOutput = new MessageStreamImpl<>();
+    PartialJoinOperatorSpec<MessageEnvelope<Object, ?>, Object, MessageEnvelope<Object, ?>, TestMessageEnvelope> partialJoin =
+        OperatorSpecs.createPartialJoinOperator(merger, joinOutput);
+
+    assertEquals(partialJoin.getOutputStream(), joinOutput);
+    MessageEnvelope<Object, Object> m = mock(MessageEnvelope.class);
+    MessageEnvelope<Object, Object> s = mock(MessageEnvelope.class);
+    assertEquals(partialJoin.getTransformFn(), merger);
+    assertEquals(partialJoin.getSelfStoreFns().getStoreKeyFn().apply(m), m.getKey());
+    assertEquals(partialJoin.getSelfStoreFns().getStateUpdaterFn().apply(m, s), m);
+    assertEquals(partialJoin.getJoinStoreFns().getStoreKeyFn().apply(m), m.getKey());
+    assertNull(partialJoin.getJoinStoreFns().getStateUpdaterFn());
+  }
+
+  @Test
+  public void testGetMergeOperator() {
+    MessageStreamImpl<TestMessageEnvelope> output = new MessageStreamImpl<>();
+    StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperator(output);
+    Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { {
+        this.add(t);
+      } };
+    TestMessageEnvelope t = mock(TestMessageEnvelope.class);
+    assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t));
+    assertEquals(mergeOp.getOutputStream(), output);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
deleted file mode 100644
index d1b0a88..0000000
--- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.TriggerBuilder;
-import org.apache.samza.operators.Windows;
-import org.apache.samza.operators.data.IncomingSystemMessage;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.task.StreamOperatorTask;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Collection;
-
-
-/**
- * Example implementation of split stream tasks
- *
- */
-public class BroadcastOperatorTask implements StreamOperatorTask {
-  class MessageType {
-    String field1;
-    String field2;
-    String field3;
-    String field4;
-    String parKey;
-    private long timestamp;
-
-    public long getTimestamp() {
-      return this.timestamp;
-    }
-  }
-
-  class JsonMessage extends InputJsonSystemMessage<MessageType> {
-
-    JsonMessage(String key, MessageType data, Offset offset, long timestamp, SystemStreamPartition partition) {
-      super(key, data, offset, timestamp, partition);
-    }
-  }
-
-  @Override public void initOperators(Collection<SystemMessageStream> sources) {
-    sources.forEach(source -> {
-        MessageStream<JsonMessage> inputStream = source.map(this::getInputMessage);
-
-        inputStream.filter(this::myFilter1).
-          window(Windows.<JsonMessage, String>intoSessionCounter(
-              m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)).
-            setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100).
-              addLateTriggerOnSizeLimit(10).
-              addTimeoutSinceLastMessage(30000)));
-
-        inputStream.filter(this::myFilter2).
-          window(Windows.<JsonMessage, String>intoSessions(
-              m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4)).
-            setTriggers(TriggerBuilder.<JsonMessage, Collection<JsonMessage>>earlyTriggerWhenExceedWndLen(100).
-              addTimeoutSinceLastMessage(30000)));
-
-        inputStream.filter(this::myFilter3).
-          window(Windows.<JsonMessage, String, MessageType>intoSessions(
-              m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4), m -> m.getMessage()).
-            setTriggers(TriggerBuilder.<JsonMessage, Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getTimestamp(), 30000).
-              addTimeoutSinceFirstMessage(60000)));
-      }
-    );
-  }
-
-  JsonMessage getInputMessage(IncomingSystemMessage m1) {
-    return (JsonMessage) m1.getMessage();
-  }
-
-  boolean myFilter1(JsonMessage m1) {
-    // Do user defined processing here
-    return m1.getMessage().parKey.equals("key1");
-  }
-
-  boolean myFilter2(JsonMessage m1) {
-    // Do user defined processing here
-    return m1.getMessage().parKey.equals("key2");
-  }
-
-  boolean myFilter3(JsonMessage m1) {
-    return m1.getMessage().parKey.equals("key3");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
deleted file mode 100644
index 88aa159..0000000
--- a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.task;
-
-import org.apache.samza.operators.data.InputSystemMessage;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * Example input message w/ Json message body and string as the key.
- */
-
-public class InputJsonSystemMessage<T> implements Message<String, T>, InputSystemMessage<Offset> {
-
-  private final String key;
-  private final T data;
-  private final Offset offset;
-  private final long timestamp;
-  private final SystemStreamPartition partition;
-
-  InputJsonSystemMessage(String key, T data, Offset offset, long timestamp, SystemStreamPartition partition) {
-    this.key = key;
-    this.data = data;
-    this.offset = offset;
-    this.timestamp = timestamp;
-    this.partition = partition;
-  }
-
-  @Override public T getMessage() {
-    return this.data;
-  }
-
-  @Override public String getKey() {
-    return this.key;
-  }
-
-  @Override public long getTimestamp() {
-    return this.timestamp;
-  }
-
-  @Override public Offset getOffset() {
-    return this.offset;
-  }
-
-  @Override public SystemStreamPartition getSystemStreamPartition() {
-    return this.partition;
-  }
-}
-