You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/12/01 22:51:04 UTC

[4/6] samza git commit: SAMZA-1054: Refactor Operator APIs

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
deleted file mode 100644
index 14e6562..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.Message;
-
-
-public class TestOutputMessage implements Message<String, Integer> {
-  private final String key;
-  private final Integer value;
-  private final long timestamp;
-
-  public TestOutputMessage(String key, Integer value, long timestamp) {
-    this.key = key;
-    this.value = value;
-    this.timestamp = timestamp;
-  }
-
-  @Override public Integer getMessage() {
-    return this.value;
-  }
-
-  @Override public String getKey() {
-    return this.key;
-  }
-
-  @Override public long getTimestamp() {
-    return this.timestamp;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
new file mode 100644
index 0000000..284b30b
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+
+public class TestOutputMessageEnvelope implements MessageEnvelope<String, Integer> {
+  private final String key;
+  private final Integer value;
+
+  public TestOutputMessageEnvelope(String key, Integer value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  @Override
+  public Integer getMessage() {
+    return this.value;
+  }
+
+  @Override
+  public String getKey() {
+    return this.key;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
deleted file mode 100644
index 927b14b..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestTriggerBuilder {
-  private Field earlyTriggerField;
-  private Field lateTriggerField;
-  private Field timerTriggerField;
-  private Field earlyTriggerUpdater;
-  private Field lateTriggerUpdater;
-
-  @Before
-  public void testPrep() throws Exception {
-    this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger");
-    this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger");
-    this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger");
-    this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
-    this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
-
-    this.earlyTriggerField.setAccessible(true);
-    this.lateTriggerField.setAccessible(true);
-    this.timerTriggerField.setAccessible(true);
-    this.earlyTriggerUpdater.setAccessible(true);
-    this.lateTriggerUpdater.setAccessible(true);
-  }
-
-  @Test public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    when(mockState.getNumberMessages()).thenReturn(2000L);
-    assertTrue(triggerField.apply(null, mockState));
-
-    Function<TestMessage, Boolean> tokenFunc = m -> true;
-    builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
-    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    TestMessage m = mock(TestMessage.class);
-    assertTrue(triggerField.apply(m, mockState));
-
-    builder = TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L);
-    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
-    when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
-    when(m.getTimestamp()).thenReturn(19999000000L);
-    assertFalse(triggerField.apply(m, mockState));
-    when(m.getTimestamp()).thenReturn(32000000000L);
-    assertTrue(triggerField.apply(m, mockState));
-    when(m.getTimestamp()).thenReturn(1001000000L);
-    when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
-    assertTrue(triggerField.apply(m, mockState));
-
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> mockFunc = mock(BiFunction.class);
-    builder = TriggerBuilder.earlyTrigger(mockFunc);
-    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    assertEquals(triggerField, mockFunc);
-
-    builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
-    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
-        (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
-    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-
-    builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
-    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
-    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000));
-    assertFalse(timerTrigger.apply(mockState));
-  }
-
-  @Test public void testAddTimerTriggers() throws IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addTimeoutSinceFirstMessage(10000L);
-    // exam that both earlyTrigger and timer triggers are set up
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    // check the timer trigger
-    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
-        (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
-    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-
-    // exam that both early trigger and timer triggers are set up
-    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    builder.addTimeoutSinceLastMessage(20000L);
-    // check the timer trigger
-    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
-    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-  }
-
-  @Test public void testAddLateTriggers() throws IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addLateTriggerOnSizeLimit(10000L);
-    // exam that both earlyTrigger and lateTriggers are set up
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> earlyTrigger =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(earlyTrigger.apply(null, mockState));
-    // check the late trigger
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> lateTrigger =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder);
-    assertFalse(lateTrigger.apply(null, mockState));
-    // set the number of messages to 10001 to trigger the late trigger
-    when(mockState.getNumberMessages()).thenReturn(10001L);
-    assertTrue(lateTrigger.apply(null, mockState));
-
-    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0);
-    // exam that both earlyTrigger and lateTriggers are set up
-    earlyTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(earlyTrigger.apply(null, mockState));
-    // exam the lateTrigger
-    when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
-    lateTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder);
-    assertFalse(lateTrigger.apply(null, mockState));
-    List<TestMessage> mockList = mock(ArrayList.class);
-    when(mockList.size()).thenReturn(200);
-    when(mockState.getOutputValue()).thenReturn(mockList);
-    assertTrue(lateTrigger.apply(null, mockState));
-  }
-
-  @Test public void testAddTriggerUpdater() throws IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.onEarlyTrigger(c -> {
-        c.clear();
-        return c;
-      });
-    List<TestMessage> collection = new ArrayList<TestMessage>() { {
-        for (int i = 0; i < 10; i++) {
-          this.add(new TestMessage(String.format("key-%d", i), "string-value", System.nanoTime()));
-        }
-      } };
-    // exam that earlyTriggerUpdater is set up
-    Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> earlyTriggerUpdater =
-        (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getOutputValue()).thenReturn(collection);
-    earlyTriggerUpdater.apply(mockState);
-    assertTrue(collection.isEmpty());
-
-    collection.add(new TestMessage("key-to-stay", "string-to-stay", System.nanoTime()));
-    collection.add(new TestMessage("key-to-remove", "string-to-remove", System.nanoTime()));
-    builder.onLateTrigger(c -> {
-        c.removeIf(t -> t.getKey().equals("key-to-remove"));
-        return c;
-      });
-    // check the late trigger updater
-    Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> lateTriggerUpdater =
-        (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder);
-    when(mockState.getOutputValue()).thenReturn(collection);
-    lateTriggerUpdater.apply(mockState);
-    assertTrue(collection.size() == 1);
-    assertFalse(collection.get(0).isDelete());
-    assertEquals(collection.get(0).getKey(), "key-to-stay");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
deleted file mode 100644
index 8a25a96..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.Windows.Window;
-import org.apache.samza.operators.internal.Trigger;
-import org.apache.samza.operators.internal.WindowOutput;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestWindows {
-
-  @Test public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException {
-    // test constructing the default session window
-    Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, Collection<TestMessage>>> testWnd = Windows.intoSessions(
-        TestMessage::getKey);
-    assertTrue(testWnd instanceof Windows.SessionWindow);
-    Field wndKeyFuncField = Windows.SessionWindow.class.getDeclaredField("wndKeyFunction");
-    Field aggregatorField = Windows.SessionWindow.class.getDeclaredField("aggregator");
-    wndKeyFuncField.setAccessible(true);
-    aggregatorField.setAccessible(true);
-    Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd);
-    assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "test-key");
-    BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> aggrFunc =
-        (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd);
-    TestMessage mockMsg = mock(TestMessage.class);
-    Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new ArrayList<>());
-    assertTrue(collection.size() == 1);
-    assertTrue(collection.contains(mockMsg));
-
-    // test constructing the session window w/ customized session info
-    Window<TestMessage, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions(
-        m -> String.format("key-%d", m.getTimestamp()), m -> m.getMessage().charAt(0));
-    assertTrue(testWnd2 instanceof Windows.SessionWindow);
-    wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2);
-    aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd2);
-    assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "key-0");
-    when(mockMsg.getMessage()).thenReturn("x-001");
-    collection = aggrFunc.apply(mockMsg, new ArrayList<>());
-    assertTrue(collection.size() == 1);
-    assertTrue(collection.contains('x'));
-
-    // test constructing session window w/ a default counter
-    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
-        m -> String.format("key-%d", m.getTimestamp()));
-    assertTrue(testCounter instanceof Windows.SessionWindow);
-    wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testCounter);
-    BiFunction<TestMessage, Integer, Integer> counterFn = (BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter);
-    when(mockMsg.getTimestamp()).thenReturn(12345L);
-    assertEquals(wndKeyFunc.apply(mockMsg), "key-12345");
-    assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2));
-  }
-
-  @Test public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException {
-    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
-        m -> String.format("key-%d", m.getTimestamp()));
-    // test session window w/ a trigger
-    TriggerBuilder<TestMessage, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
-    testCounter.setTriggers(triggerBuilder);
-    Trigger<TestMessage, WindowState<Integer>> expectedTrigger = triggerBuilder.build();
-    Trigger<TestMessage, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger();
-    // examine all trigger fields are expected
-    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
-    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
-    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
-    Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater");
-    Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater");
-    earlyTriggerField.setAccessible(true);
-    lateTriggerField.setAccessible(true);
-    timerTriggerField.setAccessible(true);
-    earlyTriggerUpdater.setAccessible(true);
-    lateTriggerUpdater.setAccessible(true);
-    assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger));
-    assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger));
-    assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger));
-    assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger));
-    assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
index b734e87..7bd62a7 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
@@ -30,9 +30,10 @@ import static org.mockito.Mockito.when;
 
 public class TestIncomingSystemMessage {
 
-  @Test public void testConstructor() {
+  @Test
+  public void testConstructor() {
     IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
-    IncomingSystemMessage ism = new IncomingSystemMessage(ime);
+    IncomingSystemMessageEnvelope ism = new IncomingSystemMessageEnvelope(ime);
 
     Object mockKey = mock(Object.class);
     Object mockValue = mock(Object.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
index 943c47f..7838896 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
@@ -28,7 +28,8 @@ import static org.mockito.Mockito.mock;
 
 public class TestLongOffset {
 
-  @Test public void testConstructor() throws Exception {
+  @Test
+  public void testConstructor() throws Exception {
     LongOffset o1 = new LongOffset("12345");
     Field offsetField = LongOffset.class.getDeclaredField("offset");
     offsetField.setAccessible(true);
@@ -47,7 +48,8 @@ public class TestLongOffset {
     }
   }
 
-  @Test public void testComparator() {
+  @Test
+  public void testComparator() {
     LongOffset o1 = new LongOffset("11111");
     Offset other = mock(Offset.class);
     try {
@@ -65,7 +67,8 @@ public class TestLongOffset {
     assertEquals(o1.compareTo(o4), 0);
   }
 
-  @Test public void testEquals() {
+  @Test
+  public void testEquals() {
     LongOffset o1 = new LongOffset("12345");
     Offset other = mock(Offset.class);
     assertFalse(o1.equals(other));

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
deleted file mode 100644
index d994486..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.internal;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperators {
-
-  private class TestMessage implements Message<String, Object> {
-    private final long timestamp;
-    private final String key;
-    private final Object msg;
-
-
-    TestMessage(String key, Object msg, long timestamp) {
-      this.timestamp = timestamp;
-      this.key = key;
-      this.msg = msg;
-    }
-
-    @Override public Object getMessage() {
-      return this.msg;
-    }
-
-    @Override public String getKey() {
-      return this.key;
-    }
-
-    @Override public long getTimestamp() {
-      return this.timestamp;
-    }
-  }
-
-  @Test public void testGetStreamOperator() {
-    Function<Message, Collection<TestMessage>> transformFn = m -> new ArrayList<TestMessage>() { {
-        this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 12345L));
-      } };
-    Operators.StreamOperator<Message, TestMessage> strmOp = Operators.getStreamOperator(transformFn);
-    assertEquals(strmOp.getFunction(), transformFn);
-    assertTrue(strmOp.getOutputStream() instanceof MessageStream);
-  }
-
-  @Test public void testGetSinkOperator() {
-    MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, c, t) -> { };
-    Operators.SinkOperator<TestMessage> sinkOp = Operators.getSinkOperator(sinkFn);
-    assertEquals(sinkOp.getFunction(), sinkFn);
-    assertTrue(sinkOp.getOutputStream() == null);
-  }
-
-  @Test public void testGetWindowOperator() {
-    WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class);
-    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction = (m, e) -> null;
-    Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> storeFns = mock(Operators.StoreFunctions.class);
-    Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class);
-    MessageStream<TestMessage> mockInput = mock(MessageStream.class);
-    when(windowFn.getTransformFunc()).thenReturn(xFunction);
-    when(windowFn.getStoreFuncs()).thenReturn(storeFns);
-    when(windowFn.getTrigger()).thenReturn(trigger);
-    when(mockInput.toString()).thenReturn("mockStream1");
-
-    Operators.WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn);
-    assertEquals(windowOp.getFunction(), xFunction);
-    assertEquals(windowOp.getStoreFunctions(), storeFns);
-    assertEquals(windowOp.getTrigger(), trigger);
-    assertEquals(windowOp.getStoreName(mockInput), String.format("input-mockStream1-wndop-%s", windowOp.toString()));
-  }
-
-  @Test public void testGetPartialJoinOperator() {
-    BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger =
-        (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(),
-            Math.max(m1.getTimestamp(), m2.getTimestamp()));
-    MessageStream<TestMessage> joinOutput = new MessageStream<>();
-    Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, ?>, TestMessage> partialJoin =
-        Operators.getPartialJoinOperator(merger, joinOutput);
-
-    assertEquals(partialJoin.getOutputStream(), joinOutput);
-    Message<Object, Object> m = mock(Message.class);
-    Message<Object, Object> s = mock(Message.class);
-    assertEquals(partialJoin.getFunction(), merger);
-    assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), m.getKey());
-    assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), m);
-    assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), m.getKey());
-    assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater());
-  }
-
-  @Test public void testGetMergeOperator() {
-    MessageStream<TestMessage> output = new MessageStream<>();
-    Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output);
-    Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() { {
-        this.add(t);
-      } };
-    TestMessage t = mock(TestMessage.class);
-    assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t));
-    assertEquals(mergeOp.getOutputStream(), output);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
deleted file mode 100644
index 0f35a7c..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.internal;
-
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestTrigger {
-
-  @Test public void testConstructor() throws Exception {
-    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000;
-    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000;
-    Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis();
-    Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> {
-      s.setOutputValue(0);
-      return s;
-    };
-    Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> {
-      s.setOutputValue(1);
-      return s;
-    };
-
-    Trigger<Message<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger,
-        earlyTriggerUpdater, lateTriggerUpdater);
-
-    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
-    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
-    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
-    Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater");
-    Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater");
-    earlyTriggerField.setAccessible(true);
-    lateTriggerField.setAccessible(true);
-    timerTriggerField.setAccessible(true);
-    earlyTriggerUpdaterField.setAccessible(true);
-    lateTriggerUpdaterField.setAccessible(true);
-
-    assertEquals(earlyTrigger, earlyTriggerField.get(trigger));
-    assertEquals(timerTrigger, timerTriggerField.get(trigger));
-    assertEquals(lateTrigger, lateTriggerField.get(trigger));
-    assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger));
-    assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
deleted file mode 100644
index 268c9fc..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.internal;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-
-public class TestWindowOutput {
-
-  @Test public void testConstructor() {
-    WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
-    assertEquals(wndOutput.getKey(), "testMsg");
-    assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
-    assertFalse(wndOutput.isDelete());
-    assertEquals(wndOutput.getTimestamp(), 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java
new file mode 100644
index 0000000..a91af24
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestTrigger {
+
+  @Test
+  public void testConstructor() throws Exception {
+    BiFunction<MessageEnvelope<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000;
+    BiFunction<MessageEnvelope<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000;
+    Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis();
+    Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> {
+      s.setOutputValue(0);
+      return s;
+    };
+    Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> {
+      s.setOutputValue(1);
+      return s;
+    };
+
+    Trigger<MessageEnvelope<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger,
+        earlyTriggerUpdater, lateTriggerUpdater);
+
+    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
+    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
+    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
+    Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater");
+    Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater");
+    earlyTriggerField.setAccessible(true);
+    lateTriggerField.setAccessible(true);
+    timerTriggerField.setAccessible(true);
+    earlyTriggerUpdaterField.setAccessible(true);
+    lateTriggerUpdaterField.setAccessible(true);
+
+    assertEquals(earlyTrigger, earlyTriggerField.get(trigger));
+    assertEquals(timerTrigger, timerTriggerField.get(trigger));
+    assertEquals(lateTrigger, lateTriggerField.get(trigger));
+    assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger));
+    assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
new file mode 100644
index 0000000..6a9b55d
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestTriggerBuilder {
+  private Field earlyTriggerField;
+  private Field lateTriggerField;
+  private Field timerTriggerField;
+  private Field earlyTriggerUpdater;
+  private Field lateTriggerUpdater;
+
+  @Before
+  public void testPrep() throws Exception {
+    this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger");
+    this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger");
+    this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger");
+    this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
+    this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
+
+    this.earlyTriggerField.setAccessible(true);
+    this.lateTriggerField.setAccessible(true);
+    this.timerTriggerField.setAccessible(true);
+    this.earlyTriggerUpdater.setAccessible(true);
+    this.lateTriggerUpdater.setAccessible(true);
+  }
+
+  @Test
+  public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException {
+    TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> triggerField =
+        (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    when(mockState.getNumberMessages()).thenReturn(2000L);
+    assertTrue(triggerField.apply(null, mockState));
+
+    Function<TestMessageEnvelope, Boolean> tokenFunc = m -> true;
+    builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
+    triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+    TestMessageEnvelope m = mock(TestMessageEnvelope.class);
+    assertTrue(triggerField.apply(m, mockState));
+
+    builder = TriggerBuilder.earlyTriggerOnEventTime(tm -> tm.getMessage().getEventTime(), 30000L);
+    triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+    when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
+    when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
+    TestMessageEnvelope.MessageType mockInnerMessage;
+    mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(mockInnerMessage.getEventTime()).thenReturn(19999000000L);
+    when(m.getMessage()).thenReturn(mockInnerMessage);
+    assertFalse(triggerField.apply(m, mockState));
+    mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(mockInnerMessage.getEventTime()).thenReturn(32000000000L);
+    when(m.getMessage()).thenReturn(mockInnerMessage);
+    assertTrue(triggerField.apply(m, mockState));
+    mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(m.getMessage()).thenReturn(mockInnerMessage);
+    when(mockInnerMessage.getEventTime()).thenReturn(1001000000L);
+    when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
+    assertTrue(triggerField.apply(m, mockState));
+
+    BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> mockFunc = mock(BiFunction.class);
+    builder = TriggerBuilder.earlyTrigger(mockFunc);
+    triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+    assertEquals(triggerField, mockFunc);
+
+    builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
+    Function<WindowState<Collection<TestMessageEnvelope>>, Boolean> timerTrigger =
+        (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+
+    builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
+    timerTrigger = (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000));
+    assertFalse(timerTrigger.apply(mockState));
+  }
+
+  @Test
+  public void testAddTimerTriggers() throws IllegalAccessException {
+    TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addTimeoutSinceFirstMessage(10000L);
+    // exam that both earlyTrigger and timer triggers are set up
+    BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> triggerField =
+        (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    // check the timer trigger
+    Function<WindowState<Collection<TestMessageEnvelope>>, Boolean> timerTrigger =
+        (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+
+    // exam that both early trigger and timer triggers are set up
+    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+    mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    builder.addTimeoutSinceLastMessage(20000L);
+    // check the timer trigger
+    timerTrigger = (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+  }
+
+  @Test
+  public void testAddLateTriggers() throws IllegalAccessException {
+    TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addLateTriggerOnSizeLimit(10000L);
+    // exam that both earlyTrigger and lateTriggers are set up
+    BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> earlyTrigger =
+        (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(earlyTrigger.apply(null, mockState));
+    // check the late trigger
+    BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> lateTrigger =
+        (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.lateTriggerField.get(builder);
+    assertFalse(lateTrigger.apply(null, mockState));
+    // set the number of messages to 10001 to trigger the late trigger
+    when(mockState.getNumberMessages()).thenReturn(10001L);
+    assertTrue(lateTrigger.apply(null, mockState));
+
+    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0);
+    // exam that both earlyTrigger and lateTriggers are set up
+    earlyTrigger = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+    mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(earlyTrigger.apply(null, mockState));
+    // exam the lateTrigger
+    when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
+    lateTrigger = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.lateTriggerField.get(builder);
+    assertFalse(lateTrigger.apply(null, mockState));
+    List<TestMessageEnvelope> mockList = mock(ArrayList.class);
+    when(mockList.size()).thenReturn(200);
+    when(mockState.getOutputValue()).thenReturn(mockList);
+    assertTrue(lateTrigger.apply(null, mockState));
+  }
+
+  @Test
+  public void testAddTriggerUpdater() throws IllegalAccessException {
+    TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.onEarlyTrigger(c -> {
+        c.clear();
+        return c;
+      });
+    List<TestMessageEnvelope> collection = new ArrayList<TestMessageEnvelope>() { {
+        for (int i = 0; i < 10; i++) {
+          this.add(new TestMessageEnvelope(String.format("key-%d", i), "string-value", System.nanoTime()));
+        }
+      } };
+    // exam that earlyTriggerUpdater is set up
+    Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>> earlyTriggerUpdater =
+        (Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>>) this.earlyTriggerUpdater.get(builder);
+    WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class);
+    when(mockState.getOutputValue()).thenReturn(collection);
+    earlyTriggerUpdater.apply(mockState);
+    assertTrue(collection.isEmpty());
+
+    collection.add(new TestMessageEnvelope("key-to-stay", "string-to-stay", System.nanoTime()));
+    collection.add(new TestMessageEnvelope("key-to-remove", "string-to-remove", System.nanoTime()));
+    builder.onLateTrigger(c -> {
+        c.removeIf(t -> t.getKey().equals("key-to-remove"));
+        return c;
+      });
+    // check the late trigger updater
+    Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>> lateTriggerUpdater =
+        (Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>>) this.lateTriggerUpdater.get(builder);
+    when(mockState.getOutputValue()).thenReturn(collection);
+    lateTriggerUpdater.apply(mockState);
+    assertTrue(collection.size() == 1);
+    assertFalse(collection.get(0).isDelete());
+    assertEquals(collection.get(0).getKey(), "key-to-stay");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
new file mode 100644
index 0000000..7f81fc9
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+public class TestWindowOutput {
+
+  @Test
+  public void testConstructor() {
+    WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
+    assertEquals(wndOutput.getKey(), "testMsg");
+    assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
+    assertFalse(wndOutput.isDelete());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java
new file mode 100644
index 0000000..26af26e
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestWindows {
+
+  @Test
+  public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException {
+    // test constructing the default session window
+    Window<TestMessageEnvelope, String, Collection<TestMessageEnvelope>, WindowOutput<String, Collection<TestMessageEnvelope>>> testWnd = Windows
+        .intoSessions(
+        TestMessageEnvelope::getKey);
+    assertTrue(testWnd instanceof SessionWindow);
+    Field wndKeyFuncField = SessionWindow.class.getDeclaredField("wndKeyFunction");
+    Field aggregatorField = SessionWindow.class.getDeclaredField("aggregator");
+    wndKeyFuncField.setAccessible(true);
+    aggregatorField.setAccessible(true);
+    Function<TestMessageEnvelope, String> wndKeyFunc = (Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testWnd);
+    assertEquals(wndKeyFunc.apply(new TestMessageEnvelope("test-key", "test-value", 0)), "test-key");
+    BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>> aggrFunc =
+        (BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>>) aggregatorField.get(testWnd);
+    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+    Collection<TestMessageEnvelope> collection = aggrFunc.apply(mockMsg, new ArrayList<>());
+    assertTrue(collection.size() == 1);
+    assertTrue(collection.contains(mockMsg));
+
+    // test constructing the session window w/ customized session info
+    Window<TestMessageEnvelope, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions(
+        m -> String.format("key-%d", m.getMessage().getEventTime()), m -> m.getMessage().getValue().charAt(0));
+    assertTrue(testWnd2 instanceof SessionWindow);
+    wndKeyFunc = (Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testWnd2);
+    aggrFunc = (BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>>) aggregatorField.get(testWnd2);
+    assertEquals(wndKeyFunc.apply(new TestMessageEnvelope("test-key", "test-value", 0)), "key-0");
+    TestMessageEnvelope.MessageType mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(mockMsg.getMessage()).thenReturn(mockInnerMessage);
+    when(mockInnerMessage.getValue()).thenReturn("x-001");
+    collection = aggrFunc.apply(mockMsg, new ArrayList<>());
+    assertTrue(collection.size() == 1);
+    assertTrue(collection.contains('x'));
+
+    // test constructing session window w/ a default counter
+    Window<TestMessageEnvelope, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
+        m -> String.format("key-%d", m.getMessage().getEventTime()));
+    assertTrue(testCounter instanceof SessionWindow);
+    wndKeyFunc = (Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testCounter);
+    BiFunction<TestMessageEnvelope, Integer, Integer> counterFn = (BiFunction<TestMessageEnvelope, Integer, Integer>) aggregatorField.get(testCounter);
+    when(mockMsg.getMessage().getEventTime()).thenReturn(12345L);
+    assertEquals(wndKeyFunc.apply(mockMsg), "key-12345");
+    assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2));
+  }
+
+  @Test
+  public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException {
+    Window<TestMessageEnvelope, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
+        m -> String.format("key-%d", m.getMessage().getEventTime()));
+    // test session window w/ a trigger
+    TriggerBuilder<TestMessageEnvelope, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
+    testCounter.setTriggers(triggerBuilder);
+    Trigger<TestMessageEnvelope, WindowState<Integer>> expectedTrigger = triggerBuilder.build();
+    Trigger<TestMessageEnvelope, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger();
+    // examine all trigger fields are expected
+    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
+    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
+    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
+    Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater");
+    Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater");
+    earlyTriggerField.setAccessible(true);
+    lateTriggerField.setAccessible(true);
+    timerTriggerField.setAccessible(true);
+    earlyTriggerUpdater.setAccessible(true);
+    lateTriggerUpdater.setAccessible(true);
+    assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger));
+    assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger));
+    assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger));
+    assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger));
+    assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
new file mode 100644
index 0000000..231d3f5
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowFn;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.operators.windows.WindowState;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+
+/**
+ * The implementation for input/output {@link MessageStream}s to/from the operators.
+ * Users use the {@link MessageStream} API methods to describe and chain the operators specs.
+ *
+ * @param <M>  type of {@link MessageEnvelope}s in this {@link MessageStream}
+ */
+public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStream<M> {
+
+  /**
+   * The set of operators that consume the {@link MessageEnvelope}s in this {@link MessageStream}
+   */
+  private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>();
+
+  @Override
+  public <OM extends MessageEnvelope> MessageStream<OM> map(MapFunction<M, OM> mapFn) {
+    OperatorSpec<OM> op = OperatorSpecs.<M, OM>createStreamOperator(m -> new ArrayList<OM>() { {
+        OM r = mapFn.apply(m);
+        if (r != null) {
+          this.add(r);
+        }
+      } });
+    this.registeredOperatorSpecs.add(op);
+    return op.getOutputStream();
+  }
+
+  @Override
+  public <OM extends MessageEnvelope> MessageStream<OM> flatMap(FlatMapFunction<M, OM> flatMapFn) {
+    OperatorSpec<OM> op = OperatorSpecs.createStreamOperator(flatMapFn);
+    this.registeredOperatorSpecs.add(op);
+    return op.getOutputStream();
+  }
+
+  @Override
+  public MessageStream<M> filter(FilterFunction<M> filterFn) {
+    OperatorSpec<M> op = OperatorSpecs.<M, M>createStreamOperator(t -> new ArrayList<M>() { {
+        if (filterFn.apply(t)) {
+          this.add(t);
+        }
+      } });
+    this.registeredOperatorSpecs.add(op);
+    return op.getOutputStream();
+  }
+
+  @Override
+  public void sink(SinkFunction<M> sinkFn) {
+    this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperator(sinkFn));
+  }
+
+  @Override
+  public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(
+      Window<M, WK, WV, WM> window) {
+    OperatorSpec<WM> wndOp = OperatorSpecs.createWindowOperator((WindowFn<M, WK, WS, WM>) window.getInternalWindowFn());
+    this.registeredOperatorSpecs.add(wndOp);
+    return wndOp.getOutputStream();
+  }
+
+  @Override
+  public <K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(
+      MessageStream<JM> otherStream, JoinFunction<M, JM, RM> joinFn) {
+    MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>();
+
+    BiFunction<M, JM, RM> parJoin1 = joinFn::apply;
+    BiFunction<JM, M, RM> parJoin2 = (m, t1) -> joinFn.apply(t1, m);
+
+    // TODO: need to add default store functions for the two partial join functions
+
+    ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs.add(
+        OperatorSpecs.createPartialJoinOperator(parJoin2, outputStream));
+    this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperator(parJoin1, outputStream));
+    return outputStream;
+  }
+
+  @Override
+  public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
+    MessageStreamImpl<M> outputStream = new MessageStreamImpl<>();
+
+    otherStreams.add(this);
+    otherStreams.forEach(other ->
+        ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(OperatorSpecs.createMergeOperator(outputStream)));
+    return outputStream;
+  }
+
+  /**
+   * Gets the operator specs registered to consume the output of this {@link MessageStream}. This is an internal API and
+   * should not be exposed to users.
+   *
+   * @return  a collection containing all {@link OperatorSpec}s that are registered with this {@link MessageStream}.
+   */
+  public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
+    return Collections.unmodifiableSet(this.registeredOperatorSpecs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java
new file mode 100644
index 0000000..2572f14
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.windows.StoreFunctions;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * The base class for all state stores
+ */
+public class StateStoreImpl<M extends MessageEnvelope, SK, SS> {
+  private final String storeName;
+  private final StoreFunctions<M, SK, SS> storeFunctions;
+  private KeyValueStore<SK, SS> kvStore = null;
+
+  public StateStoreImpl(StoreFunctions<M, SK, SS> store, String storeName) {
+    this.storeFunctions = store;
+    this.storeName = storeName;
+  }
+
+  public void init(TaskContext context) {
+    this.kvStore = (KeyValueStore<SK, SS>) context.getStore(this.storeName);
+  }
+
+  public Entry<SK, SS> getState(M m) {
+    SK key = this.storeFunctions.getStoreKeyFn().apply(m);
+    SS state = this.kvStore.get(key);
+    return new Entry<>(key, state);
+  }
+
+  public Entry<SK, SS> updateState(M m, Entry<SK, SS> oldEntry) {
+    SS newValue = this.storeFunctions.getStateUpdaterFn().apply(m, oldEntry.getValue());
+    this.kvStore.put(oldEntry.getKey(), newValue);
+    return new Entry<>(oldEntry.getKey(), newValue);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java b/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
new file mode 100644
index 0000000..152cd92
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.impl.OperatorImpl;
+import org.apache.samza.operators.impl.OperatorImpls;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.WindowableTask;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * An {@link StreamTask} implementation that receives {@link IncomingSystemMessageEnvelope}s and propagates them
+ * through the user's stream transformations defined in {@link StreamOperatorTask#transform(Map)} using the
+ * {@link MessageStream} APIs.
+ * <p>
+ * This class brings all the operator API implementation components together and feeds the
+ * {@link IncomingSystemMessageEnvelope}s into the transformation chains.
+ * <p>
+ * It accepts an instance of the user implemented {@link StreamOperatorTask}. When its own {@link #init(Config, TaskContext)}
+ * method is called during startup, it creates a {@link MessageStreamImpl} corresponding to each of its input
+ * {@link SystemStreamPartition}s and then calls the user's {@link StreamOperatorTask#transform(Map)} method.
+ * <p>
+ * When users invoke the methods on the {@link MessageStream} API to describe their stream transformations in the
+ * {@link StreamOperatorTask#transform(Map)} method, the underlying {@link MessageStreamImpl} creates the
+ * corresponding {@link org.apache.samza.operators.spec.OperatorSpec} to record information about the desired
+ * transformation, and returns the output {@link MessageStream} to allow further transform chaining.
+ * <p>
+ * Once the user's transformation DAGs have been described for all {@link MessageStream}s (i.e., when the
+ * {@link StreamOperatorTask#transform(Map)} call returns), it calls
+ * {@link OperatorImpls#createOperatorImpls(MessageStreamImpl, TaskContext)} for each of the input
+ * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
+ * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the
+ * root node of the DAG, which this class saves.
+ * <p>
+ * Now that it has the root for the DAG corresponding to each {@link SystemStreamPartition}, it can pass the message
+ * envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)} along
+ * to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
+ * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
+ */
+public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask, WindowableTask {
+
+  /**
+   * A mapping from each {@link SystemStreamPartition} to the root node of its operator chain DAG.
+   */
+  private final Map<SystemStreamPartition, OperatorImpl<IncomingSystemMessageEnvelope, ? extends MessageEnvelope>> operatorChains = new HashMap<>();
+
+  private final StreamOperatorTask userTask;
+
+  public StreamOperatorAdaptorTask(StreamOperatorTask userTask) {
+    this.userTask = userTask;
+  }
+
+  @Override
+  public final void init(Config config, TaskContext context) throws Exception {
+    if (this.userTask instanceof InitableTask) {
+      ((InitableTask) this.userTask).init(config, context);
+    }
+    Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams = new HashMap<>();
+    context.getSystemStreamPartitions().forEach(ssp -> messageStreams.put(ssp, new MessageStreamImpl<>()));
+    this.userTask.transform(messageStreams);
+    messageStreams.forEach((ssp, ms) ->
+        operatorChains.put(ssp, OperatorImpls.createOperatorImpls((MessageStreamImpl<IncomingSystemMessageEnvelope>) ms, context)));
+  }
+
+  @Override
+  public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
+    this.operatorChains.get(ime.getSystemStreamPartition())
+        .onNext(new IncomingSystemMessageEnvelope(ime), collector, coordinator);
+  }
+
+  @Override
+  public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    if (this.userTask instanceof WindowableTask) {
+      ((WindowableTask) this.userTask).window(collector, coordinator);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
deleted file mode 100644
index 1eee2dc..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Operators.Operator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * Implementation class for a chain of operators from the single input {@code source}
- *
- * @param <M>  type of message in the input stream {@code source}
- */
-public class ChainedOperators<M extends Message> {
-
-  private final Set<OperatorImpl> subscribers = new HashSet<>();
-
-  /**
-   * Private constructor
-   *
-   * @param source  the input source {@link MessageStream}
-   * @param context  the {@link TaskContext} object that we need to instantiate the state stores
-   */
-  private ChainedOperators(MessageStream<M> source, TaskContext context) {
-    // create the pipeline/topology starting from source
-    source.getSubscribers().forEach(sub -> {
-        // pass in the context s.t. stateful stream operators can initialize their stores
-        OperatorImpl subImpl = this.createAndSubscribe(sub, source, context);
-        this.subscribers.add(subImpl);
-      });
-  }
-
-  /**
-   * Private function to recursively instantiate the implementation of operators and the chains
-   *
-   * @param operator  the operator that subscribe to {@code source}
-   * @param source  the source {@link MessageStream}
-   * @param context  the context of the task
-   * @return  the implementation object of the corresponding {@code operator}
-   */
-  private OperatorImpl<M, ? extends Message> createAndSubscribe(Operator operator, MessageStream source,
-      TaskContext context) {
-    Entry<OperatorImpl<M, ? extends Message>, Boolean> factoryEntry = OperatorFactory.getOperator(operator);
-    if (factoryEntry.getValue()) {
-      // The operator has already been instantiated and we do not need to traverse and create the subscribers any more.
-      return factoryEntry.getKey();
-    }
-    OperatorImpl<M, ? extends Message> opImpl = factoryEntry.getKey();
-    MessageStream outStream = operator.getOutputStream();
-    Collection<Operator> subs = outStream.getSubscribers();
-    subs.forEach(sub -> {
-        OperatorImpl subImpl = this.createAndSubscribe(sub, operator.getOutputStream(), context);
-        opImpl.subscribe(subImpl);
-      });
-    // initialize the operator's state store
-    opImpl.init(source, context);
-    return opImpl;
-  }
-
-  /**
-   * Static method to create a {@link ChainedOperators} from the {@code source} stream
-   *
-   * @param source  the input source {@link MessageStream}
-   * @param context  the {@link TaskContext} object used to initialize the {@link StateStoreImpl}
-   * @param <M>  the type of input {@link Message}
-   * @return a {@link ChainedOperators} object takes the {@code source} as input
-   */
-  public static <M extends Message> ChainedOperators create(MessageStream<M> source, TaskContext context) {
-    return new ChainedOperators<>(source, context);
-  }
-
-  /**
-   * Method to navigate the incoming {@code message} through the processing chains
-   *
-   * @param message  the incoming message to this {@link ChainedOperators}
-   * @param collector  the {@link MessageCollector} object within the process context
-   * @param coordinator  the {@link TaskCoordinator} object within the process context
-   */
-  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
-    this.subscribers.forEach(sub -> sub.onNext(message, collector, coordinator));
-  }
-
-  /**
-   * Method to handle timer events
-   *
-   * @param collector  the {@link MessageCollector} object within the process context
-   * @param coordinator  the {@link TaskCoordinator} object within the process context
-   */
-  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
-    long nanoTime = System.nanoTime();
-    this.subscribers.forEach(sub -> sub.onTimer(nanoTime, collector, coordinator));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
deleted file mode 100644
index ea90878..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Operators.*;
-import org.apache.samza.operators.internal.WindowOutput;
-import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
-import org.apache.samza.operators.impl.window.SessionWindowImpl;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * The factory class that instantiates all implementation of {@link OperatorImpl} classes.
- */
-public class OperatorFactory {
-
-  /**
-   * the static operatorMap that includes all operator implementation instances
-   */
-  private static final Map<Operator, OperatorImpl<? extends Message, ? extends Message>> OPERATOR_MAP = new ConcurrentHashMap<>();
-
-  /**
-   * The method to actually create the implementation instances of operators
-   *
-   * @param operator  the immutable definition of {@link Operator}
-   * @param <M>  type of input {@link Message}
-   * @param <RM>  type of output {@link Message}
-   * @return  the implementation object of {@link OperatorImpl}
-   */
-  private static <M extends Message, RM extends Message> OperatorImpl<M, ? extends Message> createOperator(Operator<RM> operator) {
-    if (operator instanceof StreamOperator) {
-      return new SimpleOperatorImpl<>((StreamOperator<M, RM>) operator);
-    } else if (operator instanceof SinkOperator) {
-      return new SinkOperatorImpl<>((SinkOperator<M>) operator);
-    } else if (operator instanceof WindowOperator) {
-      return new SessionWindowImpl<>((WindowOperator<M, ?, ? extends WindowState, ? extends WindowOutput>) operator);
-    } else if (operator instanceof PartialJoinOperator) {
-      return new PartialJoinOpImpl<>((PartialJoinOperator) operator);
-    }
-    throw new IllegalArgumentException(
-        String.format("The type of operator is not supported. Operator class name: %s", operator.getClass().getName()));
-  }
-
-  /**
-   * The method to get the unique implementation instance of {@link Operator}
-   *
-   * @param operator  the {@link Operator} to instantiate
-   * @param <M>  type of input {@link Message}
-   * @param <RM>  type of output {@link Message}
-   * @return  A pair of entry that include the unique implementation instance to the {@code operator} and a boolean value indicating whether
-   *          the operator instance has already been created or not. True means the operator instance has already created, false means the operator
-   *          was not created.
-   */
-  public static <M extends Message, RM extends Message> Entry<OperatorImpl<M, ? extends Message>, Boolean> getOperator(Operator<RM> operator) {
-    if (!OPERATOR_MAP.containsKey(operator)) {
-      OperatorImpl<M, ? extends Message> operatorImpl = OperatorFactory.createOperator(operator);
-      if (OPERATOR_MAP.putIfAbsent(operator, operatorImpl) == null) {
-        return new Entry<OperatorImpl<M, ? extends Message>, Boolean>(operatorImpl, false) { };
-      }
-    }
-    return new Entry<OperatorImpl<M, ? extends Message>, Boolean>((OperatorImpl<M, ? extends Message>) OPERATOR_MAP.get(operator), true) { };
-  }
-
-}