You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/11/21 20:42:24 UTC

samza git commit: SAMZA-1494; Flush operator state at end of stream

Repository: samza
Updated Branches:
  refs/heads/master f2c14fa63 -> 7a2e19250


SAMZA-1494; Flush operator state at end of stream

- Propagate messages emitted by an operator at endOfStream to all downstream operators.
- Emit all pending windows when endOfStream is reached.
- Flush all state on endOfStream irrespective of auto-commit behavior.

Author: Jagadish <jv...@linkedin.com>

Reviewers: Xinyu Liu <xi...@linkedin.com>

Closes #366 from vjagadish1989/eos-flush


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7a2e1925
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7a2e1925
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7a2e1925

Branch: refs/heads/master
Commit: 7a2e19250f5a74f34eef1d53de5d52399850bc89
Parents: f2c14fa
Author: Jagadish <jv...@linkedin.com>
Authored: Tue Nov 21 12:42:17 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 21 12:42:17 2017 -0800

----------------------------------------------------------------------
 .../org/apache/samza/config/TaskConfigJava.java |   2 +-
 .../samza/operators/impl/OperatorImpl.java      |  13 +-
 .../operators/impl/PartitionByOperatorImpl.java |   3 +-
 .../operators/impl/WindowOperatorImpl.java      |  13 +
 .../org/apache/samza/config/TaskConfig.scala    |   5 +
 .../apache/samza/config/TestTaskConfigJava.java |  20 +
 .../samza/operators/TestWindowOperator.java     | 565 ----------------
 .../operators/impl/TestWindowOperator.java      | 677 +++++++++++++++++++
 8 files changed, 728 insertions(+), 570 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index 0bf078e..04e15dc 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -116,7 +116,7 @@ public class TaskConfigJava extends MapConfig {
     Set<SystemStream> allInputSS = new HashSet<>();
 
     TaskConfig taskConfig = TaskConfig.Config2Task(this);
-    allInputSS.addAll(JavaConverters.setAsJavaSetConverter(taskConfig.getInputStreams()).asJava());
+    allInputSS.addAll((Set<? extends SystemStream>) JavaConverters.setAsJavaSetConverter(taskConfig.getInputStreams()).asJava());
     allInputSS.addAll(getBroadcastSystemStreams());
 
     return Collections.unmodifiableSet(allInputSS);

http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 92a563a..862e5f9 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -266,6 +266,7 @@ public abstract class OperatorImpl<M, RM> {
       if (eosStates.allEndOfStream()) {
         // all inputs have been end-of-stream, shut down the task
         LOG.info("All input streams have reached the end for task {}", taskName.getTaskName());
+        coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
         coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
       }
     }
@@ -279,7 +280,12 @@ public abstract class OperatorImpl<M, RM> {
    */
   private final void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
     if (inputStreams.stream().allMatch(input -> eosStates.isEndOfStream(input))) {
-      handleEndOfStream(collector, coordinator);
+      Collection<RM> results = handleEndOfStream(collector, coordinator);
+
+      results.forEach(rm ->
+          this.registeredOperators.forEach(op ->
+              op.onMessage(rm, collector, coordinator)));
+
       this.registeredOperators.forEach(op -> op.onEndOfStream(collector, coordinator));
     }
   }
@@ -291,9 +297,10 @@ public abstract class OperatorImpl<M, RM> {
    * override this to actually propagate EOS over the wire.
    * @param collector message collector
    * @param coordinator task coordinator
+   * @return results to be emitted when this operator reaches end-of-stream
    */
-  protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
-    //Do nothing by default
+  protected Collection<RM> handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
+    return Collections.emptyList();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 3811da8..424c10f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -91,8 +91,9 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
   }
 
   @Override
-  protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
+  protected Collection<Void> handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
     sendControlMessage(new EndOfStreamMessage(taskName), collector);
+    return Collections.emptyList();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index 7031ff9..32406cb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -53,9 +53,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -195,6 +197,17 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje
   }
 
   @Override
+  protected Collection<WindowPane<K, Object>> handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
+    List<WindowPane<K, Object>> results = new ArrayList<>();
+    Set<TriggerKey<K>> triggerKeys = new HashSet<>(triggers.keySet());
+    for(TriggerKey<K> triggerKey : triggerKeys) {
+      Optional<WindowPane<K, Object>> triggerResult = onTriggerFired(triggerKey, collector, coordinator);
+      triggerResult.ifPresent(results::add);
+    }
+    return results;
+  }
+
+  @Override
   protected void handleClose() {
     if (foldLeftFn != null) {
       foldLeftFn.close();

http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 2ee0032..419e15b 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -137,4 +137,9 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     case Some(asyncCommit) => Some(asyncCommit.toBoolean)
     case _ => None
   }
+
+  def isAutoCommitEnabled() = getOption(TaskConfig.COMMIT_MS) match {
+    case Some(commitMs) => commitMs.toInt > 0
+    case _ => true
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
index 878ca01..baf2d4f 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.Partition;
 import org.apache.samza.system.SystemStreamPartition;
 import org.junit.Test;
@@ -70,4 +71,23 @@ public class TestTaskConfigJava {
     }
     assertTrue(invalidFormatException);
   }
+
+  @Test
+  public void testAutoCommitConfig() {
+    // positive values of commit.ms => autoCommit = true
+    Config config1 = new MapConfig(ImmutableMap.of("task.commit.ms", "1"));
+    assertTrue(new TaskConfig(config1).isAutoCommitEnabled());
+
+    // no value for commit.ms => autoCommit = true
+    Config config2 = new MapConfig(ImmutableMap.of());
+    assertTrue(new TaskConfig(config2).isAutoCommitEnabled());
+
+    // A zero value for commit.ms => autoCommit = false
+    Config config3 = new MapConfig(ImmutableMap.of("task.commit.ms", "0"));
+    assertFalse(new TaskConfig(config3).isAutoCommitEnabled());
+
+    // negative value for commit.ms => autoCommit = false
+    Config config4 = new MapConfig(ImmutableMap.of("task.commit.ms", "-1"));
+    assertFalse(new TaskConfig(config4).isAutoCommitEnabled());
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
deleted file mode 100644
index 8abfb92..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import junit.framework.Assert;
-import org.apache.samza.Partition;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.container.TaskContextImpl;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.operators.impl.store.TestInMemoryStore;
-import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
-import org.apache.samza.operators.triggers.FiringType;
-import org.apache.samza.operators.triggers.Trigger;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.AccumulationMode;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.testUtils.TestClock;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.function.Function;
-
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestWindowOperator {
-  private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
-  private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
-  private Config config;
-  private TaskContextImpl taskContext;
-  private ApplicationRunner runner;
-
-  @Before
-  public void setup() throws Exception {
-    config = mock(Config.class);
-    when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
-    when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
-    taskContext = mock(TaskContextImpl.class);
-    runner = mock(ApplicationRunner.class);
-    Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
-    Serde storeValSerde = new IntegerEnvelopeSerde();
-
-    when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
-        .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
-    when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-
-    when(taskContext.getStore("jobName-jobId-window-w1"))
-        .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
-    when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka"));
-  }
-
-  @Test
-  public void testTumblingWindowsDiscardingMode() throws Exception {
-
-    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
-        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
-    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
-    task.init(config, taskContext);
-    MessageCollector messageCollector =
-        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
-    integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
-    testClock.advanceTime(Duration.ofSeconds(1));
-
-    task.window(messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 5);
-    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
-    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
-
-    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
-    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
-
-    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
-    Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
-
-    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
-    Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
-
-    Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3));
-    Assert.assertEquals((windowPanes.get(4).getMessage()).size(), 1);
-  }
-
-  @Test
-  public void testNonKeyedTumblingWindowsDiscardingMode() throws Exception {
-
-    StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING,
-        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000)));
-    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
-    task.init(config, taskContext);
-
-    MessageCollector messageCollector =
-        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
-    Assert.assertEquals(windowPanes.size(), 0);
-
-    integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
-    Assert.assertEquals(windowPanes.size(), 0);
-
-    testClock.advanceTime(Duration.ofSeconds(1));
-    Assert.assertEquals(windowPanes.size(), 0);
-
-    task.window(messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 1);
-    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 9);
-  }
-
-  @Test
-  public void testTumblingAggregatingWindowsDiscardingMode() throws Exception {
-
-    when(taskContext.getStore("jobName-jobId-window-w1"))
-        .thenReturn(new TestInMemoryStore<>(new TimeSeriesKeySerde(new IntegerSerde()), new IntegerSerde()));
-
-    StreamApplication sgb = new AggregateTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
-        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
-    List<WindowPane<Integer, Integer>> windowPanes = new ArrayList<>();
-
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
-    task.init(config, taskContext);
-    MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Integer>) envelope.getMessage());
-    integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
-    testClock.advanceTime(Duration.ofSeconds(1));
-
-    task.window(messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 5);
-    Assert.assertEquals(windowPanes.get(0).getMessage(), new Integer(2));
-    Assert.assertEquals(windowPanes.get(1).getMessage(), new Integer(2));
-    Assert.assertEquals(windowPanes.get(2).getMessage(), new Integer(2));
-    Assert.assertEquals(windowPanes.get(3).getMessage(), new Integer(2));
-    Assert.assertEquals(windowPanes.get(4).getMessage(), new Integer(1));
-  }
-
-  @Test
-  public void testTumblingWindowsAccumulatingMode() throws Exception {
-    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
-        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
-    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
-    task.init(config, taskContext);
-
-    MessageCollector messageCollector =
-        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
-    integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 7);
-    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
-    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
-
-    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
-    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
-
-    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
-    Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 4);
-
-    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
-    Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 4);
-  }
-
-  @Test
-  public void testSessionWindowsDiscardingMode() throws Exception {
-    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
-    TestClock testClock = new TestClock();
-    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
-    task.init(config, taskContext);
-    MessageCollector messageCollector =
-        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 1);
-    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
-    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
-
-    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
-
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 3);
-    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
-    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001");
-    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001");
-    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
-    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
-    Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
-
-    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
-
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 4);
-    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
-    Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001");
-    Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
-
-  }
-
-  @Test
-  public void testSessionWindowsAccumulatingMode() throws Exception {
-    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING,
-        Duration.ofMillis(500));
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
-    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-
-    MessageCollector messageCollector =
-        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
-    task.init(config, taskContext);
-
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    testClock.advanceTime(Duration.ofSeconds(1));
-
-    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
-
-    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
-
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 2);
-    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
-    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
-    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
-    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 4);
-  }
-
-  @Test
-  public void testCancellationOfOnceTrigger() throws Exception {
-    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
-        Duration.ofSeconds(1), Triggers.count(2));
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
-    task.init(config, taskContext);
-
-    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-    MessageCollector messageCollector =
-        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 1);
-    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
-    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY);
-
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 1);
-
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 2);
-    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
-    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
-    Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
-
-    task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 3);
-    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3));
-    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
-    Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT);
-    Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 1);
-
-  }
-
-  @Test
-  public void testCancellationOfAnyTrigger() throws Exception {
-    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
-        Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
-    task.init(config, taskContext);
-
-    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-    MessageCollector messageCollector =
-        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    //assert that the count trigger fired
-    Assert.assertEquals(windowPanes.size(), 1);
-
-    //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger
-    testClock.advanceTime(Duration.ofMillis(500));
-
-    //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
-    Assert.assertEquals(windowPanes.size(), 1);
-
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-
-    //advance timer by 500 more millis to enable the default trigger
-    testClock.advanceTime(Duration.ofMillis(500));
-    task.window(messageCollector, taskCoordinator);
-
-    //assert that the default trigger fired
-    Assert.assertEquals(windowPanes.size(), 2);
-    Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
-    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
-    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5);
-
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-
-    //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
-    testClock.advanceTime(Duration.ofMillis(500));
-    task.window(messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 3);
-    Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY);
-    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
-
-    //advance timer by > 500 millis to enable the default trigger
-    testClock.advanceTime(Duration.ofMillis(900));
-    task.window(messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 4);
-    Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT);
-    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000");
-  }
-
-  @Test
-  public void testCancelationOfRepeatingNestedTriggers() throws Exception {
-
-    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
-        Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
-    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-
-    MessageCollector messageCollector =
-        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
-
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
-    task.init(config, taskContext);
-
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    //assert that the count trigger fired
-    Assert.assertEquals(windowPanes.size(), 1);
-
-    //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    testClock.advanceTime(Duration.ofMillis(500));
-    //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
-    task.window(messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 2);
-
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 3);
-
-    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-    //advance timer by 500 more millis to enable the default trigger
-    testClock.advanceTime(Duration.ofMillis(500));
-    task.window(messageCollector, taskCoordinator);
-    //assert that the default trigger fired
-    Assert.assertEquals(windowPanes.size(), 4);
-  }
-
-  private class KeyedTumblingWindowStreamApplication implements StreamApplication {
-
-    private final AccumulationMode mode;
-    private final Duration duration;
-    private final Trigger<IntegerEnvelope> earlyTrigger;
-    private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
-
-    KeyedTumblingWindowStreamApplication(AccumulationMode mode,
-        Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
-      this.mode = mode;
-      this.duration = timeDuration;
-      this.earlyTrigger = earlyTrigger;
-    }
-
-    @Override
-    public void init(StreamGraph graph, Config config) {
-      MessageStream<IntegerEnvelope> inStream =
-          graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
-              .map(kv -> new IntegerEnvelope(kv.getKey()));
-      Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
-      inStream
-          .map(m -> m)
-          .window(Windows.keyedTumblingWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde())
-              .setEarlyTrigger(earlyTrigger)
-              .setAccumulationMode(mode), "w1")
-          .sink((message, messageCollector, taskCoordinator) -> {
-              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
-            });
-    }
-  }
-
-  private class TumblingWindowStreamApplication implements StreamApplication {
-
-    private final AccumulationMode mode;
-    private final Duration duration;
-    private final Trigger<IntegerEnvelope> earlyTrigger;
-    private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
-
-    TumblingWindowStreamApplication(AccumulationMode mode,
-                                         Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
-      this.mode = mode;
-      this.duration = timeDuration;
-      this.earlyTrigger = earlyTrigger;
-    }
-
-    @Override
-    public void init(StreamGraph graph, Config config) {
-      MessageStream<IntegerEnvelope> inStream =
-          graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
-              .map(kv -> new IntegerEnvelope(kv.getKey()));
-      Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
-      inStream
-          .map(m -> m)
-          .window(Windows.tumblingWindow(duration, new IntegerEnvelopeSerde())
-              .setEarlyTrigger(earlyTrigger)
-              .setAccumulationMode(mode), "w1")
-          .sink((message, messageCollector, taskCoordinator) -> {
-              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
-            });
-    }
-  }
-
-  private class AggregateTumblingWindowStreamApplication implements StreamApplication {
-
-    private final AccumulationMode mode;
-    private final Duration duration;
-    private final Trigger<IntegerEnvelope> earlyTrigger;
-    private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
-
-    AggregateTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration,
-        Trigger<IntegerEnvelope> earlyTrigger) {
-      this.mode = mode;
-      this.duration = timeDuration;
-      this.earlyTrigger = earlyTrigger;
-    }
-
-    @Override
-    public void init(StreamGraph graph, Config config) {
-      MessageStream<KV<Integer, Integer>> integers = graph.getInputStream("integers",
-          KVSerde.of(new IntegerSerde(), new IntegerSerde()));
-
-      integers
-        .map(kv -> new IntegerEnvelope(kv.getKey()))
-        .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(this.duration, () -> 0, (m, c) -> c + 1, new IntegerSerde())
-            .setEarlyTrigger(earlyTrigger)
-            .setAccumulationMode(mode), "w1")
-        .sink((message, messageCollector, taskCoordinator) -> {
-            messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
-          });
-    }
-  }
-
-  private class KeyedSessionWindowStreamApplication implements StreamApplication {
-
-    private final AccumulationMode mode;
-    private final Duration duration;
-    private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
-
-    KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
-      this.mode = mode;
-      this.duration = duration;
-    }
-
-    @Override
-    public void init(StreamGraph graph, Config config) {
-      MessageStream<IntegerEnvelope> inStream =
-          graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
-              .map(kv -> new IntegerEnvelope(kv.getKey()));
-      Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
-
-      inStream
-          .map(m -> m)
-          .window(Windows.keyedSessionWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde())
-              .setAccumulationMode(mode), "w1")
-          .sink((message, messageCollector, taskCoordinator) -> {
-              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
-            });
-    }
-  }
-
-  private class IntegerEnvelope extends IncomingMessageEnvelope  {
-
-    IntegerEnvelope(Integer key) {
-      super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, key);
-    }
-  }
-
-  private class IntegerEnvelopeSerde implements Serde<IntegerEnvelope> {
-    private final IntegerSerde intSerde = new IntegerSerde();
-
-    @Override
-    public byte[] toBytes(IntegerEnvelope object) {
-      return intSerde.toBytes((Integer) object.getKey());
-    }
-
-    @Override
-    public IntegerEnvelope fromBytes(byte[] bytes) {
-      return new IntegerEnvelope(intSerde.fromBytes(bytes));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
new file mode 100644
index 0000000..7d0c623
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import junit.framework.Assert;
+import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.impl.store.TestInMemoryStore;
+import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
+import org.apache.samza.operators.triggers.FiringType;
+import org.apache.samza.operators.triggers.Trigger;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.testUtils.TestClock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+public class TestWindowOperator {
+  // Mock coordinator handed to every process()/window() call; the end-of-stream tests verify calls on it.
+  private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
+  // Input keys replayed by the tumbling-window tests: four 1s, four 2s and a single 3.
+  private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
+  // Mocked job config, task context and application runner; assigned in setup().
+  private Config config;
+  private TaskContextImpl taskContext;
+  private ApplicationRunner runner;
+
+  /**
+   * Creates fresh mocks for the job config, task context and application runner before each test.
+   * The task context is stubbed with one input partition (kafka/integers/0), a new metrics
+   * registry and an in-memory store registered under the name "jobName-jobId-window-w1".
+   */
+  @Before
+  public void setup() throws Exception {
+    config = mock(Config.class);
+    taskContext = mock(TaskContextImpl.class);
+    runner = mock(ApplicationRunner.class);
+
+    // the store name stubbed below is prefixed with these job name / job id values
+    when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
+    when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
+
+    SystemStreamPartition inputPartition = new SystemStreamPartition("kafka", "integers", new Partition(0));
+    when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet.of(inputPartition));
+    when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+
+    Serde keySerde = new TimeSeriesKeySerde(new IntegerSerde());
+    Serde valueSerde = new IntegerEnvelopeSerde();
+    when(taskContext.getStore("jobName-jobId-window-w1"))
+        .thenReturn(new TestInMemoryStore<>(keySerde, valueSerde));
+
+    when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka"));
+  }
+
+  /**
+   * Keyed tumbling window of one second in DISCARDING mode with a repeating count(2) early trigger.
+   * The nine inputs are expected to yield four early panes alternating between keys 1 and 2
+   * (two messages each), plus a final pane for key 3 holding its single message once the clock
+   * has advanced by the window duration and window() is invoked.
+   */
+  @Test
+  public void testTumblingWindowsDiscardingMode() throws Exception {
+
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+
+    task.window(messageCollector, taskCoordinator);
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate
+    Assert.assertEquals(5, windowPanes.size());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+    Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
+    Assert.assertEquals(2, windowPanes.get(1).getMessage().size());
+
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
+    Assert.assertEquals(2, windowPanes.get(2).getMessage().size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
+    Assert.assertEquals(2, windowPanes.get(3).getMessage().size());
+
+    Assert.assertEquals(Integer.valueOf(3), windowPanes.get(4).getKey().getKey());
+    Assert.assertEquals(1, windowPanes.get(4).getMessage().size());
+  }
+
+  /**
+   * Non-keyed tumbling window of one second whose count(1000) early trigger is never reached by
+   * the nine inputs. No pane may appear while messages are processed or when the clock merely
+   * advances; a single pane holding all nine messages is expected after window() is invoked.
+   */
+  @Test
+  public void testNonKeyedTumblingWindowsDiscardingMode() throws Exception {
+
+    StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000)));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate
+    Assert.assertEquals(0, windowPanes.size());
+
+    integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
+    Assert.assertEquals(0, windowPanes.size());
+
+    // advancing the clock on its own must not emit anything
+    testClock.advanceTime(Duration.ofSeconds(1));
+    Assert.assertEquals(0, windowPanes.size());
+
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(1, windowPanes.size());
+    Assert.assertEquals(9, windowPanes.get(0).getMessage().size());
+  }
+
+  /**
+   * Non-keyed tumbling window that folds messages into a running count, in DISCARDING mode with a
+   * repeating count(2) early trigger. The store stub is replaced with one using an Integer value
+   * serde. Expects four early panes carrying the value 2 and a final pane carrying the value 1.
+   */
+  @Test
+  public void testTumblingAggregatingWindowsDiscardingMode() throws Exception {
+
+    when(taskContext.getStore("jobName-jobId-window-w1"))
+        .thenReturn(new TestInMemoryStore<>(new TimeSeriesKeySerde(new IntegerSerde()), new IntegerSerde()));
+
+    StreamApplication sgb = new AggregateTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    List<WindowPane<Integer, Integer>> windowPanes = new ArrayList<>();
+
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+    MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Integer>) envelope.getMessage());
+    integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+
+    task.window(messageCollector, taskCoordinator);
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate
+    Assert.assertEquals(5, windowPanes.size());
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(0).getMessage());
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getMessage());
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(2).getMessage());
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getMessage());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(4).getMessage());
+  }
+
+  /**
+   * Keyed tumbling window of one second in ACCUMULATING mode with a repeating count(2) early
+   * trigger. Seven panes are expected overall; only the first four are inspected: keys 1 and 2
+   * with two messages each, then keys 1 and 2 again with four messages each.
+   */
+  @Test
+  public void testTumblingWindowsAccumulatingMode() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate
+    Assert.assertEquals(7, windowPanes.size());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+    Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
+    Assert.assertEquals(2, windowPanes.get(1).getMessage().size());
+
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
+    Assert.assertEquals(4, windowPanes.get(2).getMessage().size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
+    Assert.assertEquals(4, windowPanes.get(3).getMessage().size());
+  }
+
+  /**
+   * Keyed session window with a 500 ms gap in DISCARDING mode. Three rounds of input, each
+   * followed by a one-second clock advance and a window() call, are expected to produce one pane
+   * (key 1), then two more panes sharing pane id "1001", then a fourth pane for key 2 with pane
+   * id "2001". Every pane holds two messages.
+   */
+  @Test
+  public void testSessionWindowsDiscardingMode() throws Exception {
+    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate
+    Assert.assertEquals(1, windowPanes.size());
+    Assert.assertEquals("1", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(3, windowPanes.size());
+    Assert.assertEquals("1", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals("1001", windowPanes.get(1).getKey().getPaneId());
+    Assert.assertEquals("1001", windowPanes.get(2).getKey().getPaneId());
+    Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
+    Assert.assertEquals(2, windowPanes.get(1).getMessage().size());
+    Assert.assertEquals(2, windowPanes.get(2).getMessage().size());
+
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(4, windowPanes.size());
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
+    Assert.assertEquals("2001", windowPanes.get(3).getKey().getPaneId());
+    Assert.assertEquals(2, windowPanes.get(3).getMessage().size());
+  }
+
+  /**
+   * Keyed session window with a 500 ms gap in ACCUMULATING mode. Two messages for key 1 and,
+   * one second later, four messages for key 2 are processed; after a further one-second advance
+   * a single window() call is expected to emit one pane per key, holding two and four messages.
+   */
+  @Test
+  public void testSessionWindowsAccumulatingMode() throws Exception {
+    // the mode under test is ACCUMULATING, as the test name states
+    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.ACCUMULATING,
+        Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    task.init(config, taskContext);
+
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate
+    Assert.assertEquals(2, windowPanes.size());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
+    Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
+    Assert.assertEquals(4, windowPanes.get(1).getMessage().size());
+  }
+
+  /**
+   * Keyed tumbling window of one second in ACCUMULATING mode with a non-repeating count(2) early
+   * trigger. The early trigger is expected to fire exactly once: three further messages add no
+   * pane, and the next pane is the DEFAULT firing after the clock advances. A message for key 3
+   * in the following window is expected to produce a DEFAULT pane with pane id "1000".
+   */
+  @Test
+  public void testCancellationOfOnceTrigger() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+        Duration.ofSeconds(1), Triggers.count(2));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate
+    Assert.assertEquals(1, windowPanes.size());
+    Assert.assertEquals("0", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+    Assert.assertEquals(FiringType.EARLY, windowPanes.get(0).getFiringType());
+
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+
+    // the once-only early trigger must not fire again
+    Assert.assertEquals(1, windowPanes.size());
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(2, windowPanes.size());
+    Assert.assertEquals("0", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals("0", windowPanes.get(1).getKey().getPaneId());
+    Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(1).getFiringType());
+
+    task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(3, windowPanes.size());
+    Assert.assertEquals(Integer.valueOf(3), windowPanes.get(2).getKey().getKey());
+    Assert.assertEquals("1000", windowPanes.get(2).getKey().getPaneId());
+    Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(2).getFiringType());
+    Assert.assertEquals(1, windowPanes.get(2).getMessage().size());
+  }
+
+  /**
+   * Keyed tumbling window of one second in ACCUMULATING mode whose early trigger is
+   * any(count(2), timeSinceFirstMessage(500 ms)), without repeat. Once the count trigger has
+   * fired, the time-based alternative is expected to stay silent for that window; the next pane
+   * is the DEFAULT firing with all five messages. In the following window the time-based trigger
+   * is expected to fire EARLY, followed by that window's DEFAULT firing.
+   */
+  @Test
+  public void testCancellationOfAnyTrigger() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+        Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    //assert that the count trigger fired (assertEquals takes expected first, then actual)
+    Assert.assertEquals(1, windowPanes.size());
+
+    //advance the timer to the point where the inner timeSinceFirstMessage trigger would be due
+    testClock.advanceTime(Duration.ofMillis(500));
+    //advancing the clock alone emits nothing, so window() must run for the next check to be meaningful
+    task.window(messageCollector, taskCoordinator);
+
+    //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
+    Assert.assertEquals(1, windowPanes.size());
+
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+
+    //advance timer by 500 more millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+
+    //assert that the default trigger fired
+    Assert.assertEquals(2, windowPanes.size());
+    Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(1).getFiringType());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(1).getKey().getKey());
+    Assert.assertEquals("0", windowPanes.get(1).getKey().getPaneId());
+    Assert.assertEquals(5, windowPanes.get(1).getMessage().size());
+
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+
+    //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(3, windowPanes.size());
+    Assert.assertEquals(FiringType.EARLY, windowPanes.get(2).getFiringType());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
+    Assert.assertEquals("1000", windowPanes.get(2).getKey().getPaneId());
+
+    //advance timer by > 500 millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(900));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(4, windowPanes.size());
+    Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(3).getFiringType());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(3).getKey().getKey());
+    Assert.assertEquals("1000", windowPanes.get(3).getKey().getPaneId());
+  }
+
+  /**
+   * Keyed tumbling window of one second in ACCUMULATING mode whose early trigger is
+   * repeat(any(count(2), timeSinceFirstMessage(500 ms))). Expects the pane count to grow to one
+   * after two messages, to two after a third message plus a 500 ms advance, to three after two
+   * more messages, and to four after one more message plus a further 500 ms advance.
+   */
+  @Test
+  public void testCancelationOfRepeatingNestedTriggers() throws Exception {
+
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+        Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    //assert that the count trigger fired (assertEquals takes expected first, then actual)
+    Assert.assertEquals(1, windowPanes.size());
+
+    //send one more message, then advance the timer by the timeSinceFirstMessage duration
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofMillis(500));
+    //the count is not reached again, so the additional pane expected here comes from the
+    //repeated timeSinceFirstMessage trigger rather than from a cancellation
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(2, windowPanes.size());
+
+    //two more messages are expected to fire the repeated count trigger again
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    Assert.assertEquals(3, windowPanes.size());
+
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    //advance timer by 500 more millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+    //assert that the default trigger fired
+    Assert.assertEquals(4, windowPanes.size());
+  }
+
+  /**
+   * Non-keyed tumbling window of one second in DISCARDING mode with a repeating count(2) early
+   * trigger. Five messages are expected to produce two early panes; the end-of-stream envelope is
+   * then expected to flush the remaining message as a third pane and to request exactly one
+   * commit and one shutdown for the current task.
+   */
+  @Test
+  public void testEndOfStreamFlushesWithEarlyTriggerFirings() throws Exception {
+    SystemStreamPartition inputPartition = new SystemStreamPartition("kafka", "integers", new Partition(0));
+    EndOfStreamStates endOfStreamStates =
+        new EndOfStreamStates(ImmutableSet.of(inputPartition), Collections.emptyMap());
+
+    when(taskContext.getTaskName()).thenReturn(new TaskName("task 1"));
+    when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
+    when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
+
+    StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate
+    Assert.assertEquals(0, windowPanes.size());
+
+    List<Integer> integerList = ImmutableList.of(1, 2, 1, 2, 1);
+    integerList.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
+
+    // early triggers should emit (1,2) and (1,2) in the same window.
+    Assert.assertEquals(2, windowPanes.size());
+
+    // advancing the clock without calling window() must not emit anything
+    testClock.advanceTime(Duration.ofSeconds(1));
+    Assert.assertEquals(2, windowPanes.size());
+
+    final IncomingMessageEnvelope endOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(inputPartition);
+    task.process(endOfStream, messageCollector, taskCoordinator);
+
+    // end of stream flushes the last entry (1)
+    Assert.assertEquals(3, windowPanes.size());
+    Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
+    verify(taskCoordinator, times(1)).commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+    verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
+  }
+
+  /**
+   * Keyed session window with a 500 ms gap in DISCARDING mode. A first pair of messages is
+   * expected to be emitted as one pane after the clock advances and window() runs. A second pair
+   * is then processed and the end-of-stream envelope is expected to flush it as a second pane and
+   * to request exactly one commit and one shutdown for the current task.
+   */
+  @Test
+  public void testEndOfStreamFlushesWithDefaultTriggerFirings() throws Exception {
+    SystemStreamPartition inputPartition = new SystemStreamPartition("kafka", "integers", new Partition(0));
+    EndOfStreamStates endOfStreamStates =
+        new EndOfStreamStates(ImmutableSet.of(inputPartition), Collections.emptyMap());
+
+    when(taskContext.getTaskName()).thenReturn(new TaskName("task 1"));
+    when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
+    when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
+
+    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    testClock.advanceTime(1000);
+    task.window(messageCollector, taskCoordinator);
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate
+    Assert.assertEquals(1, windowPanes.size());
+
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+
+    final IncomingMessageEnvelope endOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(inputPartition);
+    task.process(endOfStream, messageCollector, taskCoordinator);
+    // the still-open session is emitted by end of stream without another window() call
+    Assert.assertEquals(2, windowPanes.size());
+    Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
+    verify(taskCoordinator, times(1)).commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+    verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
+  }
+
+  /** End-of-stream must emit the open session window when no window() call has emitted anything before it. */
+  @Test
+  public void testEndOfStreamFlushesWithNoTriggerFirings() throws Exception {
+    SystemStreamPartition ssp = new SystemStreamPartition("kafka", "integers", new Partition(0));
+    EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(ssp), Collections.emptyMap());
+
+    when(taskContext.getTaskName()).thenReturn(new TaskName("task 1"));
+    when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
+    when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
+
+    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+
+    task.process(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp), messageCollector, taskCoordinator);
+    // Expected value goes first so that a failure reports expected/actual the right way round.
+    Assert.assertEquals(1, windowPanes.size());
+    Assert.assertEquals(4, windowPanes.get(0).getMessage().size());
+    verify(taskCoordinator, times(1)).commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+    verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
+  }
+
+  /**
+   * Test application: wraps each key read from "integers" in an {@link IntegerEnvelope}, applies a
+   * tumbling window keyed by the envelope key, and sends what reaches the sink to outputSystem/outputStream.
+   */
+  private class KeyedTumblingWindowStreamApplication implements StreamApplication {
+    private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+    private final Trigger<IntegerEnvelope> earlyTrigger;
+    private final Duration duration;
+    private final AccumulationMode mode;
+
+    KeyedTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration,
+        Trigger<IntegerEnvelope> earlyTrigger) {
+      this.earlyTrigger = earlyTrigger;
+      this.duration = timeDuration;
+      this.mode = mode;
+    }
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      Function<IntegerEnvelope, Integer> byKey = envelope -> (Integer) envelope.getKey();
+      graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+          .map(kv -> new IntegerEnvelope(kv.getKey()))
+          .map(envelope -> envelope)
+          .window(Windows.keyedTumblingWindow(byKey, duration, new IntegerSerde(), new IntegerEnvelopeSerde())
+              .setEarlyTrigger(earlyTrigger)
+              .setAccumulationMode(mode), "w1")
+          .sink((pane, collector, coordinator) ->
+              collector.send(new OutgoingMessageEnvelope(outputSystemStream, pane)));
+    }
+  }
+
+  /**
+   * Test application: collects "integers" envelopes in a non-keyed tumbling window and sinks each output.
+   */
+  private class TumblingWindowStreamApplication implements StreamApplication {
+    private final AccumulationMode mode;
+    private final Duration duration;
+    private final Trigger<IntegerEnvelope> earlyTrigger;
+    private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+
+    TumblingWindowStreamApplication(AccumulationMode mode,
+        Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
+      this.mode = mode;
+      this.duration = timeDuration;
+      this.earlyTrigger = earlyTrigger;
+    }
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      MessageStream<IntegerEnvelope> inStream =
+          graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+              .map(kv -> new IntegerEnvelope(kv.getKey()));
+      inStream
+          .map(m -> m)
+          .window(Windows.tumblingWindow(duration, new IntegerEnvelopeSerde())
+              .setEarlyTrigger(earlyTrigger)
+              .setAccumulationMode(mode), "w1")
+          .sink((message, messageCollector, taskCoordinator) ->
+              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)));
+    }
+  }
+
+  /**
+   * Test application: wraps each key read from "integers" in an {@link IntegerEnvelope}, aggregates the
+   * envelopes in a tumbling window, and sends what reaches the sink to outputSystem/outputStream.
+   */
+  private class AggregateTumblingWindowStreamApplication implements StreamApplication {
+    private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+    private final Trigger<IntegerEnvelope> earlyTrigger;
+    private final Duration duration;
+    private final AccumulationMode mode;
+
+    AggregateTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration,
+        Trigger<IntegerEnvelope> earlyTrigger) {
+      this.earlyTrigger = earlyTrigger;
+      this.duration = timeDuration;
+      this.mode = mode;
+    }
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      // The window value is supplied as 0 and the aggregation lambda returns the previous value plus one.
+      graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+          .map(kv -> new IntegerEnvelope(kv.getKey()))
+          .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(duration, () -> 0, (msg, count) -> count + 1, new IntegerSerde())
+              .setEarlyTrigger(earlyTrigger)
+              .setAccumulationMode(mode), "w1")
+          .sink((pane, collector, coordinator) ->
+              collector.send(new OutgoingMessageEnvelope(outputSystemStream, pane)));
+    }
+  }
+
+  /**
+   * Test application: wraps each key read from "integers" in an {@link IntegerEnvelope}, applies a
+   * session window keyed by the envelope key, and sends what reaches the sink to outputSystem/outputStream.
+   */
+  private class KeyedSessionWindowStreamApplication implements StreamApplication {
+    private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+    private final Duration duration;
+    private final AccumulationMode mode;
+
+    KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
+      this.duration = duration;
+      this.mode = mode;
+    }
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      Function<IntegerEnvelope, Integer> byKey = envelope -> (Integer) envelope.getKey();
+
+      graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+          .map(kv -> new IntegerEnvelope(kv.getKey()))
+          .map(envelope -> envelope)
+          .window(Windows.keyedSessionWindow(byKey, duration, new IntegerSerde(), new IntegerEnvelopeSerde())
+              .setAccumulationMode(mode), "w1")
+          .sink((pane, collector, coordinator) ->
+              collector.send(new OutgoingMessageEnvelope(outputSystemStream, pane)));
+    }
+  }
+
+  /** Envelope on kafka/integers partition 0; passes "1" and the Integer twice (presumably offset, key, message). */
+  private static class IntegerEnvelope extends IncomingMessageEnvelope {
+    IntegerEnvelope(Integer key) {
+      super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, key);
+    }
+  }
+
+  /** Serde that writes an {@link IntegerEnvelope} as the bytes of its Integer key and rebuilds it from them. */
+  private class IntegerEnvelopeSerde implements Serde<IntegerEnvelope> {
+    private final IntegerSerde keySerde = new IntegerSerde();
+    @Override
+    public byte[] toBytes(IntegerEnvelope envelope) {
+      return keySerde.toBytes((Integer) envelope.getKey());
+    }
+
+    @Override
+    public IntegerEnvelope fromBytes(byte[] serializedKey) {
+      return new IntegerEnvelope(keySerde.fromBytes(serializedKey));
+    }
+  }
+}