You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2016/11/01 21:59:15 UTC

[1/4] samza git commit: SAMZA-1045: Move classes from operator/api into samza-api

Repository: samza
Updated Branches:
  refs/heads/samza-sql adcd26678 -> 1dac25e17


http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java
deleted file mode 100644
index 8faa92c..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestTriggerBuilder{
-  private Field earlyTriggerField;
-  private Field lateTriggerField;
-  private Field timerTriggerField;
-  private Field earlyTriggerUpdater;
-  private Field lateTriggerUpdater;
-
-  @Before
-  public void testPrep() throws Exception {
-    this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger");
-    this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger");
-    this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger");
-    this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
-    this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
-
-    this.earlyTriggerField.setAccessible(true);
-    this.lateTriggerField.setAccessible(true);
-    this.timerTriggerField.setAccessible(true);
-    this.earlyTriggerUpdater.setAccessible(true);
-    this.lateTriggerUpdater.setAccessible(true);
-  }
-
-  @Test public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    when(mockState.getNumberMessages()).thenReturn(2000L);
-    assertTrue(triggerField.apply(null, mockState));
-
-    Function<TestMessage, Boolean> tokenFunc = m -> true;
-    builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
-    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    TestMessage m = mock(TestMessage.class);
-    assertTrue(triggerField.apply(m, mockState));
-
-    builder = TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L);
-    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
-    when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
-    when(m.getTimestamp()).thenReturn(19999000000L);
-    assertFalse(triggerField.apply(m, mockState));
-    when(m.getTimestamp()).thenReturn(32000000000L);
-    assertTrue(triggerField.apply(m, mockState));
-    when(m.getTimestamp()).thenReturn(1001000000L);
-    when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
-    assertTrue(triggerField.apply(m, mockState));
-
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> mockFunc = mock(BiFunction.class);
-    builder = TriggerBuilder.earlyTrigger(mockFunc);
-    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    assertEquals(triggerField, mockFunc);
-
-    builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
-    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
-        (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
-    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-
-    builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
-    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
-    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000));
-    assertFalse(timerTrigger.apply(mockState));
-  }
-
-  @Test public void testAddTimerTriggers() throws IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addTimeoutSinceFirstMessage(10000L);
-    // exam that both earlyTrigger and timer triggers are set up
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    // check the timer trigger
-    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
-        (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
-    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-
-    // exam that both early trigger and timer triggers are set up
-    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    builder.addTimeoutSinceLastMessage(20000L);
-    // check the timer trigger
-    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
-    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-  }
-
-  @Test public void testAddLateTriggers() throws IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addLateTriggerOnSizeLimit(10000L);
-    // exam that both earlyTrigger and lateTriggers are set up
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> earlyTrigger =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(earlyTrigger.apply(null, mockState));
-    // check the late trigger
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> lateTrigger =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder);
-    assertFalse(lateTrigger.apply(null, mockState));
-    // set the number of messages to 10001 to trigger the late trigger
-    when(mockState.getNumberMessages()).thenReturn(10001L);
-    assertTrue(lateTrigger.apply(null, mockState));
-
-    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0);
-    // exam that both earlyTrigger and lateTriggers are set up
-    earlyTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(earlyTrigger.apply(null, mockState));
-    // exam the lateTrigger
-    when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
-    lateTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder);
-    assertFalse(lateTrigger.apply(null, mockState));
-    List<TestMessage> mockList = mock(ArrayList.class);
-    when(mockList.size()).thenReturn(200);
-    when(mockState.getOutputValue()).thenReturn(mockList);
-    assertTrue(lateTrigger.apply(null, mockState));
-  }
-
-  @Test public void testAddTriggerUpdater() throws IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.onEarlyTrigger(c -> { c.clear(); return c;} );
-    List<TestMessage> collection = new ArrayList<TestMessage>() {{
-      for(int i = 0; i < 10; i++) {
-        this.add(new TestMessage(String.format("key-%d", i), "string-value", System.nanoTime()));
-      }
-    }};
-    // exam that earlyTriggerUpdater is set up
-    Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> earlyTriggerUpdater =
-        (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getOutputValue()).thenReturn(collection);
-    earlyTriggerUpdater.apply(mockState);
-    assertTrue(collection.isEmpty());
-
-    collection.add(new TestMessage("key-to-stay", "string-to-stay", System.nanoTime()));
-    collection.add(new TestMessage("key-to-remove", "string-to-remove", System.nanoTime()));
-    builder.onLateTrigger(c -> {
-      c.removeIf(t -> t.getKey().equals("key-to-remove"));
-      return c;
-    });
-    // check the late trigger updater
-    Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> lateTriggerUpdater =
-        (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder);
-    when(mockState.getOutputValue()).thenReturn(collection);
-    lateTriggerUpdater.apply(mockState);
-    assertTrue(collection.size() == 1);
-    assertFalse(collection.get(0).isDelete());
-    assertEquals(collection.get(0).getKey(), "key-to-stay");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java
deleted file mode 100644
index 47a37dc..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.Windows.Window;
-import org.apache.samza.operators.api.internal.Trigger;
-import org.apache.samza.operators.api.internal.WindowOutput;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestWindows {
-
-  @Test public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException {
-    // test constructing the default session window
-    Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, Collection<TestMessage>>> testWnd = Windows.intoSessions(
-        TestMessage::getKey);
-    assertTrue(testWnd instanceof Windows.SessionWindow);
-    Field wndKeyFuncField = Windows.SessionWindow.class.getDeclaredField("wndKeyFunction");
-    Field aggregatorField = Windows.SessionWindow.class.getDeclaredField("aggregator");
-    wndKeyFuncField.setAccessible(true);
-    aggregatorField.setAccessible(true);
-    Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd);
-    assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "test-key");
-    BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> aggrFunc =
-        (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd);
-    TestMessage mockMsg = mock(TestMessage.class);
-    Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new ArrayList<>());
-    assertTrue(collection.size() == 1);
-    assertTrue(collection.contains(mockMsg));
-
-    // test constructing the session window w/ customized session info
-    Window<TestMessage, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions(
-        m -> String.format("key-%d", m.getTimestamp()), m -> m.getMessage().charAt(0));
-    assertTrue(testWnd2 instanceof Windows.SessionWindow);
-    wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2);
-    aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd2);
-    assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "key-0");
-    when(mockMsg.getMessage()).thenReturn("x-001");
-    collection = aggrFunc.apply(mockMsg, new ArrayList<>());
-    assertTrue(collection.size() == 1);
-    assertTrue(collection.contains('x'));
-
-    // test constructing session window w/ a default counter
-    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
-        m -> String.format("key-%d", m.getTimestamp()));
-    assertTrue(testCounter instanceof Windows.SessionWindow);
-    wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testCounter);
-    BiFunction<TestMessage, Integer, Integer> counterFn = (BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter);
-    when(mockMsg.getTimestamp()).thenReturn(12345L);
-    assertEquals(wndKeyFunc.apply(mockMsg), "key-12345");
-    assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2));
-  }
-
-  @Test public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException {
-    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
-        m -> String.format("key-%d", m.getTimestamp()));
-    // test session window w/ a trigger
-    TriggerBuilder<TestMessage, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
-    testCounter.setTriggers(triggerBuilder);
-    Trigger<TestMessage, WindowState<Integer>> expectedTrigger = triggerBuilder.build();
-    Trigger<TestMessage, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger();
-    // examine all trigger fields are expected
-    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
-    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
-    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
-    Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater");
-    Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater");
-    earlyTriggerField.setAccessible(true);
-    lateTriggerField.setAccessible(true);
-    timerTriggerField.setAccessible(true);
-    earlyTriggerUpdater.setAccessible(true);
-    lateTriggerUpdater.setAccessible(true);
-    assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger));
-    assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger));
-    assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger));
-    assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger));
-    assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java
deleted file mode 100644
index e953078..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestIncomingSystemMessage {
-
-  @Test public void testConstructor() {
-    IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
-    IncomingSystemMessage ism = new IncomingSystemMessage(ime);
-
-    Object mockKey = mock(Object.class);
-    Object mockValue = mock(Object.class);
-    LongOffset testOffset = new LongOffset("12345");
-    SystemStreamPartition mockSsp = mock(SystemStreamPartition.class);
-
-    when(ime.getKey()).thenReturn(mockKey);
-    when(ime.getMessage()).thenReturn(mockValue);
-    when(ime.getSystemStreamPartition()).thenReturn(mockSsp);
-    when(ime.getOffset()).thenReturn("12345");
-
-    assertEquals(ism.getKey(), mockKey);
-    assertEquals(ism.getMessage(), mockValue);
-    assertEquals(ism.getSystemStreamPartition(), mockSsp);
-    assertEquals(ism.getOffset(), testOffset);
-    assertFalse(ism.isDelete());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java
deleted file mode 100644
index 10775ec..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.data;
-
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-
-public class TestLongOffset {
-
-  @Test public void testConstructor() throws Exception {
-    LongOffset o1 = new LongOffset("12345");
-    Field offsetField = LongOffset.class.getDeclaredField("offset");
-    offsetField.setAccessible(true);
-    Long x = (Long) offsetField.get(o1);
-    assertEquals(x.longValue(), 12345L);
-
-    o1 = new LongOffset("012345");
-    x = (Long) offsetField.get(o1);
-    assertEquals(x.longValue(), 12345L);
-
-    try {
-      o1 = new LongOffset("xyz");
-      fail("Constructor of LongOffset should have failed w/ mal-formatted numbers");
-    } catch (NumberFormatException nfe) {
-      // expected
-    }
-  }
-
-  @Test public void testComparator() {
-    LongOffset o1 = new LongOffset("11111");
-    Offset other = mock(Offset.class);
-    try {
-      o1.compareTo(other);
-      fail("compareTo() should have have failed when comparing to an object of a different class");
-    } catch (IllegalArgumentException iae) {
-      // expected
-    }
-
-    LongOffset o2 = new LongOffset("-10000");
-    assertEquals(o1.compareTo(o2), 1);
-    LongOffset o3 = new LongOffset("22222");
-    assertEquals(o1.compareTo(o3), -1);
-    LongOffset o4 = new LongOffset("11111");
-    assertEquals(o1.compareTo(o4), 0);
-  }
-
-  @Test public void testEquals() {
-    LongOffset o1 = new LongOffset("12345");
-    Offset other = mock(Offset.class);
-    assertFalse(o1.equals(other));
-
-    LongOffset o2 = new LongOffset("0012345");
-    assertTrue(o1.equals(o2));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
deleted file mode 100644
index 6dc77e5..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.internal;
-
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperators {
-
-  private class TestMessage implements Message<String, Object> {
-    private final long timestamp;
-    private final String key;
-    private final Object msg;
-
-
-    TestMessage(String key, Object msg, long timestamp) {
-      this.timestamp = timestamp;
-      this.key = key;
-      this.msg = msg;
-    }
-
-    @Override public Object getMessage() {
-      return this.msg;
-    }
-
-    @Override public String getKey() {
-      return this.key;
-    }
-
-    @Override public long getTimestamp() {
-      return this.timestamp;
-    }
-  }
-
-  @Test public void testGetStreamOperator() {
-    Function<Message, Collection<TestMessage>> transformFn = m -> new ArrayList<TestMessage>() {{
-      this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 12345L));
-    }};
-    Operators.StreamOperator<Message, TestMessage> strmOp = Operators.getStreamOperator(transformFn);
-    assertEquals(strmOp.getFunction(), transformFn);
-    assertTrue(strmOp.getOutputStream() instanceof MessageStream);
-  }
-
-  @Test public void testGetSinkOperator() {
-    MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, c, t) -> {};
-    Operators.SinkOperator<TestMessage> sinkOp = Operators.getSinkOperator(sinkFn);
-    assertEquals(sinkOp.getFunction(), sinkFn);
-    assertTrue(sinkOp.getOutputStream() == null);
-  }
-
-  @Test public void testGetWindowOperator() {
-    WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class);
-    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction = (m, e) -> null;
-    Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> storeFns = mock(Operators.StoreFunctions.class);
-    Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class);
-    MessageStream<TestMessage> mockInput = mock(MessageStream.class);
-    when(windowFn.getTransformFunc()).thenReturn(xFunction);
-    when(windowFn.getStoreFuncs()).thenReturn(storeFns);
-    when(windowFn.getTrigger()).thenReturn(trigger);
-    when(mockInput.toString()).thenReturn("mockStream1");
-
-    Operators.WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn);
-    assertEquals(windowOp.getFunction(), xFunction);
-    assertEquals(windowOp.getStoreFunctions(), storeFns);
-    assertEquals(windowOp.getTrigger(), trigger);
-    assertEquals(windowOp.getStoreName(mockInput), String.format("input-mockStream1-wndop-%s", windowOp.toString()));
-  }
-
-  @Test public void testGetPartialJoinOperator() {
-    BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger =
-        (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(),
-            Math.max(m1.getTimestamp(), m2.getTimestamp()));
-    MessageStream<TestMessage> joinOutput = new MessageStream<>();
-    Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, ?>, TestMessage> partialJoin =
-        Operators.getPartialJoinOperator(merger, joinOutput);
-
-    assertEquals(partialJoin.getOutputStream(), joinOutput);
-    Message<Object, Object> m = mock(Message.class);
-    Message<Object, Object> s = mock(Message.class);
-    assertEquals(partialJoin.getFunction(), merger);
-    assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), m.getKey());
-    assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), m);
-    assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), m.getKey());
-    assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater());
-  }
-
-  @Test public void testGetMergeOperator() {
-    MessageStream<TestMessage> output = new MessageStream<>();
-    Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output);
-    Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() {{
-      this.add(t);
-    }};
-    TestMessage t = mock(TestMessage.class);
-    assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t));
-    assertEquals(mergeOp.getOutputStream(), output);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java
deleted file mode 100644
index 727276a..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.internal;
-
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestTrigger {
-
-  @Test public void testConstructor() throws Exception {
-    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000;
-    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000;
-    Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis();
-    Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> { s.setOutputValue(0); return s; };
-    Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> { s.setOutputValue(1); return s; };
-
-    Trigger<Message<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger,
-        earlyTriggerUpdater, lateTriggerUpdater);
-
-    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
-    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
-    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
-    Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater");
-    Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater");
-    earlyTriggerField.setAccessible(true);
-    lateTriggerField.setAccessible(true);
-    timerTriggerField.setAccessible(true);
-    earlyTriggerUpdaterField.setAccessible(true);
-    lateTriggerUpdaterField.setAccessible(true);
-
-    assertEquals(earlyTrigger, earlyTriggerField.get(trigger));
-    assertEquals(timerTrigger, timerTriggerField.get(trigger));
-    assertEquals(lateTrigger, lateTriggerField.get(trigger));
-    assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger));
-    assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java
deleted file mode 100644
index f3cf0e0..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.internal;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-
-public class TestWindowOutput {
-
-  @Test public void testConstructor() {
-    WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
-    assertEquals(wndOutput.getKey(), "testMsg");
-    assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
-    assertFalse(wndOutput.isDelete());
-    assertEquals(wndOutput.getTimestamp(), 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
index d4d2378..33901a9 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
@@ -19,10 +19,10 @@
 
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.TestMessage;
-import org.apache.samza.operators.api.TestOutputMessage;
-import org.apache.samza.operators.api.Windows;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.Windows;
 import org.apache.samza.task.TaskContext;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
index d228784..a49bfd3 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
@@ -18,14 +18,14 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.TestMessage;
-import org.apache.samza.operators.api.TestOutputMessage;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.PartialJoinOperator;
-import org.apache.samza.operators.api.internal.Operators.SinkOperator;
-import org.apache.samza.operators.api.internal.Operators.StreamOperator;
-import org.apache.samza.operators.api.internal.Operators.WindowOperator;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.PartialJoinOperator;
+import org.apache.samza.operators.internal.Operators.SinkOperator;
+import org.apache.samza.operators.internal.Operators.StreamOperator;
+import org.apache.samza.operators.internal.Operators.WindowOperator;
 import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
 import org.apache.samza.operators.impl.window.SessionWindowImpl;
 import org.apache.samza.task.MessageCollector;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index d296111..4bd467d 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.TestMessage;
-import org.apache.samza.operators.api.TestOutputMessage;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
index 14796fc..224245e 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.TestMessage;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
index c8c4944..69f16d0 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.TestMessage;
-import org.apache.samza.operators.api.TestOutputMessage;
-import org.apache.samza.operators.api.internal.Operators.StreamOperator;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.internal.Operators.StreamOperator;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index e711bc5..cdac3fc 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.TestOutputMessage;
-import org.apache.samza.operators.api.internal.Operators.SinkOperator;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.internal.Operators.SinkOperator;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
index eb8937a..5ede757 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.TestMessage;
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.internal.Operators.StoreFunctions;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.internal.Operators.StoreFunctions;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.TaskContext;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
index 75cb00c..719ab99 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
@@ -18,12 +18,12 @@
  */
 package org.apache.samza.operators.impl.window;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.TestMessage;
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.internal.Operators.StoreFunctions;
-import org.apache.samza.operators.api.internal.Operators.WindowOperator;
-import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.internal.Operators.StoreFunctions;
+import org.apache.samza.operators.internal.Operators.WindowOperator;
+import org.apache.samza.operators.internal.WindowOutput;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
index 724bbba..493a688 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
@@ -20,12 +20,13 @@
 package org.apache.samza.task;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.api.Windows;
-import org.apache.samza.operators.api.TriggerBuilder;
-import org.apache.samza.operators.api.data.IncomingSystemMessage;
-import org.apache.samza.operators.api.data.Offset;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.Windows;
+import org.apache.samza.operators.TriggerBuilder;
+import org.apache.samza.operators.data.IncomingSystemMessage;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.task.StreamOperatorTask;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
index 33ae9c9..820d4f3 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.task;
 
-import org.apache.samza.operators.api.data.InputSystemMessage;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.data.Offset;
+import org.apache.samza.operators.data.InputSystemMessage;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.data.Offset;
 import org.apache.samza.system.SystemStreamPartition;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
index 825f4c4..00d01a8 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
@@ -19,10 +19,11 @@
 
 package org.apache.samza.task;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.api.data.IncomingSystemMessage;
-import org.apache.samza.operators.api.data.Offset;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.data.IncomingSystemMessage;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.task.StreamOperatorTask;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
index 306425e..153d517 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
@@ -20,6 +20,7 @@ package org.apache.samza.task;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.impl.ChainedOperators;
+import org.apache.samza.operators.task.StreamOperatorTask;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.Partition;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
index d6181ea..fe0ca42 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
@@ -36,7 +36,7 @@ import static org.mockito.Mockito.when;
 
 
 /**
- * Unit test for {@link StreamOperatorTask}
+ * Unit test for {@link org.apache.samza.operators.task.StreamOperatorTask}
  */
 public class TestStreamOperatorTasks {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
index 11186ea..de7bba5 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
@@ -19,11 +19,12 @@
 
 package org.apache.samza.task;
 
-import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.api.TriggerBuilder;
-import org.apache.samza.operators.api.Windows;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.data.Offset;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.TriggerBuilder;
+import org.apache.samza.operators.Windows;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.task.StreamOperatorTask;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.util.Collection;


[4/4] samza git commit: SAMZA-1045: Move classes from operator/api into samza-api

Posted by ja...@apache.org.
SAMZA-1045: Move classes from operator/api into samza-api


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1dac25e1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1dac25e1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1dac25e1

Branch: refs/heads/samza-sql
Commit: 1dac25e1750b7f3fefa72def855136801462494d
Parents: adcd266
Author: vjagadish1989 <jv...@linkedin.com>
Authored: Mon Oct 31 14:14:24 2016 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Tue Nov 1 14:34:28 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |   4 +-
 checkstyle/checkstyle.xml                       |   9 +-
 gradle.properties                               |   3 +-
 gradle/wrapper/gradle-wrapper.jar               | Bin 49875 -> 52818 bytes
 gradle/wrapper/gradle-wrapper.properties        |   4 +-
 gradlew                                         |  57 ++-
 .../apache/samza/operators/MessageStream.java   | 186 ++++++++
 .../apache/samza/operators/MessageStreams.java  |  80 ++++
 .../apache/samza/operators/TriggerBuilder.java  | 321 +++++++++++++
 .../org/apache/samza/operators/WindowState.java |  77 +++
 .../org/apache/samza/operators/Windows.java     | 200 ++++++++
 .../operators/data/IncomingSystemMessage.java   |  76 +++
 .../operators/data/InputSystemMessage.java      |  43 ++
 .../apache/samza/operators/data/LongOffset.java |  80 ++++
 .../apache/samza/operators/data/Message.java    |  60 +++
 .../org/apache/samza/operators/data/Offset.java |  27 ++
 .../samza/operators/internal/Operators.java     | 469 +++++++++++++++++++
 .../samza/operators/internal/Trigger.java       |  95 ++++
 .../samza/operators/internal/WindowFn.java      |  60 +++
 .../samza/operators/internal/WindowOutput.java  |  55 +++
 .../operators/task/StreamOperatorTask.java      |  43 ++
 .../samza/storage/StorageEngineFactory.java     |  16 +-
 .../org/apache/samza/operators/TestMessage.java |  47 ++
 .../samza/operators/TestMessageStream.java      | 180 +++++++
 .../samza/operators/TestMessageStreams.java     |  35 ++
 .../samza/operators/TestOutputMessage.java      |  47 ++
 .../samza/operators/TestTriggerBuilder.java     | 214 +++++++++
 .../org/apache/samza/operators/TestWindows.java | 106 +++++
 .../data/TestIncomingSystemMessage.java         |  53 +++
 .../samza/operators/data/TestLongOffset.java    |  76 +++
 .../samza/operators/internal/TestOperators.java | 128 +++++
 .../samza/operators/internal/TestTrigger.java   |  68 +++
 .../operators/internal/TestWindowOutput.java    |  36 ++
 .../MockCoordinatorStreamWrappedConsumer.java   |   3 +-
 .../TestCoordinatorStreamSystemConsumer.java    |   4 +-
 .../samza/operators/api/MessageStream.java      | 188 --------
 .../samza/operators/api/MessageStreams.java     |  80 ----
 .../samza/operators/api/TriggerBuilder.java     | 314 -------------
 .../apache/samza/operators/api/WindowState.java |  77 ---
 .../org/apache/samza/operators/api/Windows.java | 195 --------
 .../api/data/IncomingSystemMessage.java         |  76 ---
 .../operators/api/data/InputSystemMessage.java  |  43 --
 .../samza/operators/api/data/LongOffset.java    |  75 ---
 .../samza/operators/api/data/Message.java       |  58 ---
 .../apache/samza/operators/api/data/Offset.java |  27 --
 .../samza/operators/api/internal/Operators.java | 468 ------------------
 .../samza/operators/api/internal/Trigger.java   |  95 ----
 .../samza/operators/api/internal/WindowFn.java  |  60 ---
 .../operators/api/internal/WindowOutput.java    |  55 ---
 .../samza/operators/impl/ChainedOperators.java  |   6 +-
 .../samza/operators/impl/OperatorFactory.java   |   8 +-
 .../samza/operators/impl/OperatorImpl.java      |   6 +-
 .../samza/operators/impl/ProcessorContext.java  |   2 +-
 .../operators/impl/SimpleOperatorImpl.java      |   4 +-
 .../samza/operators/impl/SinkOperatorImpl.java  |   6 +-
 .../samza/operators/impl/StateStoreImpl.java    |   4 +-
 .../operators/impl/join/PartialJoinOpImpl.java  |  11 +-
 .../impl/window/SessionWindowImpl.java          |  10 +-
 .../samza/task/StreamOperatorAdaptorTask.java   |  11 +-
 .../apache/samza/task/StreamOperatorTask.java   |  42 --
 .../apache/samza/operators/api/TestMessage.java |  47 --
 .../samza/operators/api/TestMessageStream.java  | 180 -------
 .../samza/operators/api/TestMessageStreams.java |  35 --
 .../samza/operators/api/TestOutputMessage.java  |  47 --
 .../samza/operators/api/TestTriggerBuilder.java | 211 ---------
 .../apache/samza/operators/api/TestWindows.java | 106 -----
 .../api/data/TestIncomingSystemMessage.java     |  53 ---
 .../operators/api/data/TestLongOffset.java      |  76 ---
 .../operators/api/internal/TestOperators.java   | 128 -----
 .../operators/api/internal/TestTrigger.java     |  62 ---
 .../api/internal/TestWindowOutput.java          |  36 --
 .../operators/impl/TestChainedOperators.java    |   8 +-
 .../operators/impl/TestOperatorFactory.java     |  16 +-
 .../samza/operators/impl/TestOperatorImpl.java  |   4 +-
 .../operators/impl/TestProcessorContext.java    |   2 +-
 .../operators/impl/TestSimpleOperatorImpl.java  |   6 +-
 .../operators/impl/TestSinkOperatorImpl.java    |   6 +-
 .../operators/impl/TestStateStoreImpl.java      |   6 +-
 .../impl/window/TestSessionWindowImpl.java      |  12 +-
 .../samza/task/BroadcastOperatorTask.java       |  13 +-
 .../samza/task/InputJsonSystemMessage.java      |   6 +-
 .../org/apache/samza/task/JoinOperatorTask.java |   9 +-
 .../task/TestStreamOperatorAdaptorTask.java     |   1 +
 .../samza/task/TestStreamOperatorTasks.java     |   2 +-
 .../apache/samza/task/WindowOperatorTask.java   |  11 +-
 85 files changed, 3001 insertions(+), 2965 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index aeefd1c..28c2dcf 100644
--- a/build.gradle
+++ b/build.gradle
@@ -123,9 +123,9 @@ project(':samza-api') {
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
   }
-
   checkstyle {
     configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+    toolVersion = "$checkstyleVersion"
   }
 }
 
@@ -348,6 +348,8 @@ if (JavaVersion.current().isJava8Compatible()) {
       compile "org.apache.commons:commons-lang3:$commonsLang3Version"
       compile "org.apache.avro:avro:$avroVersion"
       compile "org.reactivestreams:reactive-streams:$reactiveStreamVersion"
+
+      testCompile project(":samza-api").sourceSets.test.output
       testCompile "junit:junit:$junitVersion"
       testCompile "org.mockito:mockito-all:$mockitoVersion"
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 770b5e7..c23a617 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -53,19 +53,12 @@
     </module>
     <module name="LocalVariableName"/>
     <module name="LocalFinalVariableName"/>
-    <module name="ClassTypeParameterName"/>
     <module name="MemberName"/>
-    <module name="MethodTypeParameterName"/>
     <module name="PackageName"/>
     <module name="ParameterName"/>
     <module name="StaticVariableName"/>
     <module name="TypeName"/>
     
-    <!-- dependencies -->
-    <module name="ImportControl">
-      <property name="file" value="${basedir}/checkstyle/import-control.xml"/>
-    </module>
-    
     <!-- whitespace -->
     <module name="GenericWhitespace"/>
     <module name="NoWhitespaceBefore"/>
@@ -85,4 +78,4 @@
     <module name="ParenPad"/>
     <module name="TypecastParenPad"/>
   </module>
-</module>
\ No newline at end of file
+</module>

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 16e1f5d..d8dfe7b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -18,8 +18,9 @@ group=org.apache.samza
 version=0.10.1-SNAPSHOT
 scalaVersion=2.10
 
-gradleVersion=2.0
+gradleVersion=2.8
 
 org.gradle.jvmargs="-XX:MaxPermSize=512m"
 
 systemProp.file.encoding=utf-8
+checkstyleVersion=6.15

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/gradle/wrapper/gradle-wrapper.jar
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index a7634b0..deedc7f 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 78596c0..55720c3 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Thu Jul 03 20:51:36 PDT 2014
+#Mon Oct 31 23:13:44 PDT 2016
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=http\://services.gradle.org/distributions/gradle-2.0-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.8-bin.zip

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/gradlew
----------------------------------------------------------------------
diff --git a/gradlew b/gradlew
index 91a7e26..9aa616c 100755
--- a/gradlew
+++ b/gradlew
@@ -6,12 +6,30 @@
 ##
 ##############################################################################
 
-# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-DEFAULT_JVM_OPTS=""
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+        PRG="$link"
+    else
+        PRG=`dirname "$PRG"`"/$link"
+    fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/" >/dev/null
+APP_HOME="`pwd -P`"
+cd "$SAVED" >/dev/null
 
 APP_NAME="Gradle"
 APP_BASE_NAME=`basename "$0"`
 
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS=""
+
 # Use the maximum available, or set MAX_FD != -1 to use that value.
 MAX_FD="maximum"
 
@@ -30,6 +48,7 @@ die ( ) {
 cygwin=false
 msys=false
 darwin=false
+nonstop=false
 case "`uname`" in
   CYGWIN* )
     cygwin=true
@@ -40,31 +59,11 @@ case "`uname`" in
   MINGW* )
     msys=true
     ;;
+  NONSTOP* )
+    nonstop=true
+    ;;
 esac
 
-# For Cygwin, ensure paths are in UNIX format before anything is touched.
-if $cygwin ; then
-    [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
-fi
-
-# Attempt to set APP_HOME
-# Resolve links: $0 may be a link
-PRG="$0"
-# Need this for relative symlinks.
-while [ -h "$PRG" ] ; do
-    ls=`ls -ld "$PRG"`
-    link=`expr "$ls" : '.*-> \(.*\)$'`
-    if expr "$link" : '/.*' > /dev/null; then
-        PRG="$link"
-    else
-        PRG=`dirname "$PRG"`"/$link"
-    fi
-done
-SAVED="`pwd`"
-cd "`dirname \"$PRG\"`/" >&-
-APP_HOME="`pwd -P`"
-cd "$SAVED" >&-
-
 CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
 
 # Determine the Java command to use to start the JVM.
@@ -90,7 +89,7 @@ location of your Java installation."
 fi
 
 # Increase the maximum file descriptors if we can.
-if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
+if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
     MAX_FD_LIMIT=`ulimit -H -n`
     if [ $? -eq 0 ] ; then
         if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
@@ -114,6 +113,7 @@ fi
 if $cygwin ; then
     APP_HOME=`cygpath --path --mixed "$APP_HOME"`
     CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+    JAVACMD=`cygpath --unix "$JAVACMD"`
 
     # We build the pattern for arguments to be converted via cygpath
     ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
@@ -161,4 +161,9 @@ function splitJvmOpts() {
 eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
 JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
 
+# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
+if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then
+  cd "$(dirname "$0")"
+fi
+
 exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
new file mode 100644
index 0000000..4c632b8
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators;
+import org.apache.samza.operators.internal.Operators.Operator;
+import org.apache.samza.operators.internal.WindowOutput;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+
+/**
+ * This class defines either the input or output streams to/from the operators. Users use the API methods defined here to
+ * directly program the stream processing stages that processes a stream and generate another one.
+ *
+ * @param <M>  Type of message in this stream
+ */
+public class MessageStream<M extends Message> {
+
+  private final Set<Operator> subscribers = new HashSet<>();
+
+  /**
+   * Helper method to get the corresponding list of subscribers to a specific {@link MessageStream}.
+   *
+   * NOTE: This is purely an internal API and should not be used directly by programmers.
+   *
+   * @return A unmodifiable set containing all {@link Operator}s that subscribe to this {@link MessageStream} object
+   */
+  public Collection<Operator> getSubscribers() {
+    return Collections.unmodifiableSet(this.subscribers);
+  }
+
+  /**
+   * Public API methods start here
+   */
+
+  /**
+   * Defines a function API that takes three input parameters w/ types {@code A}, {@code B}, and {@code C} and w/o a return value
+   *
+   * @param <A>  the type of input {@code a}
+   * @param <B>  the type of input {@code b}
+   * @param <C>  the type of input {@code c}
+   */
+  @FunctionalInterface
+  public interface VoidFunction3<A, B, C> {
+    public void apply(A a, B b, C c);
+  }
+
+  /**
+   * Method to apply a map function (1:1) on a {@link MessageStream}
+   *
+   * @param mapper  the mapper function to map one input {@link Message} to one output {@link Message}
+   * @param <OM>  the type of the output {@link Message} in the output {@link MessageStream}
+   * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
+   */
+  public <OM extends Message> MessageStream<OM> map(Function<M, OM> mapper) {
+    Operator<OM> op = Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() { {
+        OM r = mapper.apply(m);
+        if (r != null) {
+          this.add(r);
+        }
+      } });
+    this.subscribers.add(op);
+    return op.getOutputStream();
+  }
+
+  /**
+   * Method to apply a flatMap function (1:n) on a {@link MessageStream}
+   *
+   * @param flatMapper  the flat mapper function to map one input {@link Message} to zero or more output {@link Message}s
+   * @param <OM>  the type of the output {@link Message} in the output {@link MessageStream}
+   * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
+   */
+  public <OM extends Message> MessageStream<OM> flatMap(Function<M, Collection<OM>> flatMapper) {
+    Operator<OM> op = Operators.getStreamOperator(flatMapper);
+    this.subscribers.add(op);
+    return op.getOutputStream();
+  }
+
+  /**
+   * Method to apply a filter function on a {@link MessageStream}
+   *
+   * @param filter  the filter function to filter input {@link Message}s from the input {@link MessageStream}
+   * @return the output {@link MessageStream} after applying the filter function on the input {@link MessageStream}
+   */
+  public MessageStream<M> filter(Function<M, Boolean> filter) {
+    Operator<M> op = Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() { {
+        if (filter.apply(t)) {
+          this.add(t);
+        }
+      } });
+    this.subscribers.add(op);
+    return op.getOutputStream();
+  }
+
+  /**
+   * Method to send an input {@link MessageStream} to an output {@link org.apache.samza.system.SystemStream}, and allows the output {@link MessageStream}
+   * to be consumed by downstream stream operators again.
+   *
+   * @param sink  the user-defined sink function to send the input {@link Message}s to the external output systems
+   */
+  public void sink(VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
+    this.subscribers.add(Operators.getSinkOperator(sink));
+  }
+
+  /**
+   * Method to perform a window function (i.e. a group-by, aggregate function) on a {@link MessageStream}
+   *
+   * @param window  the window function to group and aggregate the input {@link Message}s from the input {@link MessageStream}
+   * @param <WK>  the type of key in the output {@link Message} from the {@link Windows.Window} function
+   * @param <WV>  the type of output value from
+   * @param <WS>  the type of window state kept in the {@link Windows.Window} function
+   * @param <WM>  the type of {@link org.apache.samza.operators.internal.WindowOutput} message from the {@link Windows.Window} function
+   * @return the output {@link MessageStream} after applying the window function on the input {@link MessageStream}
+   */
+  public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(Windows.Window<M, WK, WV, WM> window) {
+    Operator<WM> wndOp = Operators.getWindowOperator(Windows.getInternalWindowFn(window));
+    this.subscribers.add(wndOp);
+    return wndOp.getOutputStream();
+  }
+
+  /**
+   * Method to add an input {@link MessageStream} to a join function. Note that we currently only support 2-way joins.
+   *
+   * @param other  the other stream to be joined w/
+   * @param merger  the common function to merge messages from this {@link MessageStream} and {@code other}
+   * @param <K>  the type of join key
+   * @param <JM>  the type of message in the {@link Message} from the other join stream
+   * @param <RM>  the type of message in the {@link Message} from the join function
+   * @return the output {@link MessageStream} from the join function {@code joiner}
+   */
+  public <K, JM extends Message<K, ?>, RM extends Message> MessageStream<RM> join(MessageStream<JM> other,
+      BiFunction<M, JM, RM> merger) {
+    MessageStream<RM> outputStream = new MessageStream<>();
+
+    BiFunction<M, JM, RM> parJoin1 = merger::apply;
+    BiFunction<JM, M, RM> parJoin2 = (m, t1) -> merger.apply(t1, m);
+
+    // TODO: need to add default store functions for the two partial join functions
+
+    other.subscribers.add(Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream));
+    this.subscribers.add(Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream));
+    return outputStream;
+  }
+
+  /**
+   * Method to merge all {@code others} streams w/ this {@link MessageStream}. The merging streams must have the same type {@code M}
+   *
+   * @param others  other streams to be merged w/ this one
+   * @return  the merged output stream
+   */
+  public MessageStream<M> merge(Collection<MessageStream<M>> others) {
+    MessageStream<M> outputStream = new MessageStream<>();
+
+    others.add(this);
+    others.forEach(other -> other.subscribers.add(Operators.getMergeOperator(outputStream)));
+    return outputStream;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java
new file mode 100644
index 0000000..4a0ae6a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.IncomingSystemMessage;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * This class defines all methods to create a {@link MessageStream} object. Users can use this to create an {@link MessageStream}
+ * from a specific input source.
+ *
+ */
+
+public final class MessageStreams {
+
+  /**
+   * private constructor to prevent instantiation
+   */
+  private MessageStreams() {}
+
+  /**
+   * private class for system input/output {@link MessageStream}
+   */
+  public static final class SystemMessageStream extends MessageStream<IncomingSystemMessage> {
+    /**
+     * The corresponding {@link org.apache.samza.system.SystemStream}
+     */
+    private final SystemStreamPartition ssp;
+
+    /**
+     * Constructor for input system stream
+     *
+     * @param ssp  the input {@link SystemStreamPartition} for the input {@link SystemMessageStream}
+     */
+    private SystemMessageStream(SystemStreamPartition ssp) {
+      this.ssp = ssp;
+    }
+
+    /**
+     * Getter for the {@link SystemStreamPartition} of the input
+     *
+     * @return the input {@link SystemStreamPartition}
+     */
+    public SystemStreamPartition getSystemStreamPartition() {
+      return this.ssp;
+    }
+  }
+
+  /**
+   * Public static API methods start here
+   */
+
+  /**
+   * Static API method to create a {@link MessageStream} from a system input stream
+   *
+   * @param ssp  the input {@link SystemStreamPartition}
+   * @return the {@link MessageStream} object takes {@code ssp} as the input
+   */
+  public static SystemMessageStream input(SystemStreamPartition ssp) {
+    return new SystemMessageStream(ssp);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java
new file mode 100644
index 0000000..5b7db9c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Trigger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+
+/**
+ * This class defines a builder of {@link org.apache.samza.operators.internal.Trigger} object for a {@link Windows.Window}. The triggers are categorized into
+ * three types:
+ *
+ * <p>
+ *   early trigger: defines the condition when the first output from the window function is sent.
+ *   late trigger: defines the condition when the updated output after the first output is sent.
+ *   timer trigger: defines a system timeout condition to trigger output if no more inputs are received to enable early/late triggers
+ * </p>
+ *
+ * If multiple conditions are defined for a specific type of trigger, the aggregated trigger is the disjunction of the each individual trigger (i.e. OR).
+ *
+ * NOTE: Programmers should not use classes defined in {@link org.apache.samza.operators.internal} to create triggers
+ *
+ *
+ * @param <M>  the type of input {@link Message} to the {@link Windows.Window}
+ * @param <V>  the type of output value from the {@link Windows.Window}
+ */
+public final class TriggerBuilder<M extends Message, V> {
+
+  /**
+   * Predicate helper to OR multiple trigger conditions
+   */
+  static class PredicateHelper {
+    static <M, S> BiFunction<M, S, Boolean> or(BiFunction<M, S, Boolean> lhs, BiFunction<M, S, Boolean> rhs) {
+      return (m, s) -> lhs.apply(m, s) || rhs.apply(m, s);
+    }
+
+    static <S> Function<S, Boolean> or(Function<S, Boolean> lhs, Function<S, Boolean> rhs) {
+      return s -> lhs.apply(s) || rhs.apply(s);
+    }
+  }
+
+  /**
+   * The early trigger condition that determines the first output from the {@link Windows.Window}
+   */
+  private BiFunction<M, WindowState<V>, Boolean> earlyTrigger = null;
+
+  /**
+   * The late trigger condition that determines the late output(s) from the {@link Windows.Window}
+   */
+  private BiFunction<M, WindowState<V>, Boolean> lateTrigger = null;
+
+  /**
+   * The system timer based trigger conditions that guarantees the {@link Windows.Window} proceeds forward
+   */
+  private Function<WindowState<V>, Boolean> timerTrigger = null;
+
+  /**
+   * The state updater function to be applied after the first output is triggered
+   */
+  private Function<WindowState<V>, WindowState<V>> earlyTriggerUpdater = Function.identity();
+
+  /**
+   * The state updater function to be applied after the late output is triggered
+   */
+  private Function<WindowState<V>, WindowState<V>> lateTriggerUpdater = Function.identity();
+
+  /**
+   * Helper method to add a trigger condition
+   *
+   * @param currentTrigger  current trigger condition
+   * @param newTrigger  new trigger condition
+   * @return  combined trigger condition that is {@code currentTrigger} OR {@code newTrigger}
+   */
+  private BiFunction<M, WindowState<V>, Boolean> addTrigger(BiFunction<M, WindowState<V>, Boolean> currentTrigger,
+      BiFunction<M, WindowState<V>, Boolean> newTrigger) {
+    if (currentTrigger == null) {
+      return newTrigger;
+    }
+
+    return PredicateHelper.or(currentTrigger, newTrigger);
+  }
+
+  /**
+   * Helper method to add a system timer trigger
+   *
+   * @param currentTimer  current timer condition
+   * @param newTimer  new timer condition
+   * @return  combined timer condition that is {@code currentTimer} OR {@code newTimer}
+   */
+  private Function<WindowState<V>, Boolean> addTimerTrigger(Function<WindowState<V>, Boolean> currentTimer,
+      Function<WindowState<V>, Boolean> newTimer) {
+    if (currentTimer == null) {
+      return newTimer;
+    }
+
+    return PredicateHelper.or(currentTimer, newTimer);
+  }
+
+  /**
+   * default constructor to prevent instantiation
+   */
+  private TriggerBuilder() {}
+
+  /**
+   * Constructor that set the size limit as the early trigger for a window
+   *
+   * @param sizeLimit  the number of messages in a window that would trigger the first output
+   */
+  private TriggerBuilder(long sizeLimit) {
+    this.earlyTrigger = (m, s) -> s.getNumberMessages() > sizeLimit;
+  }
+
+  /**
+   * Constructor that set the event time length as the early trigger
+   *
+   * @param eventTimeFunction  the function that calculate the event time in nano-second from the input {@link Message}
+   * @param wndLenMs  the window length in event time in milli-second
+   */
+  private TriggerBuilder(Function<M, Long> eventTimeFunction, long wndLenMs) {
+    this.earlyTrigger = (m, s) ->
+        TimeUnit.NANOSECONDS.toMillis(Math.max(s.getLatestEventTimeNs() - s.getEarliestEventTimeNs(),
+            eventTimeFunction.apply(m) - s.getEarliestEventTimeNs())) > wndLenMs;
+  }
+
+  /**
+   * Constructor that set the special token message as the early trigger
+   *
+   * @param tokenFunc  the function that checks whether an input {@link Message} is a token message that triggers window output
+   */
+  private TriggerBuilder(Function<M, Boolean> tokenFunc) {
+    this.earlyTrigger = (m, s) -> tokenFunc.apply(m);
+  }
+
+  /**
+   * Build method that creates an {@link org.apache.samza.operators.internal.Trigger} object based on the trigger conditions set in {@link TriggerBuilder}
+   * This is kept package private and only used by {@link Windows} to convert the mutable {@link TriggerBuilder} object to an immutable {@link Trigger} object
+   *
+   * @return  the final {@link org.apache.samza.operators.internal.Trigger} object
+   */
+  Trigger<M, WindowState<V>> build() {
+    return Trigger.createTrigger(this.timerTrigger, this.earlyTrigger, this.lateTrigger, this.earlyTriggerUpdater, this.lateTriggerUpdater);
+  }
+
+  /**
+   * Public API methods start here
+   */
+
+
+  /**
+   * API method to allow users to set an update method to update the output value after the first window output is triggered
+   * by the early trigger condition
+   *
+   * @param onTriggerFunc  the method to update the output value after the early trigger
+   * @return  the {@link TriggerBuilder} object
+   */
+  public TriggerBuilder<M, V> onEarlyTrigger(Function<V, V> onTriggerFunc) {
+    this.earlyTriggerUpdater = s -> {
+      s.setOutputValue(onTriggerFunc.apply(s.getOutputValue()));
+      return s;
+    };
+    return this;
+  }
+
+  /**
+   * API method to allow users to set an update method to update the output value after a late window output is triggered
+   * by the late trigger condition
+   *
+   * @param onTriggerFunc  the method to update the output value after the late trigger
+   * @return  the {@link TriggerBuilder} object
+   */
+  public TriggerBuilder<M, V> onLateTrigger(Function<V, V> onTriggerFunc) {
+    this.lateTriggerUpdater = s -> {
+      s.setOutputValue(onTriggerFunc.apply(s.getOutputValue()));
+      return s;
+    };
+    return this;
+  }
+
+  /**
+   * API method to allow users to add a system timer trigger based on timeout after the last message received in the window
+   *
+   * @param timeoutMs  the timeout in ms after the last message received in the window
+   * @return  the {@link TriggerBuilder} object
+   */
+  public TriggerBuilder<M, V> addTimeoutSinceLastMessage(long timeoutMs) {
+    this.timerTrigger = this.addTimerTrigger(this.timerTrigger,
+        s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + timeoutMs < System.currentTimeMillis());
+    return this;
+  }
+
+  /**
+   * API method to allow users to add a system timer trigger based on the timeout after the first message received in the window
+   *
+   * @param timeoutMs  the timeout in ms after the first message received in the window
+   * @return  the {@link TriggerBuilder} object
+   */
+  public TriggerBuilder<M, V> addTimeoutSinceFirstMessage(long timeoutMs) {
+    this.timerTrigger = this.addTimerTrigger(this.timerTrigger, s ->
+        TimeUnit.NANOSECONDS.toMillis(s.getFirstMessageTimeNs()) + timeoutMs < System.currentTimeMillis());
+    return this;
+  }
+
+  /**
+   * API method allow users to add a late trigger based on the window size limit
+   *
+   * @param sizeLimit  limit on the number of messages in window
+   * @return  the {@link TriggerBuilder} object
+   */
+  public TriggerBuilder<M, V> addLateTriggerOnSizeLimit(long sizeLimit) {
+    this.lateTrigger = this.addTrigger(this.lateTrigger, (m, s) -> s.getNumberMessages() > sizeLimit);
+    return this;
+  }
+
+  /**
+   * API method to allow users to define a customized late trigger function based on input message and the window state
+   *
+   * @param lateTrigger  the late trigger condition based on input {@link Message} and the current {@link WindowState}
+   * @return  the {@link TriggerBuilder} object
+   */
+  public TriggerBuilder<M, V> addLateTrigger(BiFunction<M, WindowState<V>, Boolean> lateTrigger) {
+    this.lateTrigger = this.addTrigger(this.lateTrigger, lateTrigger);
+    return this;
+  }
+
+  /**
+   * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on window size limit
+   *
+   * @param sizeLimit  window size limit
+   * @param <M>  the type of input {@link Message}
+   * @param <V>  the type of {@link Windows.Window} output value
+   * @return  the {@link TriggerBuilder} object
+   */
+  public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerWhenExceedWndLen(long sizeLimit) {
+    return new TriggerBuilder<M, V>(sizeLimit);
+  }
+
+  /**
+   * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on event time window
+   *
+   *
+   * @param eventTimeFunc  the function to get the event time from the input message
+   * @param eventTimeWndSizeMs  the event time window size in Ms
+   * @param <M>  the type of input {@link Message}
+   * @param <V>  the type of {@link Windows.Window} output value
+   * @return  the {@link TriggerBuilder} object
+   */
+  public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnEventTime(Function<M, Long> eventTimeFunc, long eventTimeWndSizeMs) {
+    return new TriggerBuilder<M, V>(eventTimeFunc, eventTimeWndSizeMs);
+  }
+
+  /**
+   * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on token messages
+   *
+   * @param tokenFunc  the function to determine whether an input message is a window token or not
+   * @param <M>  the type of input {@link Message}
+   * @param <V>  the type of {@link Windows.Window} output value
+   * @return  the {@link TriggerBuilder} object
+   */
+  public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnTokenMsg(Function<M, Boolean> tokenFunc) {
+    return new TriggerBuilder<M, V>(tokenFunc);
+  }
+
+  /**
+   * Static API method to allow customized early trigger condition based on input {@link Message} and the corresponding {@link WindowState}
+   *
+   * @param earlyTrigger  the user defined early trigger condition
+   * @param <M>   the input message type
+   * @param <V>   the output value from the window
+   * @return   the {@link TriggerBuilder} object
+   */
+  public static <M extends Message, V> TriggerBuilder<M, V> earlyTrigger(BiFunction<M, WindowState<V>, Boolean> earlyTrigger) {
+    TriggerBuilder<M, V> newTriggers =  new TriggerBuilder<M, V>();
+    newTriggers.earlyTrigger = newTriggers.addTrigger(newTriggers.earlyTrigger, earlyTrigger);
+    return newTriggers;
+  }
+
+  /**
+   * Static API method to create a {@link TriggerBuilder} w/ system timeout after the last message received in the window
+   *
+   * @param timeoutMs  timeout in ms after the last message received
+   * @param <M>  the type of input {@link Message}
+   * @param <V>  the type of {@link Windows.Window} output value
+   * @return  the {@link TriggerBuilder} object
+   */
+  public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceLastMessage(long timeoutMs) {
+    return new TriggerBuilder<M, V>().addTimeoutSinceLastMessage(timeoutMs);
+  }
+
+  /**
+   * Static API method to create a {@link TriggerBuilder} w/ system timeout after the first message received in the window
+   *
+   * @param timeoutMs  timeout in ms after the first message received
+   * @param <M>  the type of input {@link Message}
+   * @param <V>  the type of {@link Windows.Window} output value
+   * @return  the {@link TriggerBuilder} object
+   */
+  public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceFirstMessage(long timeoutMs) {
+    return new TriggerBuilder<M, V>().addTimeoutSinceFirstMessage(timeoutMs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/WindowState.java b/samza-api/src/main/java/org/apache/samza/operators/WindowState.java
new file mode 100644
index 0000000..8f98d38
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/WindowState.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+/**
+ * This interface defines the methods a window state class has to implement. The programmers are allowed to implement
+ * customized window state to be stored in window state stores by implementing this interface class.
+ *
+ * @param <WV>  the type for window output value
+ */
+public interface WindowState<WV> {
+  /**
+   * Method to get the system time when the first message in the window is received
+   *
+   * @return  nano-second of system time for the first message received in the window
+   */
+  long getFirstMessageTimeNs();
+
+  /**
+   * Method to get the system time when the last message in the window is received
+   *
+   * @return  nano-second of system time for the last message received in the window
+   */
+  long getLastMessageTimeNs();
+
+  /**
+   * Method to get the earliest event time in the window
+   *
+   * @return  the earliest event time in nano-second in the window
+   */
+  long getEarliestEventTimeNs();
+
+  /**
+   * Method to get the latest event time in the window
+   *
+   * @return  the latest event time in nano-second in the window
+   */
+  long getLatestEventTimeNs();
+
+  /**
+   * Method to get the total number of messages received in the window
+   *
+   * @return  number of messages in the window
+   */
+  long getNumberMessages();
+
+  /**
+   * Method to get the corresponding window's output value
+   *
+   * @return  the corresponding window's output value
+   */
+  WV getOutputValue();
+
+  /**
+   * Method to set the corresponding window's output value
+   *
+   * @param value  the corresponding window's output value
+   */
+  void setOutputValue(WV value);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/Windows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/Windows.java
new file mode 100644
index 0000000..5ffa211
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/Windows.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators;
+import org.apache.samza.operators.internal.Trigger;
+import org.apache.samza.operators.internal.WindowFn;
+import org.apache.samza.operators.internal.WindowOutput;
+import org.apache.samza.storage.kv.Entry;
+
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+
+/**
+ * This class defines a collection of {@link Window} functions. The public classes and methods here are intended to be
+ * used by the user (i.e. programmers) to create {@link Window} function directly.
+ *
+ */
+public final class Windows {
+
+  /**
+   * private constructor to prevent instantiation
+   */
+  private Windows() {}
+
+  /**
+   * This class defines a session window function class
+   *
+   * @param <M>  the type of input {@link Message}
+   * @param <WK>  the type of session key in the session window
+   * @param <WV>  the type of output value in each session window
+   */
+  static class SessionWindow<M extends Message, WK, WV> implements Window<M, WK, WV, WindowOutput<WK, WV>> {
+
+    /**
+     * Constructor. Made private s.t. it can only be instantiated via the static API methods in {@link Windows}
+     *
+     * @param sessionKeyFunction  function to get the session key from the input {@link Message}
+     * @param aggregator  function to calculate the output value based on the input {@link Message} and current output value
+     */
+    private SessionWindow(Function<M, WK> sessionKeyFunction, BiFunction<M, WV, WV> aggregator) {
+      this.wndKeyFunction = sessionKeyFunction;
+      this.aggregator = aggregator;
+    }
+
+    /**
+     * function to calculate the window key from input message
+     */
+    private final Function<M, WK> wndKeyFunction;
+
+    /**
+     * function to calculate the output value from the input message and the current output value
+     */
+    private final BiFunction<M, WV, WV> aggregator;
+
+    /**
+     * trigger condition that determines when to send out the output value in a {@link WindowOutput} message
+     */
+    private Trigger<M, WindowState<WV>> trigger = null;
+
+    //TODO: need to create a set of {@link StoreFunctions} that is default to input {@link Message} type for {@link Window}
+    private Operators.StoreFunctions<M, WK, WindowState<WV>> storeFunctions = null;
+
+    /**
+     * Public API methods start here
+     */
+
+    /**
+     * Public API method to define the watermark trigger for the window operator
+     *
+     * @param wndTrigger {@link Trigger} function defines the watermark trigger for this {@link SessionWindow}
+     * @return The window operator w/ the defined watermark trigger
+     */
+    @Override
+    public Window<M, WK, WV, WindowOutput<WK, WV>> setTriggers(TriggerBuilder<M, WV> wndTrigger) {
+      this.trigger = wndTrigger.build();
+      return this;
+    }
+
+    private BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() {
+      // TODO: actual implementation of the main session window logic, based on the wndKeyFunction, aggregator, and triggers;
+      return null;
+    }
+
+    private WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn() {
+      return new WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>>() {
+
+        @Override public BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() {
+          return SessionWindow.this.getTransformFunc();
+        }
+
+        @Override public Operators.StoreFunctions<M, WK, WindowState<WV>> getStoreFuncs() {
+          return SessionWindow.this.storeFunctions;
+        }
+
+        @Override public Trigger<M, WindowState<WV>> getTrigger() {
+          return SessionWindow.this.trigger;
+        }
+      };
+    }
+  }
+
+  static <M extends Message, WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> WindowFn<M, WK, WS, WM> getInternalWindowFn(
+      Window<M, WK, WV, WM> window) {
+    if (window instanceof SessionWindow) {
+      SessionWindow<M, WK, WV> sessionWindow = (SessionWindow<M, WK, WV>) window;
+      return (WindowFn<M, WK, WS, WM>) sessionWindow.getInternalWindowFn();
+    }
+    throw new IllegalArgumentException("Input window type not supported.");
+  }
+
+  /**
+   * Public static API methods start here
+   *
+   */
+
+  /**
+   * The public programming interface class for window function
+   *
+   * @param <M>  the type of input {@link Message}
+   * @param <WK>  the type of key to the {@link Window}
+   * @param <WV>  the type of output value in the {@link WindowOutput}
+   * @param <WM>  the type of message in the window output stream
+   */
+  public interface Window<M extends Message, WK, WV, WM extends WindowOutput<WK, WV>> {
+
+    /**
+     * Set the triggers for this {@link Window}
+     *
+     * @param wndTrigger  trigger conditions set by the programmers
+     * @return  the {@link Window} function w/ the trigger {@code wndTrigger}
+     */
+    Window<M, WK, WV, WM> setTriggers(TriggerBuilder<M, WV> wndTrigger);
+  }
+
+  /**
+   * Static API method to create a {@link SessionWindow} in which the output value is simply the collection of input messages
+   *
+   * @param sessionKeyFunction  function to calculate session window key
+   * @param <M>  type of input {@link Message}
+   * @param <WK>  type of the session window key
+   * @return  the {@link Window} function for the session
+   */
+  public static <M extends Message, WK> Window<M, WK, Collection<M>, WindowOutput<WK, Collection<M>>> intoSessions(Function<M, WK> sessionKeyFunction) {
+    return new SessionWindow<>(sessionKeyFunction, (m, c) -> {
+      c.add(m);
+      return c;
+    });
+  }
+
+  /**
+   * Static API method to create a {@link SessionWindow} in which the output value is a collection of {@code SI} from the input messages
+   *
+   * @param sessionKeyFunction  function to calculate session window key
+   * @param sessionInfoExtractor  function to retrieve session info of type {@code SI} from the input message of type {@code M}
+   * @param <M>  type of the input {@link Message}
+   * @param <WK>  type of the session window key
+   * @param <SI>  type of the session information retrieved from each input message of type {@code M}
+   * @return  the {@link Window} function for the session
+   */
+  public static <M extends Message, WK, SI> Window<M, WK, Collection<SI>, WindowOutput<WK, Collection<SI>>> intoSessions(Function<M, WK> sessionKeyFunction,
+      Function<M, SI> sessionInfoExtractor) {
+    return new SessionWindow<>(sessionKeyFunction, (m, c) -> {
+      c.add(sessionInfoExtractor.apply(m));
+      return c;
+    });
+  }
+
+  /**
+   * Static API method to create a {@link SessionWindow} as a counter of input messages
+   *
+   * @param sessionKeyFunction  function to calculate session window key
+   * @param <M>  type of the input {@link Message}
+   * @param <WK>  type of the session window key
+   * @return  the {@link Window} function for the session
+   */
+  public static <M extends Message, WK> Window<M, WK, Integer, WindowOutput<WK, Integer>> intoSessionCounter(Function<M, WK> sessionKeyFunction) {
+    return new SessionWindow<>(sessionKeyFunction, (m, c) -> c + 1);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java
new file mode 100644
index 0000000..3c9874d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * This class implements a {@link Message} that encapsulates an {@link IncomingMessageEnvelope} from the system
+ *
+ */
+public class IncomingSystemMessage implements Message<Object, Object>, InputSystemMessage<Offset> {
+  /**
+   * Incoming message envelope
+   */
+  private final IncomingMessageEnvelope imsg;
+
+  /**
+   * The receive time of this incoming message
+   */
+  private final long recvTimeNano;
+
+  /**
+   * Ctor to create a {@code IncomingSystemMessage} from {@link IncomingMessageEnvelope}
+   *
+   * @param imsg The incoming system message
+   */
+  public IncomingSystemMessage(IncomingMessageEnvelope imsg) {
+    this.imsg = imsg;
+    this.recvTimeNano = System.nanoTime();
+  }
+
+  @Override
+  public Object getMessage() {
+    return this.imsg.getMessage();
+  }
+
+  @Override
+  public Object getKey() {
+    return this.imsg.getKey();
+  }
+
+  @Override
+  public long getTimestamp() {
+    return this.recvTimeNano;
+  }
+
+  @Override
+  public Offset getOffset() {
+    // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
+    // assuming incoming message carries long value as offset (i.e. Kafka case)
+    return new LongOffset(this.imsg.getOffset());
+  }
+
+  @Override
+  public SystemStreamPartition getSystemStreamPartition() {
+    return imsg.getSystemStreamPartition();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java b/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java
new file mode 100644
index 0000000..509f640
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * This interface defines additional methods a message from an system input should implement, including the methods to
+ * get {@link SystemStreamPartition} and the {@link Offset} of the input system message.
+ */
+public interface InputSystemMessage<O extends Offset> {
+
+  /**
+   * Get the input message's {@link SystemStreamPartition}
+   *
+   * @return  the {@link SystemStreamPartition} this message is coming from
+   */
+  SystemStreamPartition getSystemStreamPartition();
+
+  /**
+   * Get the offset of the message in the input stream. This should be used to uniquely identify a message in an input stream.
+   *
+   * @return The offset of the message in the input stream.
+   */
+  O getOffset();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java b/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java
new file mode 100644
index 0000000..0b6c0fa
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.data;
+
+/**
+ * An implementation of {@link org.apache.samza.operators.data.Offset}, w/ {@code long} value as the offset
+ */
+public class LongOffset implements Offset {
+
+  /**
+   * The offset value in {@code long}
+   */
+  private final Long offset;
+
+  private LongOffset(long offset) {
+    this.offset = offset;
+  }
+
+  public LongOffset(String offset) {
+    this.offset = Long.valueOf(offset);
+  }
+
+  @Override
+  public int compareTo(Offset o) {
+    if (!(o instanceof LongOffset)) {
+      throw new IllegalArgumentException("Not comparable offset classes. LongOffset vs " + o.getClass().getName());
+    }
+    LongOffset other = (LongOffset) o;
+    return this.offset.compareTo(other.offset);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof LongOffset)) {
+      return false;
+    }
+    LongOffset o = (LongOffset) other;
+    return this.offset.equals(o.offset);
+  }
+
+  @Override
+  public int hashCode() {
+    return offset.hashCode();
+  }
+
+  /**
+   * Helper method to get the minimum offset
+   *
+   * @return The minimum offset
+   */
+  public static LongOffset getMinOffset() {
+    return new LongOffset(Long.MIN_VALUE);
+  }
+
+  /**
+   * Helper method to get the maximum offset
+   *
+   * @return The maximum offset
+   */
+  public static LongOffset getMaxOffset() {
+    return new LongOffset(Long.MAX_VALUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/data/Message.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/Message.java b/samza-api/src/main/java/org/apache/samza/operators/data/Message.java
new file mode 100644
index 0000000..aaafd94
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/Message.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.data;
+
+/**
+ * This class defines the generic interface of {@link Message}, which is a entry in the input/output stream.
+ *
+ * <p>The {@link Message} models the basic operatible unit in streaming SQL processes in Samza.
+ *
+ */
+public interface Message<K, M> {
+
+  /**
+   * Access method to get the corresponding message body in {@link Message}
+   *
+   * @return Message object in this {@link Message}
+   */
+  M getMessage();
+
+  /**
+   * Method to indicate whether this {@link Message} indicates deletion of a message w/ the message key
+   *
+   * @return A boolean value indicates whether the current message is a delete or insert message
+   */
+  default boolean isDelete() {
+    return false;
+  };
+
+  /**
+   * Access method to the key of the message
+   *
+   * @return The key of the message
+   */
+  K getKey();
+
+  /**
+   * Get the message creation timestamp of the message.
+   *
+   * @return The message's timestamp in nano seconds.
+   */
+  long getTimestamp();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java b/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java
new file mode 100644
index 0000000..33eb9ba
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.data;
+
+/**
+ * A generic interface extending {@link java.lang.Comparable} to be used as {@code Offset} in a stream
+ */
+public interface Offset extends Comparable<Offset> {
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java b/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java
new file mode 100644
index 0000000..f06387c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.internal;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+
+/**
+ * This class defines all basic stream operator classes used by internal implementation only. All classes defined in
+ * this file are immutable.
+ *
+ * NOTE: Programmers should not use the operators defined in this class directly. All {@link Operator} objects
+ * should be initiated via {@link MessageStream} API methods
+ */
+public class Operators {
+  /**
+   * Private constructor to prevent instantiation of the {@link Operators} class
+   */
+  private Operators() {}
+
+  private static String getOperatorId() {
+    // TODO: need to change the IDs to be a consistent, durable IDs that can be recovered across container and job restarts
+    return UUID.randomUUID().toString();
+  }
+
+  /**
+   * Private interface for stream operator functions. The interface class defines the output of the stream operator function.
+   *
+   */
+  public interface Operator<OM extends Message> {
+    MessageStream<OM> getOutputStream();
+  }
+
+  /**
+   * Linear stream operator function that takes 1 input {@link Message} and output a collection of output {@link Message}s.
+   *
+   * @param <M>  the type of input {@link Message}
+   * @param <OM>  the type of output {@link Message}
+   */
+  public static class StreamOperator<M extends Message, OM extends Message> implements Operator<OM> {
+    /**
+     * The output {@link MessageStream}
+     */
+    private final MessageStream<OM> outputStream;
+
+    /**
+     * The transformation function
+     */
+    private final Function<M, Collection<OM>> txfmFunction;
+
+    /**
+     * Constructor of {@link StreamOperator}. Make it private s.t. it can only be created within {@link Operators}.
+     *
+     * @param transformFn  the transformation function to be applied that transforms 1 input {@link Message} into a collection
+     *                     of output {@link Message}s
+     */
+    private StreamOperator(Function<M, Collection<OM>> transformFn) {
+      this(transformFn, new MessageStream<>());
+    }
+
+    /**
+     * Constructor of {@link StreamOperator} which allows the user to define the output {@link MessageStream}
+     *
+     * @param transformFn  the transformation function
+     * @param outputStream  the output {@link MessageStream}
+     */
+    private StreamOperator(Function<M, Collection<OM>> transformFn, MessageStream<OM> outputStream) {
+      this.outputStream = outputStream;
+      this.txfmFunction = transformFn;
+    }
+
+    @Override
+    public MessageStream<OM> getOutputStream() {
+      return this.outputStream;
+    }
+
+    /**
+     * Method to get the transformation function.
+     *
+     * @return the {@code txfmFunction}
+     */
+    public Function<M, Collection<OM>> getFunction() {
+      return this.txfmFunction;
+    }
+
+  }
+
+  /**
+   * A sink operator function that allows customized code to send the output to external system. This is the terminal
+   * operator that does not have any output {@link MessageStream} that allows further processing in the same
+   * {@link org.apache.samza.operators.task.StreamOperatorTask}
+   *
+   * @param <M>  the type of input {@link Message}
+   */
+  public static class SinkOperator<M extends Message> implements Operator {
+
+    /**
+     * The user-defined sink function
+     */
+    private final MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink;
+
+    /**
+     * Default constructor for {@link SinkOperator}. Make it private s.t. it can only be created within {@link Operators}.
+     *
+     * @param sink  the user-defined sink function
+     */
+    private SinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
+      this.sink = sink;
+    }
+
+    @Override
+    public MessageStream getOutputStream() {
+      return null;
+    }
+
+    /**
+     * Method to get the user-defined function implements the {@link SinkOperator}
+     *
+     * @return a {@link MessageStream.VoidFunction3} function that allows the caller to pass in an input message, {@link MessageCollector}
+     *         and {@link TaskCoordinator} to the sink function
+     */
+    public MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> getFunction() {
+      return this.sink;
+    }
+  }
+
+  /**
+   * The store functions that are used by {@link WindowOperator} and {@link PartialJoinOperator} to store and retrieve
+   * buffered messages and partial aggregation results
+   *
+   * @param <SK>  the type of key used to store the operator states
+   * @param <SS>  the type of operator state. e.g. could be the partial aggregation result for a window, or a buffered
+   *             input message from the join stream for a join
+   */
+  public static class StoreFunctions<M extends Message, SK, SS> {
+    /**
+     * Function to define the key to query in the operator state store, according to the incoming {@link Message}
+     * This method only supports finding the unique key for the incoming message, which supports use case of non-overlapping
+     * windows and unique-key-based join.
+     *
+     * TODO: for windows that overlaps (i.e. sliding windows and hopping windows) and non-unique-key-based join, the query
+     * to the state store is usually a range scan. We need to add a rangeKeyFinder function to map from a single input
+     * message to a range of keys in the store.
+     */
+    private final Function<M, SK> storeKeyFinder;
+
+    /**
+     * Function to update the store entry based on the current state and the incoming {@link Message}
+     *
+     * TODO: this is assuming a 1:1 mapping from the input message to the store entry. When implementing sliding/hopping
+     * windows and non-unique-key-based join, we may need to include the corresponding state key, in addition to the
+     * state value.
+     */
+    private final BiFunction<M, SS, SS> stateUpdater;
+
+    /**
+     * Constructor of state store functions.
+     *
+     */
+    private StoreFunctions(Function<M, SK> keyFinder,
+        BiFunction<M, SS, SS> stateUpdater) {
+      this.storeKeyFinder = keyFinder;
+      this.stateUpdater = stateUpdater;
+    }
+
+    /**
+     * Method to get the {@code storeKeyFinder} function
+     *
+     * @return  the function to calculate the key from an input {@link Message}
+     */
+    public Function<M, SK> getStoreKeyFinder() {
+      return this.storeKeyFinder;
+    }
+
+    /**
+     * Method to get the {@code stateUpdater} function
+     *
+     * @return  the function to update the corresponding state according to an input {@link Message}
+     */
+    public BiFunction<M, SS, SS> getStateUpdater() {
+      return this.stateUpdater;
+    }
+  }
+
+  /**
+   * Defines a window operator function that takes one {@link MessageStream} as an input, accumulate the window state, and generate
+   * an output {@link MessageStream} w/ output type {@code WM} which extends {@link WindowOutput}
+   *
+   * @param <M>  the type of input {@link Message}
+   * @param <WK>  the type of key in the output {@link Message} from the {@link WindowOperator} function
+   * @param <WS>  the type of window state in the {@link WindowOperator} function
+   * @param <WM>  the type of window output {@link Message}
+   */
+  public static class WindowOperator<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> implements Operator<WM> {
+    /**
+     * The output {@link MessageStream}
+     */
+    private final MessageStream<WM> outputStream;
+
+    /**
+     * The main window transformation function that takes {@link Message}s from one input stream, aggregates w/ the window
+     * state(s) from the window state store, and generate output {@link Message}s to the output stream.
+     */
+    private final BiFunction<M, Entry<WK, WS>, WM> txfmFunction;
+
+    /**
+     * The state store functions for the {@link WindowOperator}
+     */
+    private final StoreFunctions<M, WK, WS> storeFunctions;
+
+    /**
+     * The window trigger function
+     */
+    private final Trigger<M, WS> trigger;
+
+    /**
+     * The unique ID of stateful operators
+     */
+    private final String opId;
+
+    /**
+     * Constructor for {@link WindowOperator}. Make it private s.t. it can only be created within {@link Operators}.
+     *
+     * @param windowFn  description of the window function
+     * @param operatorId  auto-generated unique ID of the operator
+     */
+    private WindowOperator(WindowFn<M, WK, WS, WM> windowFn, String operatorId) {
+      this.outputStream = new MessageStream<>();
+      this.txfmFunction = windowFn.getTransformFunc();
+      this.storeFunctions = windowFn.getStoreFuncs();
+      this.trigger = windowFn.getTrigger();
+      this.opId = operatorId;
+    }
+
+    @Override
+    public String toString() {
+      return this.opId;
+    }
+
+    @Override
+    public MessageStream<WM> getOutputStream() {
+      return this.outputStream;
+    }
+
+    /**
+     * Method to get the window's {@link StoreFunctions}.
+     *
+     * @return  the window operator's {@code storeFunctions}
+     */
+    public StoreFunctions<M, WK, WS> getStoreFunctions() {
+      return this.storeFunctions;
+    }
+
+    /**
+     * Method to get the window operator's main function
+     *
+     * @return   the window operator's {@code txfmFunction}
+     */
+    public BiFunction<M, Entry<WK, WS>, WM> getFunction() {
+      return this.txfmFunction;
+    }
+
+    /**
+     * Method to get the trigger functions
+     *
+     * @return  the {@link Trigger} for this {@link WindowOperator}
+     */
+    public Trigger<M, WS> getTrigger() {
+      return this.trigger;
+    }
+
+    /**
+     * Method to generate the window operator's state store name
+     *
+     * @param inputStream the input {@link MessageStream} to this state store
+     * @return   the persistent store name of the window operator
+     */
+    public String getStoreName(MessageStream<M> inputStream) {
+      //TODO: need to get the persistent name of ds and the operator in a serialized form
+      return String.format("input-%s-wndop-%s", inputStream.toString(), this.toString());
+    }
+  }
+
+  /**
+   * The partial join operator that takes {@link Message}s from one input stream and join w/ buffered {@link Message}s from
+   * another stream and generate join output to {@code output}
+   *
+   * @param <M>  the type of input {@link Message}
+   * @param <K>  the type of join key
+   * @param <JM>  the type of message of {@link Message} in the other join stream
+   * @param <RM>  the type of message of {@link Message} in the join output stream
+   */
+  public static class PartialJoinOperator<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> implements Operator<RM> {
+
+    private final MessageStream<RM> joinOutput;
+
+    /**
+     * The main transformation function of {@link PartialJoinOperator} that takes a type {@code M} input message,
+     * join w/ a stream of buffered {@link Message}s from another stream w/ type {@code JM}, and generate joined type {@code RM}.
+     */
+    private final BiFunction<M, JM, RM> txfmFunction;
+
+    /**
+     * The message store functions that read the buffered messages from the other stream in the join
+     */
+    private final StoreFunctions<JM, K, JM> joinStoreFunctions;
+
+    /**
+     * The message store functions that save the buffered messages of this {@link MessageStream} in the join
+     */
+    private final StoreFunctions<M, K, M> selfStoreFunctions;
+
+    /**
+     * The unique ID for the stateful operator
+     */
+    private final String opId;
+
+    /**
+     * Default constructor to create a {@link PartialJoinOperator} object
+     *
+     * @param partialJoin  partial join function that take type {@code M} of input {@link Message} and join w/ type
+     *                     {@code JM} of buffered {@link Message} from another stream
+     * @param joinOutput  the output {@link MessageStream} of the join results
+     */
+    private PartialJoinOperator(BiFunction<M, JM, RM> partialJoin, MessageStream<RM> joinOutput, String opId) {
+      this.joinOutput = joinOutput;
+      this.txfmFunction = partialJoin;
+      // Read-only join store, no creator/updater functions specified
+      this.joinStoreFunctions = new StoreFunctions<>(m -> m.getKey(), null);
+      // Buffered message store for this input stream
+      this.selfStoreFunctions = new StoreFunctions<>(m -> m.getKey(), (m, s1) -> m);
+      this.opId = opId;
+    }
+
+    @Override
+    public String toString() {
+      return this.opId;
+    }
+
+    @Override
+    public MessageStream<RM> getOutputStream() {
+      return this.joinOutput;
+    }
+
+    /**
+     * Method to get {@code joinStoreFunctions}
+     *
+     * @return  {@code joinStoreFunctions}
+     */
+    public StoreFunctions<JM, K, JM> getJoinStoreFunctions() {
+      return this.joinStoreFunctions;
+    }
+
+    /**
+     * Method to get {@code selfStoreFunctions}
+     *
+     * @return  {@code selfStoreFunctions}
+     */
+    public StoreFunctions<M, K, M> getSelfStoreFunctions() {
+      return this.selfStoreFunctions;
+    }
+
+    /**
+     * Method to get {@code txfmFunction}
+     *
+     * @return  {@code txfmFunction}
+     */
+    public BiFunction<M, JM, RM> getFunction() {
+      return this.txfmFunction;
+    }
+  }
+
+  /**
+   * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator}
+   *
+   * @param transformFn  the corresponding transformation function
+   * @param <M>  type of input {@link Message}
+   * @param <OM>  type of output {@link Message}
+   * @return  the {@link StreamOperator}
+   */
+  public static <M extends Message, OM extends Message> StreamOperator<M, OM> getStreamOperator(Function<M, Collection<OM>> transformFn) {
+    return new StreamOperator<>(transformFn);
+  }
+
+  /**
+   * The method only to be used internally in {@link MessageStream} to create {@link SinkOperator}
+   *
+   * @param sinkFn  the sink function
+   * @param <M>  type of input {@link Message}
+   * @return   the {@link SinkOperator}
+   */
+  public static <M extends Message> SinkOperator<M> getSinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sinkFn) {
+    return new SinkOperator<>(sinkFn);
+  }
+
+  /**
+   * The method only to be used internally in {@link MessageStream} to create {@link WindowOperator}
+   *
+   * @param windowFn  the {@link WindowFn} function
+   * @param <M>  type of input {@link Message}
+   * @param <WK>  type of window key
+   * @param <WS>  type of {@link WindowState}
+   * @param <WM>  type of output {@link WindowOutput}
+   * @return  the {@link WindowOperator}
+   */
+  public static <M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> WindowOperator<M, WK, WS, WM> getWindowOperator(
+      WindowFn<M, WK, WS, WM> windowFn) {
+    return new WindowOperator<>(windowFn, Operators.getOperatorId());
+  }
+
+  /**
+   * The method only to be used internally in {@link MessageStream} to create {@link WindowOperator}
+   *
+   * @param joiner  the {@link WindowFn} function
+   * @param joinOutput  the output {@link MessageStream}
+   * @param <M>  type of input {@link Message}
+   * @param <K>  type of join key
+   * @param <JM>  the type of message in the {@link Message} from the other join stream
+   * @param <RM>  the type of message in the {@link Message} from the join function
+   * @return  the {@link PartialJoinOperator}
+   */
+  public static <M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> PartialJoinOperator<M, K, JM, RM> getPartialJoinOperator(
+      BiFunction<M, JM, RM> joiner, MessageStream<RM> joinOutput) {
+    return new PartialJoinOperator<>(joiner, joinOutput, Operators.getOperatorId());
+  }
+
+  /**
+   * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator} as a merger function
+   *
+   * @param mergeOutput  the common output {@link MessageStream} from the merger
+   * @param <M>  the type of input {@link Message}
+   * @return  the {@link StreamOperator} for merge
+   */
+  public static <M extends Message> StreamOperator<M, M> getMergeOperator(MessageStream<M> mergeOutput) {
+    return new StreamOperator<M, M>(t ->
+      new ArrayList<M>() { {
+          this.add(t);
+        } },
+      mergeOutput);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java
new file mode 100644
index 0000000..3b50e2b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Defines the trigger functions for {@link Operators.WindowOperator}. This class is immutable.
+ *
+ * @param <M>  the type of message from the input stream
+ * @param <S>  the type of state variable in the window's state store
+ */
+public class Trigger<M extends Message, S extends WindowState> {
+
+  /**
+   * System timer based trigger condition. This is the only guarantee that the {@link Operators.WindowOperator} will proceed forward
+   */
+  private final Function<S, Boolean> timerTrigger;
+
+  /**
+   * early trigger condition that determines when to send the first output from the {@link Operators.WindowOperator}
+   */
+  private final BiFunction<M, S, Boolean> earlyTrigger;
+
+  /**
+   * late trigger condition that determines when to send the updated output after the first one from a {@link Operators.WindowOperator}
+   */
+  private final BiFunction<M, S, Boolean> lateTrigger;
+
+  /**
+   * the function to updated the window state when the first output is triggered
+   */
+  private final Function<S, S> earlyTriggerUpdater;
+
+  /**
+   * the function to updated the window state when the late output is triggered
+   */
+  private final Function<S, S> lateTriggerUpdater;
+
+  /**
+   * Private constructor to prevent instantiation
+   *
+   * @param timerTrigger  system timer trigger condition
+   * @param earlyTrigger  early trigger condition
+   * @param lateTrigger   late trigger condition
+   * @param earlyTriggerUpdater  early trigger state updater
+   * @param lateTriggerUpdater   late trigger state updater
+   */
+  private Trigger(Function<S, Boolean> timerTrigger, BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger,
+      Function<S, S> earlyTriggerUpdater, Function<S, S> lateTriggerUpdater) {
+    this.timerTrigger = timerTrigger;
+    this.earlyTrigger = earlyTrigger;
+    this.lateTrigger = lateTrigger;
+    this.earlyTriggerUpdater = earlyTriggerUpdater;
+    this.lateTriggerUpdater = lateTriggerUpdater;
+  }
+
+  /**
+   * Static method to create a {@link Trigger} object
+   *
+   * @param timerTrigger  system timer trigger condition
+   * @param earlyTrigger  early trigger condition
+   * @param lateTrigger  late trigger condition
+   * @param earlyTriggerUpdater  early trigger state updater
+   * @param lateTriggerUpdater  late trigger state updater
+   * @param <M>  the type of input {@link Message}
+   * @param <S>  the type of window state extends {@link WindowState}
+   * @return  the {@link Trigger} function
+   */
+  public static <M extends Message, S extends WindowState> Trigger<M, S> createTrigger(Function<S, Boolean> timerTrigger,
+      BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, Function<S, S> earlyTriggerUpdater,
+      Function<S, S> lateTriggerUpdater) {
+    return new Trigger(timerTrigger, earlyTrigger, lateTrigger, earlyTriggerUpdater, lateTriggerUpdater);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java
new file mode 100644
index 0000000..489e5b8
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.storage.kv.Entry;
+
+import java.util.function.BiFunction;
+
+
+/**
+ * Defines an internal representation of a window function. This class SHOULD NOT be used by the programmer directly. It is used
+ * by the internal representation and implementation classes in operators.
+ *
+ * @param <M> type of input stream {@link Message} for window
+ * @param <WK>  type of window key in the output {@link Message}
+ * @param <WS>  type of {@link WindowState} variable in the state store
+ * @param <WM>  type of the message in the output stream
+ */
+public interface WindowFn<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> {
+
+  /**
+   * get the transformation function of the {@link WindowFn}
+   *
+   * @return  the transformation function takes type {@code M} message and the window state entry, then transform to an {@link WindowOutput}
+   */
+  BiFunction<M, Entry<WK, WS>, WM> getTransformFunc();
+
+  /**
+   * get the state store functions for this {@link WindowFn}
+   *
+   * @return  the collection of state store methods
+   */
+  Operators.StoreFunctions<M, WK, WS> getStoreFuncs();
+
+  /**
+   * get the trigger conditions for this {@link WindowFn}
+   *
+   * @return  the trigger condition for the {@link WindowFn} function
+   */
+  Trigger<M, WS> getTrigger();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java
new file mode 100644
index 0000000..643b703
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.apache.samza.operators.data.Message;
+
+
+/**
+ * This class defines the specific type of output messages from a {@link Operators.WindowOperator} function
+ *
+ * @param <K>  the type of key in the output window result
+ * @param <M>  the type of value in the output window result
+ */
+public final class WindowOutput<K, M> implements Message<K, M> {
+  private final K key;
+  private final M value;
+
+  WindowOutput(K key, M aggregated) {
+    this.key = key;
+    this.value = aggregated;
+  }
+
+  @Override public M getMessage() {
+    return this.value;
+  }
+
+  @Override public K getKey() {
+    return this.key;
+  }
+
+  @Override public long getTimestamp() {
+    return 0;
+  }
+
+  static public <K, M> WindowOutput<K, M> of(K key, M result) {
+    return new WindowOutput<>(key, result);
+  }
+}
+


[2/4] samza git commit: SAMZA-1045: Move classes from operator/api into samza-api

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java b/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java
deleted file mode 100644
index e557b34..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.WindowOutput;
-import org.apache.samza.operators.api.internal.Trigger;
-import org.apache.samza.operators.api.internal.Operators;
-import org.apache.samza.operators.api.internal.WindowFn;
-import org.apache.samza.storage.kv.Entry;
-
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-
-/**
- * This class defines a collection of {@link Window} functions. The public classes and methods here are intended to be
- * used by the user (i.e. programmers) to create {@link Window} function directly.
- *
- */
-public final class Windows {
-
-  /**
-   * private constructor to prevent instantiation
-   */
-  private Windows() {}
-
-  /**
-   * This class defines a session window function class
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <WK>  the type of session key in the session window
-   * @param <WV>  the type of output value in each session window
-   */
-  static class SessionWindow<M extends Message, WK, WV> implements Window<M, WK, WV, WindowOutput<WK, WV>> {
-
-    /**
-     * Constructor. Made private s.t. it can only be instantiated via the static API methods in {@link Windows}
-     *
-     * @param sessionKeyFunction  function to get the session key from the input {@link Message}
-     * @param aggregator  function to calculate the output value based on the input {@link Message} and current output value
-     */
-    private SessionWindow(Function<M, WK> sessionKeyFunction, BiFunction<M, WV, WV> aggregator) {
-      this.wndKeyFunction = sessionKeyFunction;
-      this.aggregator = aggregator;
-    }
-
-    /**
-     * function to calculate the window key from input message
-     */
-    private final Function<M, WK> wndKeyFunction;
-
-    /**
-     * function to calculate the output value from the input message and the current output value
-     */
-    private final BiFunction<M, WV, WV> aggregator;
-
-    /**
-     * trigger condition that determines when to send out the output value in a {@link WindowOutput} message
-     */
-    private Trigger<M, WindowState<WV>> trigger = null;
-
-    //TODO: need to create a set of {@link StoreFunctions} that is default to input {@link Message} type for {@link Window}
-    private Operators.StoreFunctions<M, WK, WindowState<WV>> storeFunctions = null;
-
-    /**
-     * Public API methods start here
-     */
-
-    /**
-     * Public API method to define the watermark trigger for the window operator
-     *
-     * @param wndTrigger {@link Trigger} function defines the watermark trigger for this {@link SessionWindow}
-     * @return The window operator w/ the defined watermark trigger
-     */
-    @Override
-    public Window<M, WK, WV, WindowOutput<WK, WV>> setTriggers(TriggerBuilder<M, WV> wndTrigger) {
-      this.trigger = wndTrigger.build();
-      return this;
-    }
-
-    private BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() {
-      // TODO: actual implementation of the main session window logic, based on the wndKeyFunction, aggregator, and triggers;
-      return null;
-    }
-
-    private WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn() {
-      return new WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>>() {
-
-        @Override public BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() {
-          return SessionWindow.this.getTransformFunc();
-        }
-
-        @Override public Operators.StoreFunctions<M, WK, WindowState<WV>> getStoreFuncs() {
-          return SessionWindow.this.storeFunctions;
-        }
-
-        @Override public Trigger<M, WindowState<WV>> getTrigger() {
-          return SessionWindow.this.trigger;
-        }
-      };
-    }
-  }
-
-  static <M extends Message, WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> WindowFn<M, WK, WS, WM> getInternalWindowFn(
-      Window<M, WK, WV, WM> window) {
-    if (window instanceof SessionWindow) {
-      SessionWindow<M, WK, WV> sessionWindow = (SessionWindow<M, WK, WV>) window;
-      return (WindowFn<M, WK, WS, WM>) sessionWindow.getInternalWindowFn();
-    }
-    throw new IllegalArgumentException("Input window type not supported.");
-  }
-
-  /**
-   * Public static API methods start here
-   *
-   */
-
-  /**
-   * The public programming interface class for window function
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <WK>  the type of key to the {@link Window}
-   * @param <WV>  the type of output value in the {@link WindowOutput}
-   * @param <WM>  the type of message in the window output stream
-   */
-  public interface Window<M extends Message, WK, WV, WM extends WindowOutput<WK, WV>> {
-
-    /**
-     * Set the triggers for this {@link Window}
-     *
-     * @param wndTrigger  trigger conditions set by the programmers
-     * @return  the {@link Window} function w/ the trigger {@code wndTrigger}
-     */
-    Window<M, WK, WV, WM> setTriggers(TriggerBuilder<M, WV> wndTrigger);
-  }
-
-  /**
-   * Static API method to create a {@link SessionWindow} in which the output value is simply the collection of input messages
-   *
-   * @param sessionKeyFunction  function to calculate session window key
-   * @param <M>  type of input {@link Message}
-   * @param <WK>  type of the session window key
-   * @return  the {@link Window} function for the session
-   */
-  public static <M extends Message, WK> Window<M, WK, Collection<M>, WindowOutput<WK, Collection<M>>> intoSessions(Function<M, WK> sessionKeyFunction) {
-    return new SessionWindow<>(sessionKeyFunction, (m, c) -> { c.add(m); return c; });
-  }
-
-  /**
-   * Static API method to create a {@link SessionWindow} in which the output value is a collection of {@code SI} from the input messages
-   *
-   * @param sessionKeyFunction  function to calculate session window key
-   * @param sessionInfoExtractor  function to retrieve session info of type {@code SI} from the input message of type {@code M}
-   * @param <M>  type of the input {@link Message}
-   * @param <WK>  type of the session window key
-   * @param <SI>  type of the session information retrieved from each input message of type {@code M}
-   * @return  the {@link Window} function for the session
-   */
-  public static <M extends Message, WK, SI> Window<M, WK, Collection<SI>, WindowOutput<WK, Collection<SI>>> intoSessions(Function<M, WK> sessionKeyFunction,
-      Function<M, SI> sessionInfoExtractor) {
-    return new SessionWindow<>(sessionKeyFunction,
-        (m, c) -> { c.add(sessionInfoExtractor.apply(m)); return c; } );
-  }
-
-  /**
-   * Static API method to create a {@link SessionWindow} as a counter of input messages
-   *
-   * @param sessionKeyFunction  function to calculate session window key
-   * @param <M>  type of the input {@link Message}
-   * @param <WK>  type of the session window key
-   * @return  the {@link Window} function for the session
-   */
-  public static <M extends Message, WK> Window<M, WK, Integer, WindowOutput<WK, Integer>> intoSessionCounter(Function<M, WK> sessionKeyFunction) {
-    return new SessionWindow<>(sessionKeyFunction, (m, c) -> c + 1);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java
deleted file mode 100644
index ba74618..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * This class implements a {@link Message} that encapsulates an {@link IncomingMessageEnvelope} from the system
- *
- */
-public class IncomingSystemMessage implements Message<Object, Object>, InputSystemMessage<Offset> {
-  /**
-   * Incoming message envelope
-   */
-  private final IncomingMessageEnvelope imsg;
-
-  /**
-   * The receive time of this incoming message
-   */
-  private final long recvTimeNano;
-
-  /**
-   * Ctor to create a {@code IncomingSystemMessage} from {@link IncomingMessageEnvelope}
-   *
-   * @param imsg The incoming system message
-   */
-  public IncomingSystemMessage(IncomingMessageEnvelope imsg) {
-    this.imsg = imsg;
-    this.recvTimeNano = System.nanoTime();
-  }
-
-  @Override
-  public Object getMessage() {
-    return this.imsg.getMessage();
-  }
-
-  @Override
-  public Object getKey() {
-    return this.imsg.getKey();
-  }
-
-  @Override
-  public long getTimestamp() {
-    return this.recvTimeNano;
-  }
-
-  @Override
-  public Offset getOffset() {
-    // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
-    // assuming incoming message carries long value as offset (i.e. Kafka case)
-    return new LongOffset(this.imsg.getOffset());
-  }
-
-  @Override
-  public SystemStreamPartition getSystemStreamPartition() {
-    return imsg.getSystemStreamPartition();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java
deleted file mode 100644
index c786025..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.data;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * This interface defines additional methods a message from an system input should implement, including the methods to
- * get {@link SystemStreamPartition} and the {@link Offset} of the input system message.
- */
-public interface InputSystemMessage<O extends Offset> {
-
-  /**
-   * Get the input message's {@link SystemStreamPartition}
-   *
-   * @return  the {@link SystemStreamPartition} this message is coming from
-   */
-  SystemStreamPartition getSystemStreamPartition();
-
-  /**
-   * Get the offset of the message in the input stream. This should be used to uniquely identify a message in an input stream.
-   *
-   * @return The offset of the message in the input stream.
-   */
-  O getOffset();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java
deleted file mode 100644
index f059b33..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api.data;
-
-/**
- * An implementation of {@link org.apache.samza.operators.api.data.Offset}, w/ {@code long} value as the offset
- */
-public class LongOffset implements Offset {
-
-  /**
-   * The offset value in {@code long}
-   */
-  private final Long offset;
-
-  private LongOffset(long offset) {
-    this.offset = offset;
-  }
-
-  public LongOffset(String offset) {
-    this.offset = Long.valueOf(offset);
-  }
-
-  @Override
-  public int compareTo(Offset o) {
-    if (!(o instanceof LongOffset)) {
-      throw new IllegalArgumentException("Not comparable offset classes. LongOffset vs " + o.getClass().getName());
-    }
-    LongOffset other = (LongOffset) o;
-    return this.offset.compareTo(other.offset);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof LongOffset)) {
-      return false;
-    }
-    LongOffset o = (LongOffset) other;
-    return this.offset.equals(o.offset);
-  }
-
-  /**
-   * Helper method to get the minimum offset
-   *
-   * @return The minimum offset
-   */
-  public static LongOffset getMinOffset() {
-    return new LongOffset(Long.MIN_VALUE);
-  }
-
-  /**
-   * Helper method to get the maximum offset
-   *
-   * @return The maximum offset
-   */
-  public static LongOffset getMaxOffset() {
-    return new LongOffset(Long.MAX_VALUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java
deleted file mode 100644
index 9b53b45..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api.data;
-
-/**
- * This class defines the generic interface of {@link Message}, which is a entry in the input/output stream.
- *
- * <p>The {@link Message} models the basic operatible unit in streaming SQL processes in Samza.
- *
- */
-public interface Message<K, M> {
-
-  /**
-   * Access method to get the corresponding message body in {@link Message}
-   *
-   * @return Message object in this {@link Message}
-   */
-  M getMessage();
-
-  /**
-   * Method to indicate whether this {@link Message} indicates deletion of a message w/ the message key
-   *
-   * @return A boolean value indicates whether the current message is a delete or insert message
-   */
-  default boolean isDelete() { return false; };
-
-  /**
-   * Access method to the key of the message
-   *
-   * @return The key of the message
-   */
-  K getKey();
-
-  /**
-   * Get the message creation timestamp of the message.
-   *
-   * @return The message's timestamp in nano seconds.
-   */
-  long getTimestamp();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java
deleted file mode 100644
index 0fac2c0..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api.data;
-
-/**
- * A generic interface extending {@link java.lang.Comparable} to be used as {@code Offset} in a stream
- */
-public interface Offset extends Comparable<Offset> {
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
deleted file mode 100644
index e9bfe0b..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api.internal;
-
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-
-/**
- * This class defines all basic stream operator classes used by internal implementation only. All classes defined in
- * this file are immutable.
- *
- * NOTE: Programmers should not use the operators defined in this class directly. All {@link Operator} objects
- * should be initiated via {@link MessageStream} API methods
- */
-public class Operators {
-  /**
-   * Private constructor to prevent instantiation of the {@link Operators} class
-   */
-  private Operators() {}
-
-  private static String getOperatorId() {
-    // TODO: need to change the IDs to be a consistent, durable IDs that can be recovered across container and job restarts
-    return UUID.randomUUID().toString();
-  }
-
-  /**
-   * Private interface for stream operator functions. The interface class defines the output of the stream operator function.
-   *
-   */
-  public interface Operator<OM extends Message> {
-    MessageStream<OM> getOutputStream();
-  }
-
-  /**
-   * Linear stream operator function that takes 1 input {@link Message} and output a collection of output {@link Message}s.
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <OM>  the type of output {@link Message}
-   */
-  public static class StreamOperator<M extends Message, OM extends Message> implements Operator<OM> {
-    /**
-     * The output {@link MessageStream}
-     */
-    private final MessageStream<OM> outputStream;
-
-    /**
-     * The transformation function
-     */
-    private final Function<M, Collection<OM>> txfmFunction;
-
-    /**
-     * Constructor of {@link StreamOperator}. Make it private s.t. it can only be created within {@link Operators}.
-     *
-     * @param transformFn  the transformation function to be applied that transforms 1 input {@link Message} into a collection
-     *                     of output {@link Message}s
-     */
-    private StreamOperator(Function<M, Collection<OM>> transformFn) {
-      this(transformFn, new MessageStream<>());
-    }
-
-    /**
-     * Constructor of {@link StreamOperator} which allows the user to define the output {@link MessageStream}
-     *
-     * @param transformFn  the transformation function
-     * @param outputStream  the output {@link MessageStream}
-     */
-    private StreamOperator(Function<M, Collection<OM>> transformFn, MessageStream<OM> outputStream) {
-      this.outputStream = outputStream;
-      this.txfmFunction = transformFn;
-    }
-
-    @Override
-    public MessageStream<OM> getOutputStream() {
-      return this.outputStream;
-    }
-
-    /**
-     * Method to get the transformation function.
-     *
-     * @return the {@code txfmFunction}
-     */
-    public Function<M, Collection<OM>> getFunction() {
-      return this.txfmFunction;
-    }
-
-  }
-
-  /**
-   * A sink operator function that allows customized code to send the output to external system. This is the terminal
-   * operator that does not have any output {@link MessageStream} that allows further processing in the same {@link org.apache.samza.task.StreamOperatorTask}
-   *
-   * @param <M>  the type of input {@link Message}
-   */
-  public static class SinkOperator<M extends Message> implements Operator {
-
-    /**
-     * The user-defined sink function
-     */
-    private final MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink;
-
-    /**
-     * Default constructor for {@link SinkOperator}. Make it private s.t. it can only be created within {@link Operators}.
-     *
-     * @param sink  the user-defined sink function
-     */
-    private SinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
-      this.sink = sink;
-    }
-
-    @Override
-    public MessageStream getOutputStream() {
-      return null;
-    }
-
-    /**
-     * Method to get the user-defined function implements the {@link SinkOperator}
-     *
-     * @return a {@link MessageStream.VoidFunction3} function that allows the caller to pass in an input message, {@link MessageCollector}
-     *         and {@link TaskCoordinator} to the sink function
-     */
-    public MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> getFunction() {
-      return this.sink;
-    }
-  }
-
-  /**
-   * The store functions that are used by {@link WindowOperator} and {@link PartialJoinOperator} to store and retrieve
-   * buffered messages and partial aggregation results
-   *
-   * @param <SK>  the type of key used to store the operator states
-   * @param <SS>  the type of operator state. e.g. could be the partial aggregation result for a window, or a buffered
-   *             input message from the join stream for a join
-   */
-  public static class StoreFunctions<M extends Message, SK, SS> {
-    /**
-     * Function to define the key to query in the operator state store, according to the incoming {@link Message}
-     * This method only supports finding the unique key for the incoming message, which supports use case of non-overlapping
-     * windows and unique-key-based join.
-     *
-     * TODO: for windows that overlaps (i.e. sliding windows and hopping windows) and non-unique-key-based join, the query
-     * to the state store is usually a range scan. We need to add a rangeKeyFinder function to map from a single input
-     * message to a range of keys in the store.
-     */
-    private final Function<M, SK> storeKeyFinder;
-
-    /**
-     * Function to update the store entry based on the current state and the incoming {@link Message}
-     *
-     * TODO: this is assuming a 1:1 mapping from the input message to the store entry. When implementing sliding/hopping
-     * windows and non-unique-key-based join, we may need to include the corresponding state key, in addition to the
-     * state value.
-     */
-    private final BiFunction<M, SS, SS> stateUpdater;
-
-    /**
-     * Constructor of state store functions.
-     *
-     */
-    private StoreFunctions(Function<M, SK> keyFinder,
-        BiFunction<M, SS, SS> stateUpdater) {
-      this.storeKeyFinder = keyFinder;
-      this.stateUpdater = stateUpdater;
-    }
-
-    /**
-     * Method to get the {@code storeKeyFinder} function
-     *
-     * @return  the function to calculate the key from an input {@link Message}
-     */
-    public Function<M, SK> getStoreKeyFinder() {
-      return this.storeKeyFinder;
-    }
-
-    /**
-     * Method to get the {@code stateUpdater} function
-     *
-     * @return  the function to update the corresponding state according to an input {@link Message}
-     */
-    public BiFunction<M, SS, SS> getStateUpdater() {
-      return this.stateUpdater;
-    }
-  }
-
-  /**
-   * Defines a window operator function that takes one {@link MessageStream} as an input, accumulate the window state, and generate
-   * an output {@link MessageStream} w/ output type {@code WM} which extends {@link WindowOutput}
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <WK>  the type of key in the output {@link Message} from the {@link WindowOperator} function
-   * @param <WS>  the type of window state in the {@link WindowOperator} function
-   * @param <WM>  the type of window output {@link Message}
-   */
-  public static class WindowOperator<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> implements Operator<WM> {
-    /**
-     * The output {@link MessageStream}
-     */
-    private final MessageStream<WM> outputStream;
-
-    /**
-     * The main window transformation function that takes {@link Message}s from one input stream, aggregates w/ the window
-     * state(s) from the window state store, and generate output {@link Message}s to the output stream.
-     */
-    private final BiFunction<M, Entry<WK, WS>, WM> txfmFunction;
-
-    /**
-     * The state store functions for the {@link WindowOperator}
-     */
-    private final StoreFunctions<M, WK, WS> storeFunctions;
-
-    /**
-     * The window trigger function
-     */
-    private final Trigger<M, WS> trigger;
-
-    /**
-     * The unique ID of stateful operators
-     */
-    private final String opId;
-
-    /**
-     * Constructor for {@link WindowOperator}. Make it private s.t. it can only be created within {@link Operators}.
-     *
-     * @param windowFn  description of the window function
-     * @param operatorId  auto-generated unique ID of the operator
-     */
-    private WindowOperator(WindowFn<M, WK, WS, WM> windowFn, String operatorId) {
-      this.outputStream = new MessageStream<>();
-      this.txfmFunction = windowFn.getTransformFunc();
-      this.storeFunctions = windowFn.getStoreFuncs();
-      this.trigger = windowFn.getTrigger();
-      this.opId = operatorId;
-    }
-
-    @Override
-    public String toString() {
-      return this.opId;
-    }
-
-    @Override
-    public MessageStream<WM> getOutputStream() {
-      return this.outputStream;
-    }
-
-    /**
-     * Method to get the window's {@link StoreFunctions}.
-     *
-     * @return  the window operator's {@code storeFunctions}
-     */
-    public StoreFunctions<M, WK, WS> getStoreFunctions() {
-      return this.storeFunctions;
-    }
-
-    /**
-     * Method to get the window operator's main function
-     *
-     * @return   the window operator's {@code txfmFunction}
-     */
-    public BiFunction<M, Entry<WK, WS>, WM> getFunction() {
-      return this.txfmFunction;
-    }
-
-    /**
-     * Method to get the trigger functions
-     *
-     * @return  the {@link Trigger} for this {@link WindowOperator}
-     */
-    public Trigger<M, WS> getTrigger() {
-      return this.trigger;
-    }
-
-    /**
-     * Method to generate the window operator's state store name
-     *
-     * @param inputStream the input {@link MessageStream} to this state store
-     * @return   the persistent store name of the window operator
-     */
-    public String getStoreName(MessageStream<M> inputStream) {
-      //TODO: need to get the persistent name of ds and the operator in a serialized form
-      return String.format("input-%s-wndop-%s", inputStream.toString(), this.toString());
-    }
-  }
-
-  /**
-   * The partial join operator that takes {@link Message}s from one input stream and join w/ buffered {@link Message}s from
-   * another stream and generate join output to {@code output}
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <K>  the type of join key
-   * @param <JM>  the type of message of {@link Message} in the other join stream
-   * @param <RM>  the type of message of {@link Message} in the join output stream
-   */
-  public static class PartialJoinOperator<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> implements Operator<RM> {
-
-    private final MessageStream<RM> joinOutput;
-
-    /**
-     * The main transformation function of {@link PartialJoinOperator} that takes a type {@code M} input message,
-     * join w/ a stream of buffered {@link Message}s from another stream w/ type {@code JM}, and generate joined type {@code RM}.
-     */
-    private final BiFunction<M, JM, RM> txfmFunction;
-
-    /**
-     * The message store functions that read the buffered messages from the other stream in the join
-     */
-    private final StoreFunctions<JM, K, JM> joinStoreFunctions;
-
-    /**
-     * The message store functions that save the buffered messages of this {@link MessageStream} in the join
-     */
-    private final StoreFunctions<M, K, M> selfStoreFunctions;
-
-    /**
-     * The unique ID for the stateful operator
-     */
-    private final String opId;
-
-    /**
-     * Default constructor to create a {@link PartialJoinOperator} object
-     *
-     * @param partialJoin  partial join function that take type {@code M} of input {@link Message} and join w/ type
-     *                     {@code JM} of buffered {@link Message} from another stream
-     * @param joinOutput  the output {@link MessageStream} of the join results
-     */
-    private PartialJoinOperator(BiFunction<M, JM, RM> partialJoin, MessageStream<RM> joinOutput, String opId) {
-      this.joinOutput = joinOutput;
-      this.txfmFunction = partialJoin;
-      // Read-only join store, no creator/updater functions specified
-      this.joinStoreFunctions = new StoreFunctions<>(m -> m.getKey(), null);
-      // Buffered message store for this input stream
-      this.selfStoreFunctions = new StoreFunctions<>(m -> m.getKey(), (m, s1) -> m);
-      this.opId = opId;
-    }
-
-    @Override
-    public String toString() {
-      return this.opId;
-    }
-
-    @Override
-    public MessageStream<RM> getOutputStream() {
-      return this.joinOutput;
-    }
-
-    /**
-     * Method to get {@code joinStoreFunctions}
-     *
-     * @return  {@code joinStoreFunctions}
-     */
-    public StoreFunctions<JM, K, JM> getJoinStoreFunctions() {
-      return this.joinStoreFunctions;
-    }
-
-    /**
-     * Method to get {@code selfStoreFunctions}
-     *
-     * @return  {@code selfStoreFunctions}
-     */
-    public StoreFunctions<M, K, M> getSelfStoreFunctions() {
-      return this.selfStoreFunctions;
-    }
-
-    /**
-     * Method to get {@code txfmFunction}
-     *
-     * @return  {@code txfmFunction}
-     */
-    public BiFunction<M, JM, RM> getFunction() {
-      return this.txfmFunction;
-    }
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator}
-   *
-   * @param transformFn  the corresponding transformation function
-   * @param <M>  type of input {@link Message}
-   * @param <OM>  type of output {@link Message}
-   * @return  the {@link StreamOperator}
-   */
-  public static <M extends Message, OM extends Message> StreamOperator<M, OM> getStreamOperator(Function<M, Collection<OM>> transformFn) {
-    return new StreamOperator<>(transformFn);
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create {@link SinkOperator}
-   *
-   * @param sinkFn  the sink function
-   * @param <M>  type of input {@link Message}
-   * @return   the {@link SinkOperator}
-   */
-  public static <M extends Message> SinkOperator<M> getSinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sinkFn) {
-    return new SinkOperator<>(sinkFn);
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create {@link WindowOperator}
-   *
-   * @param windowFn  the {@link WindowFn} function
-   * @param <M>  type of input {@link Message}
-   * @param <WK>  type of window key
-   * @param <WS>  type of {@link WindowState}
-   * @param <WM>  type of output {@link WindowOutput}
-   * @return  the {@link WindowOperator}
-   */
-  public static <M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> WindowOperator<M, WK, WS, WM> getWindowOperator(
-    WindowFn<M, WK, WS, WM> windowFn) {
-    return new WindowOperator<>(windowFn, Operators.getOperatorId());
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create {@link WindowOperator}
-   *
-   * @param joiner  the {@link WindowFn} function
-   * @param joinOutput  the output {@link MessageStream}
-   * @param <M>  type of input {@link Message}
-   * @param <K>  type of join key
-   * @param <JM>  the type of message in the {@link Message} from the other join stream
-   * @param <RM>  the type of message in the {@link Message} from the join function
-   * @return  the {@link PartialJoinOperator}
-   */
-  public static <M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> PartialJoinOperator<M, K, JM, RM> getPartialJoinOperator(
-    BiFunction<M, JM, RM> joiner, MessageStream<RM> joinOutput) {
-    return new PartialJoinOperator<>(joiner, joinOutput, Operators.getOperatorId());
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator} as a merger function
-   *
-   * @param mergeOutput  the common output {@link MessageStream} from the merger
-   * @param <M>  the type of input {@link Message}
-   * @return  the {@link StreamOperator} for merge
-   */
-  public static <M extends Message> StreamOperator<M, M> getMergeOperator(MessageStream<M> mergeOutput) {
-    return new StreamOperator<M, M>(t ->
-      new ArrayList<M>() {{
-        this.add(t);
-      }},
-      mergeOutput);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java
deleted file mode 100644
index 33a0134..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.internal;
-
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-/**
- * Defines the trigger functions for {@link Operators.WindowOperator}. This class is immutable.
- *
- * @param <M>  the type of message from the input stream
- * @param <S>  the type of state variable in the window's state store
- */
-public class Trigger<M extends Message, S extends WindowState> {
-
-  /**
-   * System timer based trigger condition. This is the only guarantee that the {@link Operators.WindowOperator} will proceed forward
-   */
-  private final Function<S, Boolean> timerTrigger;
-
-  /**
-   * early trigger condition that determines when to send the first output from the {@link Operators.WindowOperator}
-   */
-  private final BiFunction<M, S, Boolean> earlyTrigger;
-
-  /**
-   * late trigger condition that determines when to send the updated output after the first one from a {@link Operators.WindowOperator}
-   */
-  private final BiFunction<M, S, Boolean> lateTrigger;
-
-  /**
-   * the function to updated the window state when the first output is triggered
-   */
-  private final Function<S, S> earlyTriggerUpdater;
-
-  /**
-   * the function to updated the window state when the late output is triggered
-   */
-  private final Function<S, S> lateTriggerUpdater;
-
-  /**
-   * Private constructor to prevent instantiation
-   *
-   * @param timerTrigger  system timer trigger condition
-   * @param earlyTrigger  early trigger condition
-   * @param lateTrigger   late trigger condition
-   * @param earlyTriggerUpdater  early trigger state updater
-   * @param lateTriggerUpdater   late trigger state updater
-   */
-  private Trigger(Function<S, Boolean> timerTrigger, BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger,
-      Function<S, S> earlyTriggerUpdater, Function<S, S> lateTriggerUpdater) {
-    this.timerTrigger = timerTrigger;
-    this.earlyTrigger = earlyTrigger;
-    this.lateTrigger = lateTrigger;
-    this.earlyTriggerUpdater = earlyTriggerUpdater;
-    this.lateTriggerUpdater = lateTriggerUpdater;
-  }
-
-  /**
-   * Static method to create a {@link Trigger} object
-   *
-   * @param timerTrigger  system timer trigger condition
-   * @param earlyTrigger  early trigger condition
-   * @param lateTrigger  late trigger condition
-   * @param earlyTriggerUpdater  early trigger state updater
-   * @param lateTriggerUpdater  late trigger state updater
-   * @param <M>  the type of input {@link Message}
-   * @param <S>  the type of window state extends {@link WindowState}
-   * @return  the {@link Trigger} function
-   */
-  public static <M extends Message, S extends WindowState> Trigger<M, S> createTrigger(Function<S, Boolean> timerTrigger,
-      BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, Function<S, S> earlyTriggerUpdater,
-      Function<S, S> lateTriggerUpdater) {
-    return new Trigger(timerTrigger, earlyTrigger, lateTrigger, earlyTriggerUpdater, lateTriggerUpdater);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java
deleted file mode 100644
index 1fd88e7..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.internal;
-
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.storage.kv.Entry;
-
-import java.util.function.BiFunction;
-
-
-/**
- * Defines an internal representation of a window function. This class SHOULD NOT be used by the programmer directly. It is used
- * by the internal representation and implementation classes in operators.
- *
- * @param <M> type of input stream {@link Message} for window
- * @param <WK>  type of window key in the output {@link Message}
- * @param <WS>  type of {@link WindowState} variable in the state store
- * @param <WM>  type of the message in the output stream
- */
-public interface WindowFn<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> {
-
-  /**
-   * get the transformation function of the {@link WindowFn}
-   *
-   * @return  the transformation function takes type {@code M} message and the window state entry, then transform to an {@link WindowOutput}
-   */
-  BiFunction<M, Entry<WK, WS>, WM> getTransformFunc();
-
-  /**
-   * get the state store functions for this {@link WindowFn}
-   *
-   * @return  the collection of state store methods
-   */
-  Operators.StoreFunctions<M, WK, WS> getStoreFuncs();
-
-  /**
-   * get the trigger conditions for this {@link WindowFn}
-   *
-   * @return  the trigger condition for the {@link WindowFn} function
-   */
-  Trigger<M, WS> getTrigger();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java
deleted file mode 100644
index e202c20..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.internal;
-
-import org.apache.samza.operators.api.data.Message;
-
-
-/**
- * This class defines the specific type of output messages from a {@link Operators.WindowOperator} function
- *
- * @param <K>  the type of key in the output window result
- * @param <M>  the type of value in the output window result
- */
-public final class WindowOutput<K, M> implements Message<K, M> {
-  private final K key;
-  private final M value;
-
-  WindowOutput(K key, M aggregated) {
-    this.key = key;
-    this.value = aggregated;
-  }
-
-  @Override public M getMessage() {
-    return this.value;
-  }
-
-  @Override public K getKey() {
-    return this.key;
-  }
-
-  @Override public long getTimestamp() {
-    return 0;
-  }
-
-  static public <K, M> WindowOutput<K, M> of(K key, M result) {
-    return new WindowOutput<>(key, result);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
index 59de16b..82f3c28 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
@@ -19,9 +19,9 @@
 
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.Operator;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.Operator;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
index f16cbc6..d3d8f8b 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
@@ -19,10 +19,10 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.commons.collections.keyvalue.AbstractMapEntry;
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.*;
-import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.*;
+import org.apache.samza.operators.internal.WindowOutput;
 import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
 import org.apache.samza.operators.impl.window.SessionWindowImpl;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 3ca8bde..f55c758 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.MessageStream;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
@@ -90,7 +90,7 @@ public abstract class OperatorImpl<M extends Message, RM extends Message>
   protected void init(MessageStream<M> source, TaskContext context) {};
 
   /**
-   * Method to trigger all downstream operators that consumes the output {@link org.apache.samza.operators.api.MessageStream}
+   * Method to trigger all downstream operators that consumes the output {@link MessageStream}
    * from this operator
    *
    * @param omsg  output {@link Message}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
index 5a375bc..cc7ef2b 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.data.Message;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
index b29d9c8..b0f4f27 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.StreamOperator;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.StreamOperator;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index 5d25cfa..a8a639e 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.internal.Operators.SinkOperator;
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.internal.Operators.SinkOperator;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.data.Message;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
index f573fd0..7840b5b 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.StoreFunctions;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.StoreFunctions;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.TaskContext;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
index bbe08a4..f4a6a58 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
@@ -18,19 +18,20 @@
  */
 package org.apache.samza.operators.impl.join;
 
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.PartialJoinOperator;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators;
+import org.apache.samza.operators.internal.Operators.PartialJoinOperator;
 import org.apache.samza.operators.impl.OperatorImpl;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 
 
 /**
- * Implementation of a {@link org.apache.samza.operators.api.internal.Operators.PartialJoinOperator}. This class implements function
+ * Implementation of a {@link Operators.PartialJoinOperator}. This class implements function
  * that only takes in one input stream among all inputs to the join and generate the join output.
  *
- * @param <M>  Type of input stream {@link org.apache.samza.operators.api.data.Message}
- * @param <RM>  Type of join output stream {@link org.apache.samza.operators.api.data.Message}
+ * @param <M>  Type of input stream {@link Message}
+ * @param <RM>  Type of join output stream {@link Message}
  */
 public class PartialJoinOpImpl<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> extends OperatorImpl<M, RM> {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
index 59e2dec..0d6141e 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
@@ -18,11 +18,11 @@
  */
 package org.apache.samza.operators.impl.window;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.WindowOperator;
-import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.WindowOperator;
+import org.apache.samza.operators.internal.WindowOutput;
 import org.apache.samza.operators.impl.OperatorImpl;
 import org.apache.samza.operators.impl.StateStoreImpl;
 import org.apache.samza.storage.kv.Entry;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
index e340fe8..18b077b 100644
--- a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
+++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
@@ -19,11 +19,12 @@
 package org.apache.samza.task;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.MessageStreams;
-import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.api.data.IncomingSystemMessage;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreams;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.data.IncomingSystemMessage;
 import org.apache.samza.operators.impl.ChainedOperators;
+import org.apache.samza.operators.task.StreamOperatorTask;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
 
@@ -45,7 +46,7 @@ public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask
   /**
    * Wrapped {@link StreamOperatorTask} class
    */
-  private final StreamOperatorTask  userTask;
+  private final StreamOperatorTask userTask;
 
   /**
    * Constructor that wraps the user-defined {@link StreamOperatorTask}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
deleted file mode 100644
index cfdb694..0000000
--- a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.task;
-
-import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
-import java.util.Collection;
-
-/**
- * This interface defines the methods that user needs to implement via the operator programming APIs.
- */
-public interface StreamOperatorTask {
-
-  /**
-   * Defines the method for users to initialize the operator chains consuming from all {@link SystemMessageStream}s.
-   * Users have to implement this function to instantiate {@link org.apache.samza.operators.impl.ChainedOperators} that
-   * will process each incoming {@link SystemMessageStream}.
-   *
-   * Note that each {@link SystemMessageStream} corresponds to an input {@link org.apache.samza.system.SystemStreamPartition}
-   *
-   * @param sources  the collection of {@link SystemMessageStream}s that takes {@link org.apache.samza.operators.api.data.IncomingSystemMessage}
-   *                 from a {@link org.apache.samza.system.SystemStreamPartition}
-   */
-  void initOperators(Collection<SystemMessageStream> sources);
-
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java
deleted file mode 100644
index 0f00fdb..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.data.Message;
-
-
-public class TestMessage implements Message<String, String> {
-
-  private final String key;
-  private final String value;
-  private final long timestamp;
-
-  TestMessage(String key, String value, long timestamp) {
-    this.key = key;
-    this.value = value;
-    this.timestamp = timestamp;
-  }
-
-  @Override public String getMessage() {
-    return this.value;
-  }
-
-  @Override public String getKey() {
-    return this.key;
-  }
-
-  @Override public long getTimestamp() {
-    return this.timestamp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
deleted file mode 100644
index 9f9ad6b..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.internal.Operators.*;
-import org.apache.samza.operators.api.internal.WindowOutput;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestMessageStream {
-
-  @Test public void testMap() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    Function<TestMessage, TestOutputMessage> xMap = m -> new TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 2);
-    MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestOutputMessage> mapOp = subs.iterator().next();
-    assertTrue(mapOp instanceof StreamOperator);
-    assertEquals(mapOp.getOutputStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    TestMessage xTestMsg = mock(TestMessage.class);
-    when(xTestMsg.getKey()).thenReturn("test-msg-key");
-    when(xTestMsg.getMessage()).thenReturn("123456789");
-    when(xTestMsg.getTimestamp()).thenReturn(12345L);
-    Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, TestOutputMessage>) mapOp).getFunction().apply(xTestMsg);
-    assertEquals(cOutputMsg.size(), 1);
-    TestOutputMessage outputMessage = cOutputMsg.iterator().next();
-    assertEquals(outputMessage.getKey(), xTestMsg.getKey());
-    assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().length() + 1));
-    assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2);
-  }
-
-  @Test public void testFlatMap() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() {{
-      this.add(mock(TestOutputMessage.class));
-      this.add(mock(TestOutputMessage.class));
-      this.add(mock(TestOutputMessage.class));
-    }};
-    Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> flatOuts;
-    MessageStream<TestOutputMessage> outputStream = inputStream.flatMap(xFlatMap);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestOutputMessage> flatMapOp = subs.iterator().next();
-    assertTrue(flatMapOp instanceof StreamOperator);
-    assertEquals(flatMapOp.getOutputStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) flatMapOp).getFunction(), xFlatMap);
-  }
-
-  @Test public void testFilter() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L;
-    MessageStream<TestMessage> outputStream = inputStream.filter(xFilter);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> filterOp = subs.iterator().next();
-    assertTrue(filterOp instanceof StreamOperator);
-    assertEquals(filterOp.getOutputStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    Function<TestMessage, Collection<TestMessage>> txfmFn = ((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction();
-    TestMessage mockMsg = mock(TestMessage.class);
-    when(mockMsg.getTimestamp()).thenReturn(11111L);
-    Collection<TestMessage> output = txfmFn.apply(mockMsg);
-    assertTrue(output.isEmpty());
-    when(mockMsg.getTimestamp()).thenReturn(999999L);
-    output = txfmFn.apply(mockMsg);
-    assertEquals(output.size(), 1);
-    assertEquals(output.iterator().next(), mockMsg);
-  }
-
-  @Test public void testSink() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> xSink = (m, mc, tc) -> {
-      mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
-      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
-    };
-    inputStream.sink(xSink);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> sinkOp = subs.iterator().next();
-    assertTrue(sinkOp instanceof SinkOperator);
-    assertEquals(((SinkOperator) sinkOp).getFunction(), xSink);
-    assertNull(((SinkOperator) sinkOp).getOutputStream());
-  }
-
-  @Test public void testWindow() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    Windows.SessionWindow<TestMessage, String, Integer> window = mock(Windows.SessionWindow.class);
-    MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> wndOp = subs.iterator().next();
-    assertTrue(wndOp instanceof WindowOperator);
-    assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream);
-  }
-
-  @Test public void testJoin() {
-    MessageStream<TestMessage> source1 = new MessageStream<>();
-    MessageStream<TestMessage> source2 = new MessageStream<>();
-    BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp());
-    MessageStream<TestOutputMessage> joinOutput = source1.join(source2, joiner);
-    Collection<Operator> subs = source1.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> joinOp1 = subs.iterator().next();
-    assertTrue(joinOp1 instanceof PartialJoinOperator);
-    assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), joinOutput);
-    subs = source2.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> joinOp2 = subs.iterator().next();
-    assertTrue(joinOp2 instanceof PartialJoinOperator);
-    assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), joinOutput);
-    TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 11111L);
-    TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 22222L);
-    TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp1).getFunction().apply(joinMsg1, joinMsg2);
-    assertEquals(xOut.getKey(), "test-join-1");
-    assertEquals(xOut.getMessage(), Integer.valueOf(24));
-    assertEquals(xOut.getTimestamp(), 11111L);
-    xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp2).getFunction().apply(joinMsg2, joinMsg1);
-    assertEquals(xOut.getKey(), "test-join-1");
-    assertEquals(xOut.getMessage(), Integer.valueOf(24));
-    assertEquals(xOut.getTimestamp(), 11111L);
-  }
-
-  @Test public void testMerge() {
-    MessageStream<TestMessage> merge1 = new MessageStream<>();
-    Collection<MessageStream<TestMessage>> others = new ArrayList<MessageStream<TestMessage>>(){{
-      this.add(new MessageStream<>());
-      this.add(new MessageStream<>());
-    }};
-    MessageStream<TestMessage> mergeOutput = merge1.merge(others);
-    validateMergeOperator(merge1, mergeOutput);
-
-    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
-  }
-
-  private void validateMergeOperator(MessageStream<TestMessage> mergeSource, MessageStream<TestMessage> mergeOutput) {
-    Collection<Operator> subs = mergeSource.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> mergeOp = subs.iterator().next();
-    assertTrue(mergeOp instanceof StreamOperator);
-    assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput);
-    TestMessage mockMsg = mock(TestMessage.class);
-    Collection<TestMessage> outputs = ((StreamOperator<TestMessage, TestMessage>) mergeOp).getFunction().apply(mockMsg);
-    assertEquals(outputs.size(), 1);
-    assertEquals(outputs.iterator().next(), mockMsg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java
deleted file mode 100644
index e6aa692..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.Partition;
-import org.apache.samza.system.SystemStreamPartition;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestMessageStreams {
-
-  @Test public void testInput() {
-    SystemStreamPartition ssp = new SystemStreamPartition("my-system", "my-stream", new Partition(0));
-    MessageStreams.SystemMessageStream mSysStream = MessageStreams.input(ssp);
-    assertEquals(mSysStream.getSystemStreamPartition(), ssp);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
deleted file mode 100644
index 225e77f..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.data.Message;
-
-
-public class TestOutputMessage implements Message<String, Integer> {
-  private final String key;
-  private final Integer value;
-  private final long timestamp;
-
-  public TestOutputMessage(String key, Integer value, long timestamp) {
-    this.key = key;
-    this.value = value;
-    this.timestamp = timestamp;
-  }
-
-  @Override public Integer getMessage() {
-    return this.value;
-  }
-
-  @Override public String getKey() {
-    return this.key;
-  }
-
-  @Override public long getTimestamp() {
-    return this.timestamp;
-  }
-}
-


[3/4] samza git commit: SAMZA-1045: Move classes from operator/api into samza-api

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java b/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
new file mode 100644
index 0000000..da813b1
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.task;
+
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+
+import java.util.Collection;
+
+/**
+ * This interface defines the methods that user needs to implement via the operator programming APIs.
+ */
+public interface StreamOperatorTask {
+
+  /**
+   * Defines the method for users to initialize the operator chains consuming from all {@link SystemMessageStream}s.
+   * Users have to implement this function to define their transformation logic on each of the incoming
+   * {@link SystemMessageStream}.
+   *
+   * Note that each {@link SystemMessageStream} corresponds to an input {@link org.apache.samza.system.SystemStreamPartition}
+   *
+   * @param sources  the collection of {@link SystemMessageStream}s that takes {@link org.apache.samza.operators.data.IncomingSystemMessage}
+   *                 from a {@link org.apache.samza.system.SystemStreamPartition}
+   */
+  void initOperators(Collection<SystemMessageStream> sources);
+
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
index 963ccf2..adb6264 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
@@ -47,12 +47,12 @@ public interface StorageEngineFactory<K, V> {
    * @return The storage engine instance.
    */
   public StorageEngine getStorageEngine(
-    String storeName,
-    File storeDir,
-    Serde<K> keySerde,
-    Serde<V> msgSerde,
-    MessageCollector collector,
-    MetricsRegistry registry,
-    SystemStreamPartition changeLogSystemStreamPartition,
-    SamzaContainerContext containerContext);
+      String storeName,
+      File storeDir,
+      Serde<K> keySerde,
+      Serde<V> msgSerde,
+      MessageCollector collector,
+      MetricsRegistry registry,
+      SystemStreamPartition changeLogSystemStreamPartition,
+      SamzaContainerContext containerContext);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java
new file mode 100644
index 0000000..8c56287
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.Message;
+
+
+public class TestMessage implements Message<String, String> {
+
+  private final String key;
+  private final String value;
+  private final long timestamp;
+
+  TestMessage(String key, String value, long timestamp) {
+    this.key = key;
+    this.value = value;
+    this.timestamp = timestamp;
+  }
+
+  @Override public String getMessage() {
+    return this.value;
+  }
+
+  @Override public String getKey() {
+    return this.key;
+  }
+
+  @Override public long getTimestamp() {
+    return this.timestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java
new file mode 100644
index 0000000..4dbe233
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.internal.Operators.*;
+import org.apache.samza.operators.internal.WindowOutput;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestMessageStream {
+
+  @Test public void testMap() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    Function<TestMessage, TestOutputMessage> xMap = m -> new TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 2);
+    MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestOutputMessage> mapOp = subs.iterator().next();
+    assertTrue(mapOp instanceof StreamOperator);
+    assertEquals(mapOp.getOutputStream(), outputStream);
+    // assert that the transformation function is what we defined above
+    TestMessage xTestMsg = mock(TestMessage.class);
+    when(xTestMsg.getKey()).thenReturn("test-msg-key");
+    when(xTestMsg.getMessage()).thenReturn("123456789");
+    when(xTestMsg.getTimestamp()).thenReturn(12345L);
+    Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, TestOutputMessage>) mapOp).getFunction().apply(xTestMsg);
+    assertEquals(cOutputMsg.size(), 1);
+    TestOutputMessage outputMessage = cOutputMsg.iterator().next();
+    assertEquals(outputMessage.getKey(), xTestMsg.getKey());
+    assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().length() + 1));
+    assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2);
+  }
+
+  @Test public void testFlatMap() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() { {
+        this.add(mock(TestOutputMessage.class));
+        this.add(mock(TestOutputMessage.class));
+        this.add(mock(TestOutputMessage.class));
+      } };
+    Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> flatOuts;
+    MessageStream<TestOutputMessage> outputStream = inputStream.flatMap(xFlatMap);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestOutputMessage> flatMapOp = subs.iterator().next();
+    assertTrue(flatMapOp instanceof StreamOperator);
+    assertEquals(flatMapOp.getOutputStream(), outputStream);
+    // assert that the transformation function is what we defined above
+    assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) flatMapOp).getFunction(), xFlatMap);
+  }
+
+  @Test public void testFilter() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L;
+    MessageStream<TestMessage> outputStream = inputStream.filter(xFilter);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> filterOp = subs.iterator().next();
+    assertTrue(filterOp instanceof StreamOperator);
+    assertEquals(filterOp.getOutputStream(), outputStream);
+    // assert that the transformation function is what we defined above
+    Function<TestMessage, Collection<TestMessage>> txfmFn = ((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction();
+    TestMessage mockMsg = mock(TestMessage.class);
+    when(mockMsg.getTimestamp()).thenReturn(11111L);
+    Collection<TestMessage> output = txfmFn.apply(mockMsg);
+    assertTrue(output.isEmpty());
+    when(mockMsg.getTimestamp()).thenReturn(999999L);
+    output = txfmFn.apply(mockMsg);
+    assertEquals(output.size(), 1);
+    assertEquals(output.iterator().next(), mockMsg);
+  }
+
+  @Test public void testSink() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> xSink = (m, mc, tc) -> {
+      mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
+      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+    };
+    inputStream.sink(xSink);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> sinkOp = subs.iterator().next();
+    assertTrue(sinkOp instanceof SinkOperator);
+    assertEquals(((SinkOperator) sinkOp).getFunction(), xSink);
+    assertNull(((SinkOperator) sinkOp).getOutputStream());
+  }
+
+  @Test public void testWindow() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    Windows.SessionWindow<TestMessage, String, Integer> window = mock(Windows.SessionWindow.class);
+    MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> wndOp = subs.iterator().next();
+    assertTrue(wndOp instanceof WindowOperator);
+    assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream);
+  }
+
+  @Test public void testJoin() {
+    MessageStream<TestMessage> source1 = new MessageStream<>();
+    MessageStream<TestMessage> source2 = new MessageStream<>();
+    BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp());
+    MessageStream<TestOutputMessage> joinOutput = source1.join(source2, joiner);
+    Collection<Operator> subs = source1.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> joinOp1 = subs.iterator().next();
+    assertTrue(joinOp1 instanceof PartialJoinOperator);
+    assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), joinOutput);
+    subs = source2.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> joinOp2 = subs.iterator().next();
+    assertTrue(joinOp2 instanceof PartialJoinOperator);
+    assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), joinOutput);
+    TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 11111L);
+    TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 22222L);
+    TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp1).getFunction().apply(joinMsg1, joinMsg2);
+    assertEquals(xOut.getKey(), "test-join-1");
+    assertEquals(xOut.getMessage(), Integer.valueOf(24));
+    assertEquals(xOut.getTimestamp(), 11111L);
+    xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp2).getFunction().apply(joinMsg2, joinMsg1);
+    assertEquals(xOut.getKey(), "test-join-1");
+    assertEquals(xOut.getMessage(), Integer.valueOf(24));
+    assertEquals(xOut.getTimestamp(), 11111L);
+  }
+
+  @Test public void testMerge() {
+    MessageStream<TestMessage> merge1 = new MessageStream<>();
+    Collection<MessageStream<TestMessage>> others = new ArrayList<MessageStream<TestMessage>>() { {
+        this.add(new MessageStream<>());
+        this.add(new MessageStream<>());
+      } };
+    MessageStream<TestMessage> mergeOutput = merge1.merge(others);
+    validateMergeOperator(merge1, mergeOutput);
+
+    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+  }
+
+  private void validateMergeOperator(MessageStream<TestMessage> mergeSource, MessageStream<TestMessage> mergeOutput) {
+    Collection<Operator> subs = mergeSource.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> mergeOp = subs.iterator().next();
+    assertTrue(mergeOp instanceof StreamOperator);
+    assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput);
+    TestMessage mockMsg = mock(TestMessage.class);
+    Collection<TestMessage> outputs = ((StreamOperator<TestMessage, TestMessage>) mergeOp).getFunction().apply(mockMsg);
+    assertEquals(outputs.size(), 1);
+    assertEquals(outputs.iterator().next(), mockMsg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java
new file mode 100644
index 0000000..c5fcceb
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestMessageStreams {
+
+  @Test public void testInput() {
+    SystemStreamPartition ssp = new SystemStreamPartition("my-system", "my-stream", new Partition(0));
+    MessageStreams.SystemMessageStream mSysStream = MessageStreams.input(ssp);
+    assertEquals(mSysStream.getSystemStreamPartition(), ssp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
new file mode 100644
index 0000000..14e6562
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.Message;
+
+
+public class TestOutputMessage implements Message<String, Integer> {
+  private final String key;
+  private final Integer value;
+  private final long timestamp;
+
+  public TestOutputMessage(String key, Integer value, long timestamp) {
+    this.key = key;
+    this.value = value;
+    this.timestamp = timestamp;
+  }
+
+  @Override public Integer getMessage() {
+    return this.value;
+  }
+
+  @Override public String getKey() {
+    return this.key;
+  }
+
+  @Override public long getTimestamp() {
+    return this.timestamp;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
new file mode 100644
index 0000000..e6d9e4a
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestTriggerBuilder {
+  private Field earlyTriggerField;
+  private Field lateTriggerField;
+  private Field timerTriggerField;
+  private Field earlyTriggerUpdater;
+  private Field lateTriggerUpdater;
+
+  @Before
+  public void testPrep() throws Exception {
+    this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger");
+    this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger");
+    this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger");
+    this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
+    this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
+
+    this.earlyTriggerField.setAccessible(true);
+    this.lateTriggerField.setAccessible(true);
+    this.timerTriggerField.setAccessible(true);
+    this.earlyTriggerUpdater.setAccessible(true);
+    this.lateTriggerUpdater.setAccessible(true);
+  }
+
+  @Test public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException {
+    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
+        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    when(mockState.getNumberMessages()).thenReturn(2000L);
+    assertTrue(triggerField.apply(null, mockState));
+
+    Function<TestMessage, Boolean> tokenFunc = m -> true;
+    builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
+    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    TestMessage m = mock(TestMessage.class);
+    assertTrue(triggerField.apply(m, mockState));
+
+    builder = TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L);
+    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
+    when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
+    when(m.getTimestamp()).thenReturn(19999000000L);
+    assertFalse(triggerField.apply(m, mockState));
+    when(m.getTimestamp()).thenReturn(32000000000L);
+    assertTrue(triggerField.apply(m, mockState));
+    when(m.getTimestamp()).thenReturn(1001000000L);
+    when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
+    assertTrue(triggerField.apply(m, mockState));
+
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> mockFunc = mock(BiFunction.class);
+    builder = TriggerBuilder.earlyTrigger(mockFunc);
+    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    assertEquals(triggerField, mockFunc);
+
+    builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
+    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
+        (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+
+    builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
+    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000));
+    assertFalse(timerTrigger.apply(mockState));
+  }
+
+  @Test public void testAddTimerTriggers() throws IllegalAccessException {
+    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addTimeoutSinceFirstMessage(10000L);
+    // exam that both earlyTrigger and timer triggers are set up
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
+        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    // check the timer trigger
+    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
+        (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+
+    // exam that both early trigger and timer triggers are set up
+    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    builder.addTimeoutSinceLastMessage(20000L);
+    // check the timer trigger
+    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+  }
+
+  @Test public void testAddLateTriggers() throws IllegalAccessException {
+    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addLateTriggerOnSizeLimit(10000L);
+    // exam that both earlyTrigger and lateTriggers are set up
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> earlyTrigger =
+        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(earlyTrigger.apply(null, mockState));
+    // check the late trigger
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> lateTrigger =
+        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder);
+    assertFalse(lateTrigger.apply(null, mockState));
+    // set the number of messages to 10001 to trigger the late trigger
+    when(mockState.getNumberMessages()).thenReturn(10001L);
+    assertTrue(lateTrigger.apply(null, mockState));
+
+    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0);
+    // exam that both earlyTrigger and lateTriggers are set up
+    earlyTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(earlyTrigger.apply(null, mockState));
+    // exam the lateTrigger
+    when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
+    lateTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder);
+    assertFalse(lateTrigger.apply(null, mockState));
+    List<TestMessage> mockList = mock(ArrayList.class);
+    when(mockList.size()).thenReturn(200);
+    when(mockState.getOutputValue()).thenReturn(mockList);
+    assertTrue(lateTrigger.apply(null, mockState));
+  }
+
+  @Test public void testAddTriggerUpdater() throws IllegalAccessException {
+    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.onEarlyTrigger(c -> {
+      c.clear();
+      return c;
+    });
+    List<TestMessage> collection = new ArrayList<TestMessage>() { {
+        for (int i = 0; i < 10; i++) {
+          this.add(new TestMessage(String.format("key-%d", i), "string-value", System.nanoTime()));
+        }
+      } };
+    // exam that earlyTriggerUpdater is set up
+    Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> earlyTriggerUpdater =
+        (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder);
+    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+    when(mockState.getOutputValue()).thenReturn(collection);
+    earlyTriggerUpdater.apply(mockState);
+    assertTrue(collection.isEmpty());
+
+    collection.add(new TestMessage("key-to-stay", "string-to-stay", System.nanoTime()));
+    collection.add(new TestMessage("key-to-remove", "string-to-remove", System.nanoTime()));
+    builder.onLateTrigger(c -> {
+      c.removeIf(t -> t.getKey().equals("key-to-remove"));
+      return c;
+    });
+    // check the late trigger updater
+    Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> lateTriggerUpdater =
+        (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder);
+    when(mockState.getOutputValue()).thenReturn(collection);
+    lateTriggerUpdater.apply(mockState);
+    assertTrue(collection.size() == 1);
+    assertFalse(collection.get(0).isDelete());
+    assertEquals(collection.get(0).getKey(), "key-to-stay");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
new file mode 100644
index 0000000..8a25a96
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.Windows.Window;
+import org.apache.samza.operators.internal.Trigger;
+import org.apache.samza.operators.internal.WindowOutput;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestWindows {
+
+  @Test public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException {
+    // test constructing the default session window
+    Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, Collection<TestMessage>>> testWnd = Windows.intoSessions(
+        TestMessage::getKey);
+    assertTrue(testWnd instanceof Windows.SessionWindow);
+    Field wndKeyFuncField = Windows.SessionWindow.class.getDeclaredField("wndKeyFunction");
+    Field aggregatorField = Windows.SessionWindow.class.getDeclaredField("aggregator");
+    wndKeyFuncField.setAccessible(true);
+    aggregatorField.setAccessible(true);
+    Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd);
+    assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "test-key");
+    BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> aggrFunc =
+        (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd);
+    TestMessage mockMsg = mock(TestMessage.class);
+    Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new ArrayList<>());
+    assertTrue(collection.size() == 1);
+    assertTrue(collection.contains(mockMsg));
+
+    // test constructing the session window w/ customized session info
+    Window<TestMessage, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions(
+        m -> String.format("key-%d", m.getTimestamp()), m -> m.getMessage().charAt(0));
+    assertTrue(testWnd2 instanceof Windows.SessionWindow);
+    wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2);
+    aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd2);
+    assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "key-0");
+    when(mockMsg.getMessage()).thenReturn("x-001");
+    collection = aggrFunc.apply(mockMsg, new ArrayList<>());
+    assertTrue(collection.size() == 1);
+    assertTrue(collection.contains('x'));
+
+    // test constructing session window w/ a default counter
+    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
+        m -> String.format("key-%d", m.getTimestamp()));
+    assertTrue(testCounter instanceof Windows.SessionWindow);
+    wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testCounter);
+    BiFunction<TestMessage, Integer, Integer> counterFn = (BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter);
+    when(mockMsg.getTimestamp()).thenReturn(12345L);
+    assertEquals(wndKeyFunc.apply(mockMsg), "key-12345");
+    assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2));
+  }
+
+  @Test public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException {
+    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
+        m -> String.format("key-%d", m.getTimestamp()));
+    // test session window w/ a trigger
+    TriggerBuilder<TestMessage, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
+    testCounter.setTriggers(triggerBuilder);
+    Trigger<TestMessage, WindowState<Integer>> expectedTrigger = triggerBuilder.build();
+    Trigger<TestMessage, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger();
+    // examine all trigger fields are expected
+    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
+    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
+    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
+    Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater");
+    Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater");
+    earlyTriggerField.setAccessible(true);
+    lateTriggerField.setAccessible(true);
+    timerTriggerField.setAccessible(true);
+    earlyTriggerUpdater.setAccessible(true);
+    lateTriggerUpdater.setAccessible(true);
+    assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger));
+    assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger));
+    assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger));
+    assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger));
+    assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
new file mode 100644
index 0000000..b734e87
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestIncomingSystemMessage {
+
+  @Test public void testConstructor() {
+    IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
+    IncomingSystemMessage ism = new IncomingSystemMessage(ime);
+
+    Object mockKey = mock(Object.class);
+    Object mockValue = mock(Object.class);
+    LongOffset testOffset = new LongOffset("12345");
+    SystemStreamPartition mockSsp = mock(SystemStreamPartition.class);
+
+    when(ime.getKey()).thenReturn(mockKey);
+    when(ime.getMessage()).thenReturn(mockValue);
+    when(ime.getSystemStreamPartition()).thenReturn(mockSsp);
+    when(ime.getOffset()).thenReturn("12345");
+
+    assertEquals(ism.getKey(), mockKey);
+    assertEquals(ism.getMessage(), mockValue);
+    assertEquals(ism.getSystemStreamPartition(), mockSsp);
+    assertEquals(ism.getOffset(), testOffset);
+    assertFalse(ism.isDelete());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
new file mode 100644
index 0000000..943c47f
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+
+public class TestLongOffset {
+
+  @Test public void testConstructor() throws Exception {
+    LongOffset o1 = new LongOffset("12345");
+    Field offsetField = LongOffset.class.getDeclaredField("offset");
+    offsetField.setAccessible(true);
+    Long x = (Long) offsetField.get(o1);
+    assertEquals(x.longValue(), 12345L);
+
+    o1 = new LongOffset("012345");
+    x = (Long) offsetField.get(o1);
+    assertEquals(x.longValue(), 12345L);
+
+    try {
+      o1 = new LongOffset("xyz");
+      fail("Constructor of LongOffset should have failed w/ mal-formatted numbers");
+    } catch (NumberFormatException nfe) {
+      // expected
+    }
+  }
+
+  @Test public void testComparator() {
+    LongOffset o1 = new LongOffset("11111");
+    Offset other = mock(Offset.class);
+    try {
+      o1.compareTo(other);
+      fail("compareTo() should have have failed when comparing to an object of a different class");
+    } catch (IllegalArgumentException iae) {
+      // expected
+    }
+
+    LongOffset o2 = new LongOffset("-10000");
+    assertEquals(o1.compareTo(o2), 1);
+    LongOffset o3 = new LongOffset("22222");
+    assertEquals(o1.compareTo(o3), -1);
+    LongOffset o4 = new LongOffset("11111");
+    assertEquals(o1.compareTo(o4), 0);
+  }
+
+  @Test public void testEquals() {
+    LongOffset o1 = new LongOffset("12345");
+    Offset other = mock(Offset.class);
+    assertFalse(o1.equals(other));
+
+    LongOffset o2 = new LongOffset("0012345");
+    assertTrue(o1.equals(o2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
new file mode 100644
index 0000000..d994486
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestOperators {
+
+  private class TestMessage implements Message<String, Object> {
+    private final long timestamp;
+    private final String key;
+    private final Object msg;
+
+
+    TestMessage(String key, Object msg, long timestamp) {
+      this.timestamp = timestamp;
+      this.key = key;
+      this.msg = msg;
+    }
+
+    @Override public Object getMessage() {
+      return this.msg;
+    }
+
+    @Override public String getKey() {
+      return this.key;
+    }
+
+    @Override public long getTimestamp() {
+      return this.timestamp;
+    }
+  }
+
+  @Test public void testGetStreamOperator() {
+    Function<Message, Collection<TestMessage>> transformFn = m -> new ArrayList<TestMessage>() { {
+        this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 12345L));
+      } };
+    Operators.StreamOperator<Message, TestMessage> strmOp = Operators.getStreamOperator(transformFn);
+    assertEquals(strmOp.getFunction(), transformFn);
+    assertTrue(strmOp.getOutputStream() instanceof MessageStream);
+  }
+
+  @Test public void testGetSinkOperator() {
+    MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, c, t) -> { };
+    Operators.SinkOperator<TestMessage> sinkOp = Operators.getSinkOperator(sinkFn);
+    assertEquals(sinkOp.getFunction(), sinkFn);
+    assertTrue(sinkOp.getOutputStream() == null);
+  }
+
+  @Test public void testGetWindowOperator() {
+    WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class);
+    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction = (m, e) -> null;
+    Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> storeFns = mock(Operators.StoreFunctions.class);
+    Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class);
+    MessageStream<TestMessage> mockInput = mock(MessageStream.class);
+    when(windowFn.getTransformFunc()).thenReturn(xFunction);
+    when(windowFn.getStoreFuncs()).thenReturn(storeFns);
+    when(windowFn.getTrigger()).thenReturn(trigger);
+    when(mockInput.toString()).thenReturn("mockStream1");
+
+    Operators.WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn);
+    assertEquals(windowOp.getFunction(), xFunction);
+    assertEquals(windowOp.getStoreFunctions(), storeFns);
+    assertEquals(windowOp.getTrigger(), trigger);
+    assertEquals(windowOp.getStoreName(mockInput), String.format("input-mockStream1-wndop-%s", windowOp.toString()));
+  }
+
+  @Test public void testGetPartialJoinOperator() {
+    BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger =
+        (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(),
+            Math.max(m1.getTimestamp(), m2.getTimestamp()));
+    MessageStream<TestMessage> joinOutput = new MessageStream<>();
+    Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, ?>, TestMessage> partialJoin =
+        Operators.getPartialJoinOperator(merger, joinOutput);
+
+    assertEquals(partialJoin.getOutputStream(), joinOutput);
+    Message<Object, Object> m = mock(Message.class);
+    Message<Object, Object> s = mock(Message.class);
+    assertEquals(partialJoin.getFunction(), merger);
+    assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), m.getKey());
+    assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), m);
+    assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), m.getKey());
+    assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater());
+  }
+
+  @Test public void testGetMergeOperator() {
+    MessageStream<TestMessage> output = new MessageStream<>();
+    Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output);
+    Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() { {
+        this.add(t);
+      } };
+    TestMessage t = mock(TestMessage.class);
+    assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t));
+    assertEquals(mergeOp.getOutputStream(), output);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
new file mode 100644
index 0000000..0f35a7c
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestTrigger {
+
+  @Test public void testConstructor() throws Exception {
+    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000;
+    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000;
+    Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis();
+    Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> {
+      s.setOutputValue(0);
+      return s;
+    };
+    Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> {
+      s.setOutputValue(1);
+      return s;
+    };
+
+    Trigger<Message<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger,
+        earlyTriggerUpdater, lateTriggerUpdater);
+
+    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
+    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
+    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
+    Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater");
+    Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater");
+    earlyTriggerField.setAccessible(true);
+    lateTriggerField.setAccessible(true);
+    timerTriggerField.setAccessible(true);
+    earlyTriggerUpdaterField.setAccessible(true);
+    lateTriggerUpdaterField.setAccessible(true);
+
+    assertEquals(earlyTrigger, earlyTriggerField.get(trigger));
+    assertEquals(timerTrigger, timerTriggerField.get(trigger));
+    assertEquals(lateTrigger, lateTriggerField.get(trigger));
+    assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger));
+    assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
new file mode 100644
index 0000000..268c9fc
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+public class TestWindowOutput {
+
+  @Test public void testConstructor() {
+    WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
+    assertEquals(wndOutput.getKey(), "testMsg");
+    assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
+    assertFalse(wndOutput.isDelete());
+    assertEquals(wndOutput.getTimestamp(), 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
index 429573b..75de630 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -94,8 +94,7 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
   }
 
   @Override
-  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
-    Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout)
     throws InterruptedException {
 
     if (blockpollFlag) {

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index 0e73e18..baf146a 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -134,7 +134,9 @@ public class TestCoordinatorStreamSystemConsumer {
       assertEquals(expectedSystemStreamPartition, systemStreamPartition);
     }
 
-    public int getRegisterCount() { return registerCount; }
+    public int getRegisterCount() {
+      return registerCount;
+    }
 
     public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
       Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new LinkedHashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
deleted file mode 100644
index b5e1028..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.Windows.Window;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators;
-import org.apache.samza.operators.api.internal.Operators.Operator;
-import org.apache.samza.operators.api.internal.WindowOutput;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-
-/**
- * This class defines either the input or output streams to/from the operators. Users use the API methods defined here to
- * directly program the stream processing stages that processes a stream and generate another one.
- *
- * @param <M>  Type of message in this stream
- */
-public class MessageStream<M extends Message> {
-
-  private final Set<Operator> subscribers = new HashSet<>();
-
-  /**
-   * Helper method to get the corresponding list of subscribers to a specific {@link MessageStream}.
-   *
-   * NOTE: This should only be used by implementation of {@link org.apache.samza.operators.impl.ChainedOperators}, not directly by programmers.
-   *
-   * @return A unmodifiable set containing all {@link Operator}s that subscribe to this {@link MessageStream} object
-   */
-  public Collection<Operator> getSubscribers() {
-    return Collections.unmodifiableSet(this.subscribers);
-  }
-
-  /**
-   * Public API methods start here
-   */
-
-  /**
-   * Defines a function API that takes three input parameters w/ types {@code A}, {@code B}, and {@code C} and w/o a return value
-   *
-   * @param <A>  the type of input {@code a}
-   * @param <B>  the type of input {@code b}
-   * @param <C>  the type of input {@code c}
-   */
-  @FunctionalInterface
-  public interface VoidFunction3 <A, B, C> {
-    public void apply(A a, B b, C c);
-  }
-
-  /**
-   * Method to apply a map function (1:1) on a {@link MessageStream}
-   *
-   * @param mapper  the mapper function to map one input {@link Message} to one output {@link Message}
-   * @param <OM>  the type of the output {@link Message} in the output {@link MessageStream}
-   * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
-   */
-  public <OM extends Message> MessageStream<OM> map(Function<M, OM> mapper) {
-    Operator<OM> op = Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() {{
-      OM r = mapper.apply(m);
-      if (r != null) {
-        this.add(r);
-      }
-    }});
-    this.subscribers.add(op);
-    return op.getOutputStream();
-  }
-
-  /**
-   * Method to apply a flatMap function (1:n) on a {@link MessageStream}
-   *
-   * @param flatMapper  the flat mapper function to map one input {@link Message} to zero or more output {@link Message}s
-   * @param <OM>  the type of the output {@link Message} in the output {@link MessageStream}
-   * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
-   */
-  public <OM extends Message> MessageStream<OM> flatMap(Function<M, Collection<OM>> flatMapper) {
-    Operator<OM> op = Operators.getStreamOperator(flatMapper);
-    this.subscribers.add(op);
-    return op.getOutputStream();
-  }
-
-  /**
-   * Method to apply a filter function on a {@link MessageStream}
-   *
-   * @param filter  the filter function to filter input {@link Message}s from the input {@link MessageStream}
-   * @return the output {@link MessageStream} after applying the filter function on the input {@link MessageStream}
-   */
-  public MessageStream<M> filter(Function<M, Boolean> filter) {
-    Operator<M> op = Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() {{
-      if (filter.apply(t)) {
-        this.add(t);
-      }
-    }});
-    this.subscribers.add(op);
-    return op.getOutputStream();
-  }
-
-  /**
-   * Method to send an input {@link MessageStream} to an output {@link SystemStream}, and allows the output {@link MessageStream}
-   * to be consumed by downstream stream operators again.
-   *
-   * @param sink  the user-defined sink function to send the input {@link Message}s to the external output systems
-   */
-  public void sink(VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
-    this.subscribers.add(Operators.getSinkOperator(sink));
-  }
-
-  /**
-   * Method to perform a window function (i.e. a group-by, aggregate function) on a {@link MessageStream}
-   *
-   * @param window  the window function to group and aggregate the input {@link Message}s from the input {@link MessageStream}
-   * @param <WK>  the type of key in the output {@link Message} from the {@link Window} function
-   * @param <WV>  the type of output value from
-   * @param <WS>  the type of window state kept in the {@link Window} function
-   * @param <WM>  the type of {@link WindowOutput} message from the {@link Window} function
-   * @return the output {@link MessageStream} after applying the window function on the input {@link MessageStream}
-   */
-  public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(Window<M, WK, WV, WM> window) {
-    Operator<WM> wndOp = Operators.getWindowOperator(Windows.getInternalWindowFn(window));
-    this.subscribers.add(wndOp);
-    return wndOp.getOutputStream();
-  }
-
-  /**
-   * Method to add an input {@link MessageStream} to a join function. Note that we currently only support 2-way joins.
-   *
-   * @param other  the other stream to be joined w/
-   * @param merger  the common function to merge messages from this {@link MessageStream} and {@code other}
-   * @param <K>  the type of join key
-   * @param <JM>  the type of message in the {@link Message} from the other join stream
-   * @param <RM>  the type of message in the {@link Message} from the join function
-   * @return the output {@link MessageStream} from the join function {@code joiner}
-   */
-  public <K, JM extends Message<K, ?>, RM extends Message> MessageStream<RM> join(MessageStream<JM> other,
-      BiFunction<M, JM, RM> merger) {
-    MessageStream<RM> outputStream = new MessageStream<>();
-
-    BiFunction<M, JM, RM> parJoin1 = merger::apply;
-    BiFunction<JM, M, RM> parJoin2 = (m, t1) -> merger.apply(t1, m);
-
-    // TODO: need to add default store functions for the two partial join functions
-
-    other.subscribers.add(Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream));
-    this.subscribers.add(Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream));
-    return outputStream;
-  }
-
-  /**
-   * Method to merge all {@code others} streams w/ this {@link MessageStream}. The merging streams must have the same type {@code M}
-   *
-   * @param others  other streams to be merged w/ this one
-   * @return  the merged output stream
-   */
-  public MessageStream<M> merge(Collection<MessageStream<M>> others) {
-    MessageStream<M> outputStream = new MessageStream<>();
-
-    others.add(this);
-    others.forEach(other -> other.subscribers.add(Operators.getMergeOperator(outputStream)));
-    return outputStream;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java
deleted file mode 100644
index 59dd91c..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.data.IncomingSystemMessage;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * This class defines all methods to create a {@link MessageStream} object. Users can use this to create an {@link MessageStream}
- * from a specific input source.
- *
- */
-
-public final class MessageStreams {
-
-  /**
-   * private constructor to prevent instantiation
-   */
-  private MessageStreams() {}
-
-  /**
-   * private class for system input/output {@link MessageStream}
-   */
-  public static final class SystemMessageStream extends MessageStream<IncomingSystemMessage> {
-    /**
-     * The corresponding {@link org.apache.samza.system.SystemStream}
-     */
-    private final SystemStreamPartition ssp;
-
-    /**
-     * Constructor for input system stream
-     *
-     * @param ssp  the input {@link SystemStreamPartition} for the input {@link SystemMessageStream}
-     */
-    private SystemMessageStream(SystemStreamPartition ssp) {
-      this.ssp = ssp;
-    }
-
-    /**
-     * Getter for the {@link SystemStreamPartition} of the input
-     *
-     * @return the input {@link SystemStreamPartition}
-     */
-    public SystemStreamPartition getSystemStreamPartition() {
-      return this.ssp;
-    }
-  }
-
-  /**
-   * Public static API methods start here
-   */
-
-  /**
-   * Static API method to create a {@link MessageStream} from a system input stream
-   *
-   * @param ssp  the input {@link SystemStreamPartition}
-   * @return the {@link MessageStream} object takes {@code ssp} as the input
-   */
-  public static SystemMessageStream input(SystemStreamPartition ssp) {
-    return new SystemMessageStream(ssp);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java b/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java
deleted file mode 100644
index fc3ea37..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Trigger;
-
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-
-/**
- * This class defines a builder of {@link Trigger} object for a {@link Windows.Window}. The triggers are categorized into
- * three types:
- *
- * <p>
- *   early trigger: defines the condition when the first output from the window function is sent.
- *   late trigger: defines the condition when the updated output after the first output is sent.
- *   timer trigger: defines a system timeout condition to trigger output if no more inputs are received to enable early/late triggers
- * </p>
- *
- * If multiple conditions are defined for a specific type of trigger, the aggregated trigger is the disjunction of the each individual trigger (i.e. OR).
- *
- * NOTE: Programmers should not use classes defined in {@link org.apache.samza.operators.api.internal} to create triggers
- *
- *
- * @param <M>  the type of input {@link Message} to the {@link Windows.Window}
- * @param <V>  the type of output value from the {@link Windows.Window}
- */
-public final class TriggerBuilder<M extends Message, V> {
-
-  /**
-   * Predicate helper to OR multiple trigger conditions
-   */
-  static class PredicateHelper {
-    static <M, S> BiFunction<M, S, Boolean> or(BiFunction<M, S, Boolean> lhs, BiFunction<M, S, Boolean> rhs) {
-      return (m, s) -> lhs.apply(m, s) || rhs.apply(m, s);
-    }
-
-    static <S> Function<S, Boolean> or(Function<S, Boolean> lhs, Function<S, Boolean> rhs) {
-      return s -> lhs.apply(s) || rhs.apply(s);
-    }
-  }
-
-  /**
-   * The early trigger condition that determines the first output from the {@link Windows.Window}
-   */
-  private BiFunction<M, WindowState<V>, Boolean> earlyTrigger = null;
-
-  /**
-   * The late trigger condition that determines the late output(s) from the {@link Windows.Window}
-   */
-  private BiFunction<M, WindowState<V>, Boolean> lateTrigger = null;
-
-  /**
-   * The system timer based trigger conditions that guarantees the {@link Windows.Window} proceeds forward
-   */
-  private Function<WindowState<V>, Boolean> timerTrigger = null;
-
-  /**
-   * The state updater function to be applied after the first output is triggered
-   */
-  private Function<WindowState<V>, WindowState<V>> earlyTriggerUpdater = Function.identity();
-
-  /**
-   * The state updater function to be applied after the late output is triggered
-   */
-  private Function<WindowState<V>, WindowState<V>> lateTriggerUpdater = Function.identity();
-
-  /**
-   * Helper method to add a trigger condition
-   *
-   * @param currentTrigger  current trigger condition
-   * @param newTrigger  new trigger condition
-   * @return  combined trigger condition that is {@code currentTrigger} OR {@code newTrigger}
-   */
-  private BiFunction<M, WindowState<V>, Boolean> addTrigger(BiFunction<M, WindowState<V>, Boolean> currentTrigger,
-      BiFunction<M, WindowState<V>, Boolean> newTrigger) {
-    if (currentTrigger == null) {
-      return newTrigger;
-    }
-
-    return PredicateHelper.or(currentTrigger, newTrigger);
-  }
-
-  /**
-   * Helper method to add a system timer trigger
-   *
-   * @param currentTimer  current timer condition
-   * @param newTimer  new timer condition
-   * @return  combined timer condition that is {@code currentTimer} OR {@code newTimer}
-   */
-  private Function<WindowState<V>, Boolean> addTimerTrigger(Function<WindowState<V>, Boolean> currentTimer,
-      Function<WindowState<V>, Boolean> newTimer) {
-    if (currentTimer == null) {
-      return newTimer;
-    }
-
-    return PredicateHelper.or(currentTimer, newTimer);
-  }
-
-  /**
-   * default constructor to prevent instantiation
-   */
-  private TriggerBuilder() {}
-
-  /**
-   * Constructor that set the size limit as the early trigger for a window
-   *
-   * @param sizeLimit  the number of messages in a window that would trigger the first output
-   */
-  private TriggerBuilder(long sizeLimit) {
-    this.earlyTrigger = (m, s) -> s.getNumberMessages() > sizeLimit;
-  }
-
-  /**
-   * Constructor that set the event time length as the early trigger
-   *
-   * @param eventTimeFunction  the function that calculate the event time in nano-second from the input {@link Message}
-   * @param wndLenMs  the window length in event time in milli-second
-   */
-  private TriggerBuilder(Function<M, Long> eventTimeFunction, long wndLenMs) {
-    this.earlyTrigger = (m, s) ->
-        TimeUnit.NANOSECONDS.toMillis(Math.max(s.getLatestEventTimeNs() - s.getEarliestEventTimeNs(),
-            eventTimeFunction.apply(m) - s.getEarliestEventTimeNs())) > wndLenMs;
-  }
-
-  /**
-   * Constructor that set the special token message as the early trigger
-   *
-   * @param tokenFunc  the function that checks whether an input {@link Message} is a token message that triggers window output
-   */
-  private TriggerBuilder(Function<M, Boolean> tokenFunc) {
-    this.earlyTrigger = (m, s) -> tokenFunc.apply(m);
-  }
-
-  /**
-   * Build method that creates an {@link Trigger} object based on the trigger conditions set in {@link TriggerBuilder}
-   * This is kept package private and only used by {@link Windows} to convert the mutable {@link TriggerBuilder} object to an immutable {@link Trigger} object
-   *
-   * @return  the final {@link Trigger} object
-   */
-  Trigger<M, WindowState<V>> build() {
-    return Trigger.createTrigger(this.timerTrigger, this.earlyTrigger, this.lateTrigger, this.earlyTriggerUpdater, this.lateTriggerUpdater);
-  }
-
-  /**
-   * Public API methods start here
-   */
-
-
-  /**
-   * API method to allow users to set an update method to update the output value after the first window output is triggered
-   * by the early trigger condition
-   *
-   * @param onTriggerFunc  the method to update the output value after the early trigger
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> onEarlyTrigger(Function<V, V> onTriggerFunc) {
-    this.earlyTriggerUpdater = s -> { s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); return s; };
-    return this;
-  }
-
-  /**
-   * API method to allow users to set an update method to update the output value after a late window output is triggered
-   * by the late trigger condition
-   *
-   * @param onTriggerFunc  the method to update the output value after the late trigger
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> onLateTrigger(Function<V, V> onTriggerFunc) {
-    this.lateTriggerUpdater = s -> { s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); return s; };
-    return this;
-  }
-
-  /**
-   * API method to allow users to add a system timer trigger based on timeout after the last message received in the window
-   *
-   * @param timeoutMs  the timeout in ms after the last message received in the window
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> addTimeoutSinceLastMessage(long timeoutMs) {
-    this.timerTrigger = this.addTimerTrigger(this.timerTrigger,
-        s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + timeoutMs < System.currentTimeMillis());
-    return this;
-  }
-
-  /**
-   * API method to allow users to add a system timer trigger based on the timeout after the first message received in the window
-   *
-   * @param timeoutMs  the timeout in ms after the first message received in the window
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> addTimeoutSinceFirstMessage(long timeoutMs) {
-    this.timerTrigger = this.addTimerTrigger(this.timerTrigger, s ->
-        TimeUnit.NANOSECONDS.toMillis(s.getFirstMessageTimeNs()) + timeoutMs < System.currentTimeMillis());
-    return this;
-  }
-
-  /**
-   * API method allow users to add a late trigger based on the window size limit
-   *
-   * @param sizeLimit  limit on the number of messages in window
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> addLateTriggerOnSizeLimit(long sizeLimit) {
-    this.lateTrigger = this.addTrigger(this.lateTrigger, (m, s) -> s.getNumberMessages() > sizeLimit);
-    return this;
-  }
-
-  /**
-   * API method to allow users to define a customized late trigger function based on input message and the window state
-   *
-   * @param lateTrigger  the late trigger condition based on input {@link Message} and the current {@link WindowState}
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> addLateTrigger(BiFunction<M, WindowState<V>, Boolean> lateTrigger) {
-    this.lateTrigger = this.addTrigger(this.lateTrigger, lateTrigger);
-    return this;
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on window size limit
-   *
-   * @param sizeLimit  window size limit
-   * @param <M>  the type of input {@link Message}
-   * @param <V>  the type of {@link Windows.Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerWhenExceedWndLen(long sizeLimit) {
-    return new TriggerBuilder<M, V>(sizeLimit);
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on event time window
-   *
-   *
-   * @param eventTimeFunc  the function to get the event time from the input message
-   * @param eventTimeWndSizeMs  the event time window size in Ms
-   * @param <M>  the type of input {@link Message}
-   * @param <V>  the type of {@link Windows.Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnEventTime(Function<M, Long> eventTimeFunc, long eventTimeWndSizeMs) {
-    return new TriggerBuilder<M, V>(eventTimeFunc, eventTimeWndSizeMs);
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on token messages
-   *
-   * @param tokenFunc  the function to determine whether an input message is a window token or not
-   * @param <M>  the type of input {@link Message}
-   * @param <V>  the type of {@link Windows.Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnTokenMsg(Function<M, Boolean> tokenFunc) {
-    return new TriggerBuilder<M, V>(tokenFunc);
-  }
-
-  /**
-   * Static API method to allow customized early trigger condition based on input {@link Message} and the corresponding {@link WindowState}
-   *
-   * @param earlyTrigger  the user defined early trigger condition
-   * @param <M>   the input message type
-   * @param <V>   the output value from the window
-   * @return   the {@link TriggerBuilder} object
-   */
-  public static <M extends Message, V> TriggerBuilder<M, V> earlyTrigger(BiFunction<M, WindowState<V>, Boolean> earlyTrigger) {
-    TriggerBuilder<M, V> newTriggers =  new TriggerBuilder<M, V>();
-    newTriggers.earlyTrigger = newTriggers.addTrigger(newTriggers.earlyTrigger, earlyTrigger);
-    return newTriggers;
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ system timeout after the last message received in the window
-   *
-   * @param timeoutMs  timeout in ms after the last message received
-   * @param <M>  the type of input {@link Message}
-   * @param <V>  the type of {@link Windows.Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceLastMessage(long timeoutMs) {
-    return new TriggerBuilder<M, V>().addTimeoutSinceLastMessage(timeoutMs);
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ system timeout after the first message received in the window
-   *
-   * @param timeoutMs  timeout in ms after the first message received
-   * @param <M>  the type of input {@link Message}
-   * @param <V>  the type of {@link Windows.Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceFirstMessage(long timeoutMs) {
-    return new TriggerBuilder<M, V>().addTimeoutSinceFirstMessage(timeoutMs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java b/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java
deleted file mode 100644
index 402cc42..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-/**
- * This interface defines the methods a window state class has to implement. The programmers are allowed to implement
- * customized window state to be stored in window state stores by implementing this interface class.
- *
- * @param <WV>  the type for window output value
- */
-public interface WindowState<WV> {
-  /**
-   * Method to get the system time when the first message in the window is received
-   *
-   * @return  nano-second of system time for the first message received in the window
-   */
-  long getFirstMessageTimeNs();
-
-  /**
-   * Method to get the system time when the last message in the window is received
-   *
-   * @return  nano-second of system time for the last message received in the window
-   */
-  long getLastMessageTimeNs();
-
-  /**
-   * Method to get the earliest event time in the window
-   *
-   * @return  the earliest event time in nano-second in the window
-   */
-  long getEarliestEventTimeNs();
-
-  /**
-   * Method to get the latest event time in the window
-   *
-   * @return  the latest event time in nano-second in the window
-   */
-  long getLatestEventTimeNs();
-
-  /**
-   * Method to get the total number of messages received in the window
-   *
-   * @return  number of messages in the window
-   */
-  long getNumberMessages();
-
-  /**
-   * Method to get the corresponding window's output value
-   *
-   * @return  the corresponding window's output value
-   */
-  WV getOutputValue();
-
-  /**
-   * Method to set the corresponding window's output value
-   *
-   * @param value  the corresponding window's output value
-   */
-  void setOutputValue(WV value);
-
-}