You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/03/19 19:31:47 UTC

[1/2] samza git commit: SAMZA-1108: Implementation of Windows and various kinds of Triggers

Repository: samza
Updated Branches:
  refs/heads/master 05915bfc8 -> d399d6f3c


http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java
new file mode 100644
index 0000000..8544efd
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link TimeSinceLastMessageTrigger}
+ * @param <M> the type of the incoming message
+ */
+public class TimeSinceLastMessageTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TimeSinceLastMessageTriggerImpl.class);
+  private final TimeSinceLastMessageTrigger<M> trigger;
+  private final long durationMs;
+  private final Clock clock;
+  private final TriggerKey<WK> triggerKey;
+  private long callbackTime = Integer.MIN_VALUE;
+  private Cancellable cancellable = null;
+  private boolean shouldFire = false;
+
+  public TimeSinceLastMessageTriggerImpl(TimeSinceLastMessageTrigger<M> trigger, Clock clock, TriggerKey<WK> key) {
+    this.trigger = trigger;
+    this.durationMs = trigger.getDuration().toMillis();
+    this.clock = clock;
+    this.triggerKey = key;
+  }
+
+  @Override
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    if (!shouldFire) {
+      long currTime = clock.currentTimeMillis();
+
+      if (currTime < callbackTime && cancellable != null) {
+        cancellable.cancel();
+      }
+
+      callbackTime = currTime + durationMs;
+      Runnable runnable = () -> {
+        LOG.trace("Time since last message trigger fired");
+        shouldFire = true;
+      };
+
+      cancellable = context.scheduleCallback(runnable, callbackTime, triggerKey);
+    }
+  }
+
+  @Override
+  public void cancel() {
+    if (cancellable != null) {
+      cancellable.cancel();
+    }
+  }
+
+  @Override
+  public boolean shouldFire() {
+    return shouldFire;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java
new file mode 100644
index 0000000..2454ce9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link TimeTrigger}
+ */
+public class TimeTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TimeTriggerImpl.class);
+
+  private final TimeTrigger<M> trigger;
+  private final TriggerKey<WK> triggerKey;
+  private Cancellable cancellable;
+  private final Clock clock;
+  private boolean shouldFire = false;
+
+  public TimeTriggerImpl(TimeTrigger<M> trigger, Clock clock, TriggerKey<WK> key) {
+    this.trigger = trigger;
+    this.clock = clock;
+    this.triggerKey = key;
+  }
+
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    final long now = clock.currentTimeMillis();
+    long triggerDurationMs = trigger.getDuration().toMillis();
+    Long callbackTime = (now - now % triggerDurationMs) + triggerDurationMs;
+
+    if (cancellable == null) {
+      cancellable = context.scheduleCallback(() -> {
+          LOG.trace("Time trigger fired");
+          shouldFire = true;
+        }, callbackTime, triggerKey);
+    }
+  }
+
+  @Override
+  public void cancel() {
+    if (cancellable != null) {
+      cancellable.cancel();
+    }
+  }
+
+  @Override
+  public boolean shouldFire() {
+    return shouldFire;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
new file mode 100644
index 0000000..705cab7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+
+import org.apache.samza.operators.impl.TriggerScheduler;
+
+/**
+ * Implementation class for a {@link Trigger}. A {@link TriggerImpl} is used with a
+ * which is invoked when the trigger fires.
+ *
+ * <p> When messages arrive in the {@code WindowOperatorImpl}, they are assigned to one or more windows. An
+ * instance of a {@link TriggerImpl} is created corresponding to each {@link Trigger} configured for a window. For every
+ * message added to the window, the {@code WindowOperatorImpl} invokes the {@link #onMessage} on its corresponding
+ * {@link TriggerImpl}s. A {@link TriggerImpl} instance is scoped to a window and its firing determines when results for
+ * its window are emitted.
+ *
+ * {@link TriggerImpl}s can use the {@link TriggerScheduler} to schedule and cancel callbacks (for example, implementations
+ * of time-based triggers).
+ *
+ * <p> State management: The state maintained by {@link TriggerImpl}s is not durable across re-starts and is transient.
+ * New instances of {@link TriggerImpl} are created on a re-start.
+ *
+ */
+public interface TriggerImpl<M, WK> {
+
+  /**
+   * Invoked when a message is added to the window corresponding to this {@link TriggerImpl}.
+   * @param message the incoming message
+   * @param context the {@link TriggerScheduler} to schedule and cancel callbacks
+   */
+  public void onMessage(M message, TriggerScheduler<WK> context);
+
+  /**
+   * Returns {@code true} if the current state of the trigger indicates that its condition
+   * is satisfied and it is ready to fire.
+   * @return if this trigger should fire.
+   */
+  public boolean shouldFire();
+
+  /**
+   * Invoked when the execution of this {@link TriggerImpl} is canceled by an up-stream {@link TriggerImpl}.
+   *
+   * No calls to {@link #onMessage(Object, TriggerScheduler)} or {@link #shouldFire()} will be invoked
+   * after this invocation.
+   */
+  public void cancel();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
new file mode 100644
index 0000000..f64a1db
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.util.Clock;
+
+/**
+ * Factory methods for instantiating {@link TriggerImpl}s from individual {@link Trigger}s.
+ */
+public class TriggerImpls {
+
+  public static <M, WK> TriggerImpl<M, WK> createTriggerImpl(Trigger<M> trigger, Clock clock, TriggerKey<WK> triggerKey) {
+
+    if (trigger == null) {
+      throw new IllegalArgumentException("Trigger must not be null");
+    }
+
+    if (trigger instanceof CountTrigger) {
+      return new CountTriggerImpl<>((CountTrigger<M>) trigger, triggerKey);
+    } else if (trigger instanceof RepeatingTrigger) {
+      return new RepeatingTriggerImpl<>((RepeatingTrigger<M>) trigger, clock, triggerKey);
+    } else if (trigger instanceof AnyTrigger) {
+      return new AnyTriggerImpl<>((AnyTrigger<M>) trigger, clock, triggerKey);
+    } else if (trigger instanceof TimeSinceLastMessageTrigger) {
+      return new TimeSinceLastMessageTriggerImpl<>((TimeSinceLastMessageTrigger<M>) trigger, clock, triggerKey);
+    } else if (trigger instanceof TimeTrigger) {
+      return new TimeTriggerImpl((TimeTrigger<M>) trigger, clock, triggerKey);
+    } else if (trigger instanceof TimeSinceFirstMessageTrigger) {
+      return new TimeSinceFirstMessageTriggerImpl<>((TimeSinceFirstMessageTrigger<M>) trigger, clock, triggerKey);
+    }
+
+    throw new SamzaException("No implementation class defined for the trigger  " + trigger.getClass().getCanonicalName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
index e5dab80..b8672c6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.operators.util;
 
 import org.apache.samza.storage.kv.Entry;
@@ -30,24 +31,28 @@ import java.util.Map;
 
 /**
  * Implements a {@link KeyValueStore} using an in-memory Java Map.
- * @param <K>  the type of the key in the store
- * @param <V>  the type of the value in the store
+ * @param <K>  the type of key
+ * @param <V>  the type of value
+ *
+ * TODO: This class is a stop-gap until we implement persistent store creation from TaskContext.
  *
- * TODO HIGH prateekm: Remove when we switch to an persistent implementation for KeyValueStore API.
  */
 public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
 
-  final Map<K, V> map = new LinkedHashMap<>();
+  private final Map<K, V> map = new LinkedHashMap<>();
 
   @Override
   public V get(K key) {
+    if (key == null) {
+      throw new NullPointerException("Null key provided");
+    }
     return map.get(key);
   }
 
   @Override
   public Map<K, V> getAll(List<K> keys) {
     Map<K, V> values = new HashMap<>();
-    for (K key: keys) {
+    for (K key : keys) {
       values.put(key, map.get(key));
     }
     return values;
@@ -55,18 +60,24 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
 
   @Override
   public void put(K key, V value) {
+    if (key == null) {
+      throw new NullPointerException("Null key provided");
+    }
     map.put(key, value);
   }
 
   @Override
   public void putAll(List<Entry<K, V>> entries) {
-    for (Entry<K, V> entry: entries) {
+    for (Entry<K, V> entry : entries) {
       put(entry.getKey(), entry.getValue());
     }
   }
 
   @Override
   public void delete(K key) {
+    if (key == null) {
+      throw new NullPointerException("Null key provided");
+    }
     map.remove(key);
   }
 
@@ -119,4 +130,4 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
   public void flush() {
     //not applicable
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 9ec8e5a..d4224c3 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -20,6 +20,8 @@ package org.apache.samza.task;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.MessageStreamImpl;
@@ -30,6 +32,9 @@ import org.apache.samza.operators.impl.OperatorGraph;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.SystemClock;
 
 
 /**
@@ -65,16 +70,27 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
   /**
    * A mapping from each {@link SystemStream} to the root node of its operator chain DAG.
    */
-  private final OperatorGraph operatorGraph = new OperatorGraph();
+  private final OperatorGraph operatorGraph;
 
   private final StreamApplication graphBuilder;
 
   private final ApplicationRunner runner;
 
+  private final Clock clock;
+
   private ContextManager contextManager;
 
+  private Set<SystemStreamPartition> systemStreamPartitions;
+
   public StreamOperatorTask(StreamApplication graphBuilder, ApplicationRunner runner) {
+    this(graphBuilder, SystemClock.instance(), runner);
+  }
+
+  // purely for testing.
+  public StreamOperatorTask(StreamApplication graphBuilder, Clock clock, ApplicationRunner runner) {
     this.graphBuilder = graphBuilder;
+    this.operatorGraph = new OperatorGraph(clock);
+    this.clock = clock;
     this.runner = runner;
   }
 
@@ -85,9 +101,10 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
     this.graphBuilder.init(streamGraph, config);
     // get the context manager of the {@link StreamGraph} and initialize the task-specific context
     this.contextManager = streamGraph.getContextManager();
+    this.systemStreamPartitions = context.getSystemStreamPartitions();
 
     Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
-    context.getSystemStreamPartitions().forEach(ssp -> {
+    systemStreamPartitions.forEach(ssp -> {
         if (!inputBySystemStream.containsKey(ssp.getSystemStream())) {
           // create mapping from the physical input {@link SystemStream} to the logic {@link MessageStream}
           inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streamGraph.getInputStream(ssp.getSystemStream()));
@@ -103,8 +120,11 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
   }
 
   @Override
-  public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    this.operatorGraph.getAll().forEach(r -> r.onTimer(collector, coordinator));
+  public final void window(MessageCollector collector, TaskCoordinator coordinator)  {
+    systemStreamPartitions.forEach(ssp -> {
+        this.operatorGraph.get(ssp.getSystemStream())
+          .onTick(collector, coordinator);
+      });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index 543716a..6edf048 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -33,6 +33,7 @@ import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
+import java.util.function.Supplier;
 
 
 /**
@@ -44,9 +45,10 @@ public class PageViewCounterExample implements StreamApplication {
 
     MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
     OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+    Supplier<Integer> initialValue = () -> 0;
 
     pageViewEvents.
-        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
+        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), initialValue, (m, c) -> c + 1).
             setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
             setAccumulationMode(AccumulationMode.DISCARDING)).
         map(MyStreamOutput::new).

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index 729b26f..e222fe4 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -31,6 +31,7 @@ import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
+import java.util.function.Supplier;
 
 
 /**
@@ -67,11 +68,12 @@ public class RepartitionExample implements StreamApplication {
 
     MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
     OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+    Supplier<Integer> initialValue = () -> 0;
 
     pageViewEvents.
         partitionBy(m -> m.getMessage().memberId).
         window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
-            msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1)).
+            msg -> msg.getMessage().memberId, Duration.ofMinutes(5), initialValue, (m, c) -> c + 1)).
         map(MyStreamOutput::new).
         sendTo(pageViewPerMemberCounters);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
index d988270..1c30a21 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
@@ -21,13 +21,15 @@ package org.apache.samza.example;
 
 import java.time.Duration;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.system.StreamSpec;
@@ -65,18 +67,20 @@ public class TestBroadcastExample extends TestExampleBase {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
+    FoldLeftFunction<JsonMessageEnvelope, Integer> sumAggregator = (m, c) -> c + 1;
+    Supplier<Integer> initialValue = () -> 0;
+
     inputs.keySet().forEach(entry -> {
         MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(
                 new StreamSpec(entry.getSystem() + "-" + entry.getStream(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
 
-        inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+        inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
             .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
 
-        inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+        inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
             .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
 
-        inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+        inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
             .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
 
       });

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
index 6896da5..c88df7c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
@@ -25,13 +25,14 @@ import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.time.Duration;
-import java.util.function.BiFunction;
 import java.util.Set;
+import java.util.function.Supplier;
 
 
 /**
@@ -57,11 +58,12 @@ public class TestWindowExample extends TestExampleBase {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
+    FoldLeftFunction<JsonMessageEnvelope, Integer> maxAggregator = (m, c) -> c + 1;
+    Supplier<Integer> initialValue = () -> 0;
     inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(
             new StreamSpec(source.getSystem() + "-" + source.getStream(), source.getStream(), source.getSystem()), null, null).
         map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
-            m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator)));
+            m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), initialValue, maxAggregator)));
 
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 361972e..5722dbd 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -50,7 +50,12 @@ public class TestOperatorImpl {
         TestOperatorImpl.this.curCollector = collector;
         TestOperatorImpl.this.curCoordinator = coordinator;
       }
-    };
+      @Override
+      public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+
+      }
+
+      };
     // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext()
     OperatorImpl mockSub = mock(OperatorImpl.class);
     opImpl.registerNextOperator(mockSub);

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
index 088cb00..31f6f4a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -38,6 +38,7 @@ import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.operators.windows.internal.WindowType;
 import org.apache.samza.task.TaskContext;
 import org.junit.Before;
 import org.junit.Test;
@@ -77,7 +78,7 @@ public class TestOperatorImpls {
   public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
     // get window operator
     WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
-    WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null);
+    WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING);
     when(mockWnd.getWindow()).thenReturn(windowInternal);
     MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class);
     Config mockConfig = mock(Config.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
index ae3d151..ec1d74c 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -25,18 +25,20 @@ import org.apache.samza.operators.TestMessageStreamImplUtil;
 import org.apache.samza.operators.TestOutputMessageEnvelope;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowType;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -69,17 +71,17 @@ public class TestOperatorSpecs {
   @Test
   public void testGetWindowOperator() throws Exception {
     Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey";
-    BiFunction<TestMessageEnvelope, Integer, Integer> aggregator = (m, c) -> c + 1;
-
+    FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
+    Supplier<Integer> initialValue = () -> 0;
     //instantiate a window using reflection
-    WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null);
+    WindowInternal window = new WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING);
 
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
     WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
     assertEquals(spec.getWindow(), window);
     assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
-    assertEquals(spec.getWindow().getFoldFunction(), aggregator);
+    assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
new file mode 100644
index 0000000..674a8f1
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.util.Clock;
+
+import java.time.Duration;
+
+/**
+ * An implementation of {@link Clock} that allows to advance the time by an arbitrary duration.
+ * Used for testing.
+ */
+public class TestClock implements Clock {
+
+  long currentTime = 1;
+
+  public void advanceTime(Duration duration) {
+    currentTime += duration.toMillis();
+  }
+
+  public void advanceTime(long millis) {
+    currentTime += millis;
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    return currentTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
new file mode 100644
index 0000000..0d720dd
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import junit.framework.Assert;
+import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestWindowOperator {
+  private final MessageCollector messageCollector = mock(MessageCollector.class);
+  private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
+  private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, Integer>>>> windowPanes = new ArrayList<>();
+  private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
+  private Config config;
+  private TaskContext taskContext;
+  private ApplicationRunner runner;
+
+  @Before
+  public void setup() throws Exception {
+    windowPanes.clear();
+
+    config = mock(Config.class);
+    taskContext = mock(TaskContext.class);
+    runner = mock(ApplicationRunner.class);
+    when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
+        .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
+
+  }
+
+  @Test
+  public void testTumblingWindowsDiscardingMode() throws Exception {
+
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 5);
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3));
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(4).getMessage()).size(), 1);
+  }
+
+  @Test
+  public void testTumblingWindowsAccumulatingMode() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 7);
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 4);
+
+    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size(), 4);
+  }
+
+  @Test
+  public void testSessionWindowsDiscardingMode() throws Exception {
+    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 1);
+    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 3);
+    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
+    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001");
+    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001");
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2);
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 2);
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 2);
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 4);
+    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
+    Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001");
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size(), 2);
+
+  }
+
+  @Test
+  public void testSessionWindowsAccumulatingMode() throws Exception {
+    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 2);
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2);
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2);
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 4);
+  }
+
+  @Test
+  public void testCancelationOfOnceTrigger() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.count(2));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 1);
+    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY);
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 1);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 2);
+    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
+    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
+    Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
+
+    task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 3);
+    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3));
+    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
+    Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT);
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 1);
+
+  }
+
+  @Test
+  public void testCancelationOfAnyTrigger() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+        Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    //assert that the count trigger fired
+    Assert.assertEquals(windowPanes.size(), 1);
+
+    //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+
+    //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
+    Assert.assertEquals(windowPanes.size(), 1);
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    //advance timer by 500 more millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+
+    //assert that the default trigger fired
+    Assert.assertEquals(windowPanes.size(), 2);
+    Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
+    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
+    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 5);
+
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 3);
+    Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY);
+    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
+
+    //advance timer by > 500 millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(900));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 4);
+    Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT);
+    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000");
+  }
+
+  @Test
+  public void testCancelationOfRepeatingNestedTriggers() throws Exception {
+
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+        Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    //assert that the count trigger fired
+    Assert.assertEquals(windowPanes.size(), 1);
+
+    //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofMillis(500));
+    //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 2);
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 3);
+
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+    //advance timer by 500 more millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+    //assert that the default trigger fired
+    Assert.assertEquals(windowPanes.size(), 4);
+  }
+
+  private class KeyedTumblingWindowStreamApplication implements StreamApplication {
+
+    private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka");
+    private final AccumulationMode mode;
+    private final Duration duration;
+    private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger;
+
+    KeyedTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration, Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger) {
+      this.mode = mode;
+      this.duration = timeDuration;
+      this.earlyTrigger = earlyTrigger;
+    }
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(streamSpec, null, null);
+      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey();
+      inStream
+        .map(m -> m)
+        .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger)
+          .setAccumulationMode(mode))
+        .map(m -> {
+            windowPanes.add(m);
+            return m;
+          });
+    }
+  }
+
+  private class KeyedSessionWindowStreamApplication implements StreamApplication {
+
+    private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka");
+    private final AccumulationMode mode;
+    private final Duration duration;
+
+    KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
+      this.mode = mode;
+      this.duration = duration;
+    }
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(streamSpec, null, null);
+      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey();
+
+      inStream
+          .map(m -> m)
+          .window(Windows.keyedSessionWindow(keyFn, duration)
+              .setAccumulationMode(mode))
+          .map(m -> {
+              windowPanes.add(m);
+              return m;
+            });
+    }
+  }
+
+  private class IntegerMessageEnvelope extends IncomingMessageEnvelope {
+    IntegerMessageEnvelope(int key, int msg) {
+      super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg);
+    }
+  }
+}


[2/2] samza git commit: SAMZA-1108: Implementation of Windows and various kinds of Triggers

Posted by ja...@apache.org.
SAMZA-1108: Implementation of Windows and various kinds of Triggers

* Implemented various triggers and the orchestration logic of the window operator.
* Implemented wire-up of window and the flow of messages through various trigger implementations.
* Implementations for count, time, timeSinceFirst, timeSinceLast, Any, Repeating triggers.

Author: vjagadish1989 <jv...@linkedin.com>

Reviewers: Yi Pan (Data Infrastructure) <ni...@gmail.com>, Prateek Maheshwari <pm...@linkedin.com>, Chris Pettitt <cp...@linkedin.com>

Closes #66 from vjagadish1989/window-impl


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d399d6f3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d399d6f3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d399d6f3

Branch: refs/heads/master
Commit: d399d6f3ca0fa20afcfad5864ec7f8550aa7a00f
Parents: 05915bf
Author: vjagadish1989 <jv...@linkedin.com>
Authored: Sun Mar 19 12:25:40 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Sun Mar 19 12:31:21 2017 -0700

----------------------------------------------------------------------
 checkstyle/checkstyle.xml                       |   7 +-
 .../operators/functions/FilterFunction.java     |   2 +-
 .../operators/functions/FoldLeftFunction.java   |  36 ++
 .../samza/operators/functions/MapFunction.java  |   5 +-
 .../samza/operators/triggers/AnyTrigger.java    |   9 +-
 .../samza/operators/triggers/CountTrigger.java  |   2 +-
 .../samza/operators/triggers/FiringType.java    |  29 ++
 .../operators/triggers/RepeatingTrigger.java    |   4 +
 .../triggers/TimeSinceFirstMessageTrigger.java  |   2 +-
 .../triggers/TimeSinceLastMessageTrigger.java   |   3 +-
 .../samza/operators/triggers/TimeTrigger.java   |   2 +-
 .../samza/operators/windows/WindowKey.java      |  36 +-
 .../samza/operators/windows/WindowPane.java     |  14 +-
 .../apache/samza/operators/windows/Windows.java | 153 ++------
 .../windows/internal/WindowInternal.java        |  65 +++-
 .../operators/windows/internal/WindowType.java  |  24 ++
 .../samza/operators/windows/TestWindowPane.java |   3 +-
 .../apache/samza/operators/StreamGraphImpl.java |   1 +
 .../org/apache/samza/operators/WindowState.java |  44 +++
 .../samza/operators/impl/OperatorGraph.java     |  16 +-
 .../samza/operators/impl/OperatorImpl.java      |  30 +-
 .../operators/impl/PartialJoinOperatorImpl.java |   1 -
 .../apache/samza/operators/impl/TriggerKey.java |  73 ++++
 .../samza/operators/impl/TriggerScheduler.java  | 120 ++++++
 .../operators/impl/WindowOperatorImpl.java      | 290 +++++++++++++-
 .../operators/spec/WindowOperatorSpec.java      |  11 +-
 .../operators/triggers/AnyTriggerImpl.java      |  80 ++++
 .../samza/operators/triggers/Cancellable.java   |  34 ++
 .../operators/triggers/CountTriggerImpl.java    |  61 +++
 .../triggers/RepeatingTriggerImpl.java          |  67 ++++
 .../TimeSinceFirstMessageTriggerImpl.java       |  71 ++++
 .../TimeSinceLastMessageTriggerImpl.java        |  79 ++++
 .../operators/triggers/TimeTriggerImpl.java     |  71 ++++
 .../samza/operators/triggers/TriggerImpl.java   |  66 ++++
 .../samza/operators/triggers/TriggerImpls.java  |  53 +++
 .../operators/util/InternalInMemoryStore.java   |  25 +-
 .../apache/samza/task/StreamOperatorTask.java   |  28 +-
 .../samza/example/PageViewCounterExample.java   |   4 +-
 .../samza/example/RepartitionExample.java       |   4 +-
 .../samza/example/TestBroadcastExample.java     |  14 +-
 .../apache/samza/example/TestWindowExample.java |   8 +-
 .../samza/operators/impl/TestOperatorImpl.java  |   7 +-
 .../samza/operators/impl/TestOperatorImpls.java |   3 +-
 .../samza/operators/spec/TestOperatorSpecs.java |  12 +-
 .../samza/operators/triggers/TestClock.java     |  45 +++
 .../operators/triggers/TestWindowOperator.java  | 389 +++++++++++++++++++
 46 files changed, 1885 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 0999fd7..775d674 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -20,7 +20,9 @@
 --> 
 <module name="Checker">
   <property name="localeLanguage" value="en"/>
-  
+  <!-- allow suppression for specific files -->
+  <module name="SuppressionCommentFilter"/>
+
   <module name="FileTabCharacter"/>
   
   <!-- header: use one star only -->
@@ -32,6 +34,7 @@
     
     <!-- code cleanup -->
     <module name="UnusedImports"/>
+    <module name="FileContentsHolder"/>
     <module name="RedundantImport"/>
     <module name="IllegalImport" />
     <module name="EqualsHashCode"/>
@@ -62,8 +65,8 @@
     <!-- whitespace -->
     <module name="GenericWhitespace"/>
     <module name="NoWhitespaceBefore"/>
-    <module name="WhitespaceAfter" />
     <module name="NoWhitespaceAfter"/>
+    <module name="WhitespaceAfter" />
     <module name="WhitespaceAround">
       <property name="allowEmptyConstructors" value="true"/>
       <property name="allowEmptyMethods" value="true"/>

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
index 58479d6..143bae0 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
@@ -31,7 +31,7 @@ public interface FilterFunction<M> extends InitableFunction {
 
   /**
    * Returns a boolean indicating whether this message should be retained or filtered out.
-   * @param message  the input message to be checked
+   * @param message  the input message to be checked. This object should not be mutated.
    * @return  true if {@code message} should be retained
    */
   boolean apply(M message);

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
new file mode 100644
index 0000000..58e88fd
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.functions;
+
+/**
+ * A fold function that incrementally combines and aggregates values for a window.
+ */
+public interface FoldLeftFunction<M, WV> extends InitableFunction {
+
+  /**
+   * Incrementally combine and aggregate values for the window. Guaranteed to be invoked for every
+   * message added to the window.
+   *
+   * @param message the incoming message that is added to the window. This object should not be mutated.
+   * @param oldValue the previous value
+   * @return the new value
+   */
+  WV apply(M message, WV oldValue);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
index 05a554f..b09fb99 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -31,8 +31,9 @@ import org.apache.samza.annotation.InterfaceStability;
 public interface MapFunction<M, OM>  extends InitableFunction {
 
   /**
-   * Transforms the provided message into another message
-   * @param message  the input message to be transformed
+   * Transforms the provided message into another message.
+   *
+   * @param message  the input message to be transformed. This object should not be mutated.
    * @return  the transformed message
    */
   OM apply(M message);

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
index 6e134df..f52b57b 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
@@ -23,16 +23,15 @@ import java.util.List;
 /**
  * A {@link Trigger} fires as soon as any of its individual triggers has fired.
  */
-public class AnyTrigger<M> implements Trigger {
+public class AnyTrigger<M> implements Trigger<M> {
 
-  private final List<Trigger> triggers;
+  private final List<Trigger<M>> triggers;
 
-  AnyTrigger(List<Trigger> triggers) {
+  AnyTrigger(List<Trigger<M>> triggers) {
     this.triggers = triggers;
   }
 
-  public List<Trigger> getTriggers() {
+  public List<Trigger<M>> getTriggers() {
     return triggers;
   }
 }
-

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
index 1cf930c..dbae3a9 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
@@ -22,7 +22,7 @@ package org.apache.samza.operators.triggers;
  * A {@link Trigger} that fires when the number of messages in the {@link org.apache.samza.operators.windows.WindowPane}
  * reaches the specified count.
  */
-public class CountTrigger<M> implements Trigger {
+public class CountTrigger<M> implements Trigger<M> {
 
   private final long count;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java
new file mode 100644
index 0000000..49d971d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+/**
+ * The type of the {@link org.apache.samza.operators.triggers.Trigger} firing.
+ * Firings can be either early or late or default. Late triggers are not supported currently.
+ */
+public enum FiringType {
+  EARLY,
+  DEFAULT,
+  LATE
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
index 7f78eb8..166d0d9 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
@@ -28,5 +28,9 @@ class RepeatingTrigger<M> implements Trigger<M> {
   RepeatingTrigger(Trigger<M> trigger) {
     this.trigger = trigger;
   }
+
+  public Trigger<M> getTrigger() {
+    return trigger;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
index 4de60a2..94b7769 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
@@ -26,7 +26,7 @@ import java.time.Duration;
  * A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in
  * the window pane.
  */
-public class TimeSinceFirstMessageTrigger<M> implements Trigger {
+public class TimeSinceFirstMessageTrigger<M> implements Trigger<M> {
 
   private final Duration duration;
   private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
index 6b09625..2231fd4 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
@@ -22,8 +22,9 @@ import java.time.Duration;
 
 /*
  * A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration.
+ * @param <M> the type of the incoming {@link MessageEnvelope}
  */
-public class TimeSinceLastMessageTrigger<M> implements Trigger {
+public class TimeSinceLastMessageTrigger<M> implements Trigger<M> {
 
   private final Duration duration;
   private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
index c5875aa..d854d74 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
@@ -23,7 +23,7 @@ import java.time.Duration;
 /*
  * A {@link Trigger} that fires after the specified duration in processing time.
  */
-public class TimeTrigger<M> implements Trigger {
+public class TimeTrigger<M> implements Trigger<M> {
 
   private final Duration duration;
   private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
index 14bd5ab..bf52724 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
@@ -26,14 +26,19 @@ package org.apache.samza.operators.windows;
  *
  */
 public class WindowKey<K> {
-
+  /**
+   * A (key,paneId) tuple uniquely identifies an emission from a window. For instance, in case of keyed-tumbling time windows,
+   * the key is provided by the keyExtractor function, and the paneId is the start of the time window boundary. In case
+   * of session windows, the key is provided by the keyExtractor function, and the paneId is the time at which the earliest
+   * message in the window arrived.
+   */
   private final  K key;
 
   private final String paneId;
 
-  public WindowKey(K key, String  windowId) {
+  public WindowKey(K key, String  paneId) {
     this.key = key;
-    this.paneId = windowId;
+    this.paneId = paneId;
   }
 
   public K getKey() {
@@ -52,4 +57,29 @@ public class WindowKey<K> {
     }
     return String.format("%s%s", wndKey, paneId);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    WindowKey<?> windowKey = (WindowKey<?>) o;
+
+    if (!key.equals(windowKey.key)) return false;
+
+    if (paneId == null) {
+      return windowKey.paneId == null;
+    }
+
+    return paneId.equals(windowKey.paneId);
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = key.hashCode();
+    result = 31 * result + (paneId != null ? paneId.hashCode() : 0);
+    return result;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
index 3b66bd1..3b19f8a 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
@@ -18,6 +18,8 @@
  */
 package org.apache.samza.operators.windows;
 
+import org.apache.samza.operators.triggers.FiringType;
+
 /**
  * Specifies the result emitted from a {@link Window}.
  *
@@ -32,10 +34,16 @@ public final class WindowPane<K, V> {
 
   private final AccumulationMode mode;
 
-  WindowPane(WindowKey<K> key, V value, AccumulationMode mode) {
+  /**
+   * The type of the trigger that emitted this result. Results can be emitted from early, late or default triggers.
+   */
+  private final FiringType type;
+
+  public WindowPane(WindowKey<K> key, V value, AccumulationMode mode, FiringType type) {
     this.key = key;
     this.value = value;
     this.mode = mode;
+    this.type = type;
   }
 
   public V getMessage() {
@@ -46,8 +54,8 @@ public final class WindowPane<K, V> {
     return this.key;
   }
 
-  static public <K, M> WindowPane<K, M> of(WindowKey<K> key, M result) {
-    return new WindowPane<>(key, result, AccumulationMode.DISCARDING);
+  public FiringType getFiringType() {
+    return type;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
index 73fb5c8..9192fc1 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
@@ -20,15 +20,18 @@
 package org.apache.samza.operators.windows;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.triggers.TimeTrigger;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.operators.windows.internal.WindowType;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 /**
  * APIs for creating different types of {@link Window}s.
@@ -84,6 +87,8 @@ import java.util.function.Function;
  * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window
  * types.
  *
+ * <p> Time granularity for windows: Currently, time durations are always measured in milliseconds. Time units of
+ * finer granularity are not supported.
  */
 @InterfaceStability.Unstable
 public final class Windows {
@@ -107,17 +112,18 @@ public final class Windows {
    *
    * @param keyFn the function to extract the window key from a message
    * @param interval the duration in processing time
+   * @param initialValue the initial value to be used for aggregations
    * @param foldFn the function to aggregate messages in the {@link WindowPane}
    * @param <M> the type of the input message
    * @param <WV> the type of the {@link WindowPane} output value
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function.
    */
-  public static <M, K, WV> Window<M, K, WV>
-    keyedTumblingWindow(Function<M, K> keyFn, Duration interval, BiFunction<M, WV, WV> foldFn) {
+  public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<M, K> keyFn, Duration interval,
+                                                                Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldFn) {
 
     Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
-    return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null);
+    return new WindowInternal<M, K, WV>(defaultTrigger, initialValue, foldFn, keyFn, null, WindowType.TUMBLING);
   }
 
 
@@ -142,11 +148,10 @@ public final class Windows {
    * @return the created {@link Window} function
    */
   public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M, K> keyFn, Duration interval) {
-    BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
-      c.add(m);
-      return c;
-    };
-    return keyedTumblingWindow(keyFn, interval, aggregator);
+    FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
+
+    Supplier<Collection<M>> initialValue = () -> new ArrayList<>();
+    return keyedTumblingWindow(keyFn, interval, initialValue, aggregator);
   }
 
   /**
@@ -164,15 +169,16 @@ public final class Windows {
    * </pre>
    *
    * @param duration the duration in processing time
+   * @param initialValue the initial value to be used for aggregations
    * @param foldFn to aggregate messages in the {@link WindowPane}
    * @param <M> the type of the input message
    * @param <WV> the type of the {@link WindowPane} output value
    * @return the created {@link Window} function
    */
-  public static <M, WV> Window<M, Void, WV>
-    tumblingWindow(Duration duration, BiFunction<M, WV, WV> foldFn) {
+  public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supplier<WV> initialValue,
+                                                           FoldLeftFunction<M, WV> foldFn) {
     Trigger<M> defaultTrigger = Triggers.repeat(new TimeTrigger<>(duration));
-    return new WindowInternal<>(defaultTrigger, foldFn, null, null);
+    return new WindowInternal<>(defaultTrigger, initialValue, foldFn, null, null, WindowType.TUMBLING);
   }
 
   /**
@@ -195,11 +201,10 @@ public final class Windows {
    * @return the created {@link Window} function
    */
   public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration) {
-    BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
-      c.add(m);
-      return c;
-    };
-    return tumblingWindow(duration, aggregator);
+    FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
+
+    Supplier<Collection<M>> initialValue = () -> new ArrayList<>();
+    return tumblingWindow(duration, initialValue, aggregator);
   }
 
   /**
@@ -223,15 +228,17 @@ public final class Windows {
    *
    * @param keyFn the function to extract the window key from a message
    * @param sessionGap the timeout gap for defining the session
+   * @param initialValue the initial value to be used for aggregations
    * @param foldFn the function to aggregate messages in the {@link WindowPane}
    * @param <M> the type of the input message
    * @param <K> the type of the key in the {@link Window}
    * @param <WV> the type of the output value in the {@link WindowPane}
    * @return the created {@link Window} function
    */
-  public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap, BiFunction<M, WV, WV> foldFn) {
+  public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap,
+                                                               Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldFn) {
     Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
-    return new WindowInternal<>(defaultTrigger, foldFn, keyFn, null);
+    return new WindowInternal<>(defaultTrigger, initialValue, foldFn, keyFn, null, WindowType.SESSION);
   }
 
   /**
@@ -260,114 +267,18 @@ public final class Windows {
    */
   public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap) {
 
-    BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
-      c.add(m);
-      return c;
-    };
-    return keyedSessionWindow(keyFn, sessionGap, aggregator);
-  }
-
+    FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
 
-  /**
-   * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a
-   * default trigger. The triggering behavior must be specified by setting an early trigger.
-   *
-   * <p>The below example computes the maximum value over a count based window. The window emits {@link WindowPane}s when
-   * there are either 50 messages in the window pane or when 10 seconds have passed since the first message in the pane.
-   *
-   * <pre> {@code
-   *    MessageStream<Long> stream = ...;
-   *    BiFunction<Long, Long, Long> maxAggregator = (m, c)-> Math.max(m, c);
-   *    MessageStream<WindowPane<Void, Long>> windowedStream = stream.window(Windows.globalWindow(maxAggregator)
-   *      .setEarlyTriggers(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))))
-   * }
-   * </pre>
-   *
-   * @param foldFn the function to aggregate messages in the {@link WindowPane}
-   * @param <M> the type of message
-   * @param <WV> type of the output value in the {@link WindowPane}
-   * @return the created {@link Window} function.
-   */
-  public static <M, WV> Window<M, Void, WV> globalWindow(BiFunction<M, WV, WV> foldFn) {
-    return new WindowInternal<>(null, foldFn, null, null);
+    Supplier<Collection<M>> initialValue = () -> new ArrayList<>();
+    return keyedSessionWindow(keyFn, sessionGap, initialValue, aggregator);
   }
 
-  /**
-   * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a
-   * default trigger. The triggering behavior must be specified by setting an early trigger.
-   *
-   * The below example groups the stream into count based windows that trigger every 50 messages or every 10 minutes.
-   * <pre> {@code
-   *    MessageStream<Long> stream = ...;
-   *    MessageStream<WindowPane<Void, Collection<Long>> windowedStream = stream.window(Windows.globalWindow()
-   *      .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))))
-   * }
-   * </pre>
-   *
-   * @param <M> the type of message
-   * @return the created {@link Window} function.
-   */
-  public static <M> Window<M, Void, Collection<M>> globalWindow() {
-    BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
-      c.add(m);
-      return c;
-    };
-    return globalWindow(aggregator);
-  }
 
-  /**
-   * Returns a global {@link Window} that groups incoming messages using the provided keyFn.
-   * The window does not have a default trigger. The triggering behavior must be specified by setting an early
-   * trigger.
-   *
-   * <p> The below example groups the stream into count based windows. The window triggers every 50 messages or every
-   * 10 minutes.
-   *
-   * <pre> {@code
-   *    MessageStream<UserClick> stream = ...;
-   *    BiFunction<UserClick, Long, Long> maxAggregator = (m, c)-> Math.max(parseLongField(m), c);
-   *    Function<UserClick, String> keyFn = ...;
-   *    MessageStream<WindowPane<String, Long>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator)
-   *      .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10))))))
-   * }
-   * </pre>
-   *
-   * @param keyFn the function to extract the window key from a message
-   * @param foldFn the function to aggregate messages in the {@link WindowPane}
-   * @param <M> the type of message
-   * @param <K> type of the key in the {@link Window}
-   * @param <WV> the type of the output value in the {@link WindowPane}
-   * @return the created {@link Window} function
-   */
-  public static <M, K, WV> Window<M, K, WV> keyedGlobalWindow(Function<M, K> keyFn, BiFunction<M, WV, WV> foldFn) {
-    return new WindowInternal<M, K, WV>(null, foldFn, keyFn, null);
-  }
-
-  /**
-   * Returns a global {@link Window} that groups incoming messages using the provided keyFn.
-   * The window does not have a default trigger. The triggering behavior must be specified by setting an early trigger.
-   *
-   * <p> The below example groups the stream per-key into count based windows. The window triggers every 50 messages or
-   * every 10 minutes.
-   *
-   * <pre> {@code
-   *    MessageStream<UserClick> stream = ...;
-   *    Function<UserClick, String> keyFn = ...;
-   *    MessageStream<WindowPane<String, Collection<UserClick>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn)
-   *      .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10))))))
-   * }
-   * </pre>
-   *
-   * @param keyFn the function to extract the window key from a message
-   * @param <M> the type of message
-   * @param <K> the type of the key in the {@link Window}
-   * @return the created {@link Window} function
-   */
-  public static <M, K> Window<M, K, Collection<M>> keyedGlobalWindow(Function<M, K> keyFn) {
-    BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
+  private static <M> FoldLeftFunction<M, Collection<M>> createAggregator() {
+    return (m, c) -> {
       c.add(m);
       return c;
     };
-    return keyedGlobalWindow(keyFn, aggregator);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
index 9479eea..f6ac301 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
@@ -18,12 +18,13 @@
  */
 package org.apache.samza.operators.windows.internal;
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.Window;
 
-import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 /**
  *  Internal representation of a {@link Window}. This specifies default, early and late triggers for the {@link Window}
@@ -32,81 +33,105 @@ import java.util.function.Function;
  *  Note: This class is meant to be used internally by Samza, and is not to be instantiated by programmers.
  *
  * @param <M>  the type of input message
- * @param <K>  the type of key for the window
+ * @param <WK>  the type of key for the window
  * @param <WV>  the type of aggregated value in the window output
  */
 @InterfaceStability.Unstable
-public final class WindowInternal<M, K, WV> implements Window<M, K, WV> {
+public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> {
 
-  private final Trigger defaultTrigger;
+  private final Trigger<M> defaultTrigger;
+
+  /**
+   * The supplier of initial value to be used for windowed aggregations
+   */
+  private final Supplier<WV> initializer;
 
   /*
    * The function that is applied each time a {@link MessageEnvelope} is added to this window.
    */
-  private final BiFunction<M, WV, WV> foldFunction;
+  private final FoldLeftFunction<M, WV> foldLeftFunction;
 
   /*
    * The function that extracts the key from a {@link MessageEnvelope}
    */
-  private final Function<M, K> keyExtractor;
+  private final Function<M, WK> keyExtractor;
 
   /*
    * The function that extracts the event time from a {@link MessageEnvelope}
    */
   private final Function<M, Long> eventTimeExtractor;
 
-  private Trigger earlyTrigger;
+  /**
+   * The type of this window. Tumbling and Session windows are supported for now.
+   */
+  private final WindowType windowType;
+
+  private Trigger<M> earlyTrigger;
 
-  private Trigger lateTrigger;
+  private Trigger<M> lateTrigger;
 
   private AccumulationMode mode;
 
-  public WindowInternal(Trigger defaultTrigger, BiFunction<M, WV, WV> foldFunction, Function<M, K> keyExtractor, Function<M, Long> eventTimeExtractor) {
-    this.foldFunction = foldFunction;
+  public WindowInternal(Trigger<M> defaultTrigger, Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldLeftFunction, Function<M, WK> keyExtractor, Function<M, Long> eventTimeExtractor, WindowType windowType) {
+    this.defaultTrigger = defaultTrigger;
+    this.initializer = initialValue;
+    this.foldLeftFunction = foldLeftFunction;
     this.eventTimeExtractor = eventTimeExtractor;
     this.keyExtractor = keyExtractor;
-    this.defaultTrigger = defaultTrigger;
+    this.windowType = windowType;
   }
 
   @Override
-  public Window<M, K, WV> setEarlyTrigger(Trigger trigger) {
+  public Window<M, WK, WV> setEarlyTrigger(Trigger<M> trigger) {
     this.earlyTrigger = trigger;
     return this;
   }
 
   @Override
-  public Window<M, K, WV> setLateTrigger(Trigger trigger) {
+  public Window<M, WK, WV> setLateTrigger(Trigger<M> trigger) {
     this.lateTrigger = trigger;
     return this;
   }
 
   @Override
-  public Window<M, K, WV> setAccumulationMode(AccumulationMode mode) {
+  public Window<M, WK, WV> setAccumulationMode(AccumulationMode mode) {
     this.mode = mode;
     return this;
   }
 
-  public Trigger getDefaultTrigger() {
+  public Trigger<M> getDefaultTrigger() {
     return defaultTrigger;
   }
 
-  public Trigger getEarlyTrigger() {
+  public Trigger<M> getEarlyTrigger() {
     return earlyTrigger;
   }
 
-  public Trigger getLateTrigger() {
+  public Trigger<M> getLateTrigger() {
     return lateTrigger;
   }
 
-  public BiFunction<M, WV, WV> getFoldFunction() {
-    return foldFunction;
+  public Supplier<WV> getInitializer() {
+    return initializer;
   }
 
-  public Function<M, K> getKeyExtractor() {
+  public FoldLeftFunction<M, WV> getFoldLeftFunction() {
+    return foldLeftFunction;
+  }
+
+  public Function<M, WK> getKeyExtractor() {
     return keyExtractor;
   }
 
   public Function<M, Long> getEventTimeExtractor() {
     return eventTimeExtractor;
   }
+
+  public WindowType getWindowType() {
+    return windowType;
+  }
+
+  public AccumulationMode getAccumulationMode() {
+    return mode;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java
new file mode 100644
index 0000000..409d56a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows.internal;
+
+public enum WindowType {
+  TUMBLING, SESSION
+  //,SLIDING
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
index 54d0b2f..4184c9d 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.windows;
 
+import org.apache.samza.operators.triggers.FiringType;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -26,7 +27,7 @@ import static org.junit.Assert.assertEquals;
 public class TestWindowPane {
   @Test
   public void testConstructor() {
-    WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey<>("testMsg", null), 10);
+    WindowPane<String, Integer> wndOutput = new WindowPane(new WindowKey("testMsg", null), 10, AccumulationMode.DISCARDING, FiringType.EARLY);
     assertEquals(wndOutput.getKey().getKey(), "testMsg");
     assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index f801097..1b36f76 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.data.MessageEnvelope;

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/WindowState.java b/samza-core/src/main/java/org/apache/samza/operators/WindowState.java
new file mode 100644
index 0000000..4e80862
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/WindowState.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+/**
+ * Wraps the value stored for a particular {@link org.apache.samza.operators.windows.WindowKey} with additional metadata.
+ */
+public class WindowState<WV> {
+
+  final WV wv;
+  /**
+   * Time of the first message in the window
+   */
+  final long earliestRecvTime;
+
+  public WindowState(WV wv, long earliestRecvTime) {
+    this.wv = wv;
+    this.earliestRecvTime = earliestRecvTime;
+  }
+
+  public WV getWindowValue() {
+    return wv;
+  }
+
+  public long getEarliestTimestamp() {
+    return earliestRecvTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
index 3efd5f5..ca8e34b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
@@ -27,6 +27,8 @@ import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.SystemClock;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -52,6 +54,16 @@ public class OperatorGraph {
    */
   private final Map<SystemStream, RootOperatorImpl> operatorGraph = new HashMap<>();
 
+  private final Clock clock;
+
+  public OperatorGraph(Clock clock) {
+    this.clock = clock;
+  }
+
+  public OperatorGraph() {
+    this(SystemClock.instance());
+  }
+
   /**
    * Initialize the whole DAG of {@link OperatorImpl}s, based on the input {@link MessageStreamImpl} from the {@link org.apache.samza.operators.StreamGraph}.
    * This method will traverse each input {@link org.apache.samza.operators.MessageStream} in the {@code inputStreams} and
@@ -156,14 +168,14 @@ public class OperatorGraph {
    * @param context  the {@link TaskContext} required to instantiate operators
    * @return  the {@link OperatorImpl} implementation instance
    */
-  private static <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) {
+  private <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) {
     if (operatorSpec instanceof StreamOperatorSpec) {
       StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec;
       return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
     } else if (operatorSpec instanceof SinkOperatorSpec) {
       return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
     } else if (operatorSpec instanceof WindowOperatorSpec) {
-      return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?>) operatorSpec, source, config, context);
+      return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock);
     } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
       return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 9983307..b9a606b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -52,41 +52,37 @@ public abstract class OperatorImpl<M, RM> {
   public abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator);
 
   /**
-   * Perform the actions required on a timer tick and call the downstream operators.
-   *
-   * Overriding implementations must call {@link #propagateTimer} to propagate the timer tick to registered
-   * downstream operators correctly.
+   * Invoked at every tick. This method delegates to {@link #onTimer(MessageCollector, TaskCoordinator)}
    *
    * @param collector  the {@link MessageCollector} in the context
    * @param coordinator  the {@link TaskCoordinator} in the context
    */
-  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
-    propagateTimer(collector, coordinator);
+  public final void onTick(MessageCollector collector, TaskCoordinator coordinator) {
+    onTimer(collector, coordinator);
+    nextOperators.forEach(sub -> sub.onTick(collector, coordinator));
   }
 
   /**
-   * Helper method to propagate the output of this operator to all registered downstream operators.
-   *
-   * This method <b>must</b> be called from {@link #onNext} to propagate the operator output correctly.
+   * Invoked at every tick. Implementations must call {@link #propagateResult} to propagate any generated output
+   * to registered downstream operators.
    *
-   * @param outputMessage  output message
    * @param collector  the {@link MessageCollector} in the context
    * @param coordinator  the {@link TaskCoordinator} in the context
    */
-  void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) {
-    nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator));
+  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
   }
 
   /**
-   * Helper method to propagate the timer tick to all registered downstream operators.
+   * Helper method to propagate the output of this operator to all registered downstream operators.
    *
-   * This method <b>must</b> be called from {@link #onTimer} to propagate the timer tick correctly.
+   * This method <b>must</b> be called from {@link #onNext} and {@link #onTimer}
+   * to propagate the operator output correctly.
    *
+   * @param outputMessage  output message
    * @param collector  the {@link MessageCollector} in the context
    * @param coordinator  the {@link TaskCoordinator} in the context
    */
-  void propagateTimer(MessageCollector collector, TaskCoordinator coordinator) {
-    nextOperators.forEach(sub -> sub.onTimer(collector, coordinator));
+  void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) {
+    nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index f704f3f..b2948a3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -93,7 +93,6 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
     thisState.deleteAll(keysToRemove);
 
     LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, System.currentTimeMillis() - now);
-    this.propagateTimer(collector, coordinator);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java
new file mode 100644
index 0000000..49fefc0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.triggers.FiringType;
+import org.apache.samza.operators.windows.WindowKey;
+
+/**
+ * Uniquely identifies a trigger firing
+ */
+public class TriggerKey<WK> {
+  private final FiringType type;
+  private final WindowKey<WK> key;
+
+  public TriggerKey(FiringType type, WindowKey<WK> key) {
+    if (type == null) {
+      throw new IllegalArgumentException("Firing type cannot be null");
+    }
+
+    if (key == null) {
+      throw new IllegalArgumentException("WindowKey cannot be null");
+    }
+
+    this.type = type;
+    this.key = key;
+  }
+
+  /**
+   * Equality is determined by both the type, and the window key.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    TriggerKey<WK> that = (TriggerKey<WK>) o;
+    return type == that.type && key.equals(that.key);
+  }
+
+  /**
+   * Hashcode is computed by from the type, and the window key.
+   */
+  @Override
+  public int hashCode() {
+    int result = type.hashCode();
+    result = 31 * result + key.hashCode();
+    return result;
+  }
+
+  public WindowKey<WK> getKey() {
+    return key;
+  }
+
+  public FiringType getType() {
+    return type;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java
new file mode 100644
index 0000000..952d9f1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.triggers.Cancellable;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Allows to schedule and cancel callbacks for triggers.
+ */
+public class TriggerScheduler<WK> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TriggerScheduler.class);
+
+  private final PriorityQueue<TriggerCallbackState<WK>> pendingCallbacks;
+  private final Clock clock;
+
+  public TriggerScheduler(Clock clock) {
+    this.pendingCallbacks = new PriorityQueue<>();
+    this.clock = clock;
+  }
+
+  /**
+   * Schedule the provided runnable for execution at the specified duration.
+   * @param runnable the provided runnable to schedule.
+   * @param scheduledTimeMs time at which the runnable must be scheduled for execution
+   * @param triggerKey a key that uniquely identifies the corresponding trigger firing.
+   * @return a {@link Cancellable} that can be used to cancel the execution of this runnable.
+   */
+  public Cancellable scheduleCallback(Runnable runnable, long scheduledTimeMs, TriggerKey<WK> triggerKey) {
+    TriggerCallbackState<WK> timerState = new TriggerCallbackState(triggerKey, runnable, scheduledTimeMs);
+    pendingCallbacks.add(timerState);
+    LOG.trace("Scheduled a new callback: {} at {} for triggerKey {}", new Object[] {runnable, scheduledTimeMs, triggerKey});
+    return timerState;
+  }
+
+  /**
+   * Run all pending callbacks that are ready to be scheduled. A callback is defined as "ready" if it's scheduledTime
+   * is less than or equal to {@link Clock#currentTimeMillis()}
+   *
+   * @return the list of {@link TriggerKey}s corresponding to the callbacks that were run.
+   */
+  public List<TriggerKey<WK>> runPendingCallbacks() {
+    TriggerCallbackState<WK> state;
+    List<TriggerKey<WK>> keys = new ArrayList<>();
+    long now = clock.currentTimeMillis();
+
+    while ((state = pendingCallbacks.peek()) != null && state.getScheduledTimeMs() <= now) {
+      pendingCallbacks.remove();
+      state.getCallback().run();
+      TriggerKey<WK> key = state.getTriggerKey();
+      keys.add(key);
+    }
+    return keys;
+  }
+
+  /**
+   * State corresponding to pending timer callbacks scheduled by various triggers.
+   */
+  private class TriggerCallbackState<WK> implements Comparable<TriggerCallbackState<WK>>, Cancellable {
+
+    private final TriggerKey<WK> triggerKey;
+    private final Runnable callback;
+
+    // the time at which the callback should trigger
+    private final long scheduledTimeMs;
+
+    private TriggerCallbackState(TriggerKey<WK> triggerKey, Runnable callback, long scheduledTimeMs) {
+      this.triggerKey = triggerKey;
+      this.callback = callback;
+      this.scheduledTimeMs = scheduledTimeMs;
+    }
+
+    private Runnable getCallback() {
+      return callback;
+    }
+
+    private long getScheduledTimeMs() {
+      return scheduledTimeMs;
+    }
+
+    private TriggerKey<WK> getTriggerKey() {
+      return triggerKey;
+    }
+
+    @Override
+    public int compareTo(TriggerCallbackState<WK> other) {
+      return Long.compare(this.scheduledTimeMs, other.scheduledTimeMs);
+    }
+
+    @Override
+    public boolean cancel() {
+      LOG.trace("Cancelled a callback: {} at {} for triggerKey {}", new Object[] {callback, scheduledTimeMs, triggerKey});
+      return pendingCallbacks.remove(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index af00553..cd3b1bc 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -1,3 +1,5 @@
+// CHECKSTYLE:OFF
+// Turn off checkstyle for this class because of a checkstyle bug in handling nested typed collections
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,26 +20,300 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.triggers.RepeatingTriggerImpl;
+import org.apache.samza.operators.triggers.TimeTrigger;
+import org.apache.samza.operators.triggers.Trigger;
+import org.apache.samza.operators.triggers.TriggerImpl;
+import org.apache.samza.operators.triggers.TriggerImpls;
+import org.apache.samza.operators.triggers.FiringType;
+import org.apache.samza.operators.util.InternalInMemoryStore;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowKey;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.operators.windows.internal.WindowType;
+import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Implementation of a window operator that groups messages into finite windows for processing.
+ *
+ * This class implements the processing logic for various types of windows and triggers. It tracks and manages state for
+ * all open windows, the active triggers that correspond to each of the windows and the pending callbacks. It provides
+ * an implementation of {@link TriggerScheduler} that {@link TriggerImpl}s can use to schedule and cancel callbacks. It
+ * also orchestrates the flow of messages through the various {@link TriggerImpl}s.
+ *
+ * <p> An instance of a {@link TriggerImplHandler} is created corresponding to each {@link Trigger} configured for a
+ * particular window. For every message added to the window, this class looks up the corresponding {@link TriggerImplHandler}
+ * for the trigger and invokes {@link TriggerImplHandler#onMessage(TriggerKey, Object, MessageCollector, TaskCoordinator)}.
+ * The {@link TriggerImplHandler} maintains the {@link TriggerImpl} instance along with whether it has been canceled yet
+ * or not. Then, the {@link TriggerImplHandler} invokes onMessage on underlying its {@link TriggerImpl} instance. A
+ * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted. The
+ * {@link WindowOperatorImpl} checks if the trigger fired, and propagates the result of the firing to its downstream
+ * operators.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the type of the key in this {@link org.apache.samza.operators.MessageStream}
+ * @param <WV> the type of the value in the emitted window pane
+ *
+ */
 public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK, WV>> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorImpl.class);
+
   private final WindowInternal<M, WK, WV> window;
+  private final KeyValueStore<WindowKey<WK>, WindowState<WV>> store = new InternalInMemoryStore<>();
+  TriggerScheduler<WK> triggerScheduler ;
+
+  // The trigger state corresponding to each {@link TriggerKey}.
+  private final Map<TriggerKey<WK>, TriggerImplHandler> triggers = new HashMap<>();
+  private final Clock clock;
 
-  public WindowOperatorImpl(WindowOperatorSpec spec, MessageStreamImpl<M> source, Config config, TaskContext context) {
-    // source, config, and context are used to initialize the window kv-store
-    window = spec.getWindow();
+  public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> spec, Clock clock) {
+    this.clock = clock;
+    this.window = spec.getWindow();
+    this.triggerScheduler= new TriggerScheduler(clock);
   }
 
   @Override
   public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    LOG.trace("Processing message envelope: {}", message);
 
+    WindowKey<WK> storeKey =  getStoreKey(message);
+    WindowState<WV> existingState = store.get(storeKey);
+    WindowState<WV> newState = applyFoldFunction(existingState, message);
+
+    LOG.trace("New window value: {}, earliest timestamp: {}", newState.getWindowValue(), newState.getEarliestTimestamp());
+    store.put(storeKey, newState);
+
+    if (window.getEarlyTrigger() != null) {
+      TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.EARLY, storeKey);
+
+      getOrCreateTriggerImplWrapper(triggerKey, window.getEarlyTrigger())
+          .onMessage(triggerKey, message, collector, coordinator);
+    }
+
+    if (window.getDefaultTrigger() != null) {
+      TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.DEFAULT, storeKey);
+      getOrCreateTriggerImplWrapper(triggerKey, window.getDefaultTrigger())
+          .onMessage(triggerKey, message, collector, coordinator);
+    }
   }
-}
+
+  @Override
+  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+    List<TriggerKey<WK>> keys = triggerScheduler.runPendingCallbacks();
+
+    for (TriggerKey<WK> key : keys) {
+      TriggerImplHandler triggerImplHandler = triggers.get(key);
+      if (triggerImplHandler != null) {
+        triggerImplHandler.onTimer(key, collector, coordinator);
+      }
+    }
+
+  }
+
+  /**
+   * Get the key to be used for lookups in the store for this message.
+   */
+  private WindowKey<WK> getStoreKey(M message) {
+    Function<M, WK> keyExtractor = window.getKeyExtractor();
+    WK key = null;
+
+    if (keyExtractor != null) {
+      key = keyExtractor.apply(message);
+    }
+
+    String paneId = null;
+
+    if (window.getWindowType() == WindowType.TUMBLING) {
+      long triggerDurationMs = ((TimeTrigger<M>) window.getDefaultTrigger()).getDuration().toMillis();
+      final long now = clock.currentTimeMillis();
+      Long windowBoundary = now - now % triggerDurationMs;
+      paneId = windowBoundary.toString();
+    }
+
+    return new WindowKey<>(key, paneId);
+  }
+
+  private WindowState<WV> applyFoldFunction(WindowState<WV> existingState, M message) {
+    WV wv;
+    long earliestTimestamp;
+
+    if (existingState == null) {
+      LOG.trace("No existing state found for key");
+      wv = window.getInitializer().get();
+      earliestTimestamp = clock.currentTimeMillis();
+    } else {
+      wv = existingState.getWindowValue();
+      earliestTimestamp = existingState.getEarliestTimestamp();
+    }
+
+    WV newVal = window.getFoldLeftFunction().apply(message, wv);
+    WindowState<WV> newState = new WindowState(newVal, earliestTimestamp);
+
+    return newState;
+  }
+
+  private TriggerImplHandler getOrCreateTriggerImplWrapper(TriggerKey<WK> triggerKey, Trigger<M> trigger) {
+    TriggerImplHandler wrapper = triggers.get(triggerKey);
+    if (wrapper != null) {
+      LOG.trace("Returning existing trigger wrapper for {}", triggerKey);
+      return wrapper;
+    }
+
+    LOG.trace("Creating a new trigger wrapper for {}", triggerKey);
+
+    TriggerImpl<M, WK> triggerImpl = TriggerImpls.createTriggerImpl(trigger, clock, triggerKey);
+    wrapper = new TriggerImplHandler(triggerKey, triggerImpl);
+    triggers.put(triggerKey, wrapper);
+
+    return wrapper;
+  }
+
+  /**
+   * Handles trigger firings, and propagates results to downstream operators.
+   */
+  private void onTriggerFired(TriggerKey<WK> triggerKey, MessageCollector collector, TaskCoordinator coordinator) {
+    LOG.trace("Trigger key {} fired." , triggerKey);
+
+    TriggerImplHandler wrapper = triggers.get(triggerKey);
+    WindowKey<WK> windowKey = triggerKey.getKey();
+    WindowState<WV> state = store.get(windowKey);
+
+    if (state == null) {
+      LOG.trace("No state found for triggerKey: {}", triggerKey);
+      return;
+    }
+
+    WindowPane<WK, WV> paneOutput = computePaneOutput(triggerKey, state);
+    super.propagateResult(paneOutput, collector, coordinator);
+
+    // Handle accumulation modes.
+    if (window.getAccumulationMode() == AccumulationMode.DISCARDING) {
+      LOG.trace("Clearing state for trigger key: {}", triggerKey);
+      store.put(windowKey, null);
+    }
+
+    // Cancel all early triggers too when the default trigger fires. Also, clean all state for the key.
+    // note: We don't handle late arrivals yet, So, all arrivals are either early or on-time.
+    if (triggerKey.getType() == FiringType.DEFAULT) {
+
+      LOG.trace("Default trigger fired. Canceling triggers for {}", triggerKey);
+
+      cancelTrigger(triggerKey, true);
+      cancelTrigger(new TriggerKey(FiringType.EARLY, triggerKey.getKey()), true);
+      WindowKey<WK> key = triggerKey.getKey();
+      store.delete(key);
+    }
+
+    // Cancel non-repeating early triggers. All early triggers should be removed from the "triggers" map only after the
+    // firing of their corresponding default trigger. Removing them pre-maturely (immediately after cancellation) will
+    // will create a new {@link TriggerImplWrapper} instance at a future invocation of getOrCreateTriggerWrapper().
+    // This would cause an already canceled trigger to fire again for the window.
+
+    if (triggerKey.getType() == FiringType.EARLY && !wrapper.isRepeating()) {
+      cancelTrigger(triggerKey, false);
+    }
+  }
+
+  /**
+   * Computes the pane output corresponding to a {@link TriggerKey} that fired.
+   */
+  private WindowPane<WK, WV> computePaneOutput(TriggerKey<WK> triggerKey, WindowState<WV> state) {
+    WindowKey<WK> windowKey = triggerKey.getKey();
+    WV windowVal = state.getWindowValue();
+
+    // For session windows, we will create a new window key by using the time of the first message in the window as
+    //the paneId.
+    if (window.getWindowType() == WindowType.SESSION) {
+      windowKey = new WindowKey<>(windowKey.getKey(), Long.toString(state.getEarliestTimestamp()));
+    }
+
+    // Make a defensive copy so that we are immune to further mutations on the collection
+    if (windowVal instanceof Collection) {
+      windowVal = (WV) new ArrayList<>((Collection<WV>) windowVal);
+    }
+
+    WindowPane<WK, WV> paneOutput = new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
+    LOG.trace("Emitting pane output for trigger key {}", triggerKey);
+    return paneOutput;
+  }
+
+  /**
+   * Cancels the firing of the {@link TriggerImpl} identified by this {@link TriggerKey} and optionally removes it.
+   */
+  private void cancelTrigger(TriggerKey<WK> triggerKey, boolean shouldRemove) {
+    TriggerImplHandler triggerImplHandler = triggers.get(triggerKey);
+    if (triggerImplHandler != null) {
+      triggerImplHandler.cancel();
+    }
+    if (shouldRemove && triggerKey != null) {
+      triggers.remove(triggerKey);
+    }
+  }
+
+  /**
+   * State corresponding to a created {@link TriggerImpl} instance.
+   */
+  private class TriggerImplHandler {
+    // The context, and the {@link TriggerImpl} instance corresponding to this triggerKey
+    private final TriggerImpl<M, WK> impl;
+    // Guard to ensure that we don't invoke onMessage or onTimer on already cancelled triggers
+    private boolean isCancelled = false;
+
+    public TriggerImplHandler(TriggerKey<WK> key, TriggerImpl<M, WK> impl) {
+      this.impl = impl;
+    }
+
+    public void onMessage(TriggerKey<WK> triggerKey, M message, MessageCollector collector, TaskCoordinator coordinator) {
+      if (!isCancelled) {
+        LOG.trace("Forwarding callbacks for {}", message);
+        impl.onMessage(message, triggerScheduler);
+
+        if (impl.shouldFire()) {
+          // repeating trigger can trigger multiple times, So, clear the state to allow future triggerings.
+          if (impl instanceof RepeatingTriggerImpl) {
+            ((RepeatingTriggerImpl<M, WK>) impl).clear();
+          }
+          onTriggerFired(triggerKey, collector, coordinator);
+        }
+      }
+    }
+
+    public void onTimer(TriggerKey<WK> key, MessageCollector collector, TaskCoordinator coordinator) {
+      if (impl.shouldFire() && !isCancelled) {
+        LOG.trace("Triggering timer triggers");
+
+        // repeating trigger can trigger multiple times, So, clear the trigger to allow future triggerings.
+        if (impl instanceof RepeatingTriggerImpl) {
+          ((RepeatingTriggerImpl<M, WK>) impl).clear();
+        }
+        onTriggerFired(key, collector, coordinator);
+      }
+    }
+
+    public void cancel() {
+      impl.cancel();
+      isCancelled = true;
+    }
+
+    public boolean isRepeating() {
+      return this.impl instanceof RepeatingTriggerImpl;
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 46417ed..6d948d7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -19,9 +19,11 @@
 
 package org.apache.samza.operators.spec;
 
+import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -54,11 +56,18 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
   }
 
   @Override
+  public void init(Config config, TaskContext context) {
+    if (window.getFoldLeftFunction() != null) {
+      window.getFoldLeftFunction().init(config, context);
+    }
+  }
+
+  @Override
   public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
     return this.outputStream;
   }
 
-  public WindowInternal getWindow() {
+  public WindowInternal<M, WK, WV> getWindow() {
     return window;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java
new file mode 100644
index 0000000..a0aa384
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Implementation of an {@link AnyTrigger}
+ */
+public class AnyTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+
+  private final List<Trigger<M>> triggers;
+
+  private final List<TriggerImpl<M, WK>> triggerImpls = new ArrayList<>();
+  private final Clock clock;
+  private boolean shouldFire = false;
+
+  public AnyTriggerImpl(AnyTrigger<M> anyTrigger, Clock clock, TriggerKey<WK> triggerKey) {
+    this.triggers = anyTrigger.getTriggers();
+    this.clock = clock;
+    for (Trigger<M> trigger : triggers) {
+      triggerImpls.add(TriggerImpls.createTriggerImpl(trigger, clock, triggerKey));
+    }
+  }
+
+  @Override
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    for (TriggerImpl<M, WK> impl : triggerImpls) {
+      impl.onMessage(message, context);
+      if (impl.shouldFire()) {
+        shouldFire = true;
+        break;
+      }
+    }
+    if (shouldFire) {
+      cancel();
+    }
+  }
+
+  public void cancel() {
+    for (Iterator<TriggerImpl<M, WK>> it = triggerImpls.iterator(); it.hasNext(); ) {
+      TriggerImpl<M, WK> impl = it.next();
+      impl.cancel();
+      it.remove();
+    }
+  }
+
+  @Override
+  public boolean shouldFire() {
+    for (TriggerImpl<M, WK> impl : triggerImpls) {
+      if (impl.shouldFire()) {
+        shouldFire = true;
+        break;
+      }
+    }
+    return shouldFire;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java
new file mode 100644
index 0000000..ca0ba67
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+/**
+ * Represents a task or an operation whose execution can be cancelled.
+ */
+public interface Cancellable {
+
+  /**
+   * Cancel the execution of this operation (if it is not scheduled for execution yet). If the operation is in progress,
+   * it is not interrupted / cancelled.
+   *
+   * @return the result of the cancelation
+   */
+  public boolean cancel();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/CountTriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/CountTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/CountTriggerImpl.java
new file mode 100644
index 0000000..da1efda
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/CountTriggerImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link CountTrigger}
+ */
+public class CountTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+  private static final Logger LOG = LoggerFactory.getLogger(CountTriggerImpl.class);
+
+  private final long triggerCount;
+  private final TriggerKey<WK> triggerKey;
+  private long currentCount;
+  private boolean shouldFire = false;
+
+  public CountTriggerImpl(CountTrigger<M> triggerCount, TriggerKey<WK> triggerKey) {
+    this.triggerCount = triggerCount.getCount();
+    this.currentCount = 0;
+    this.triggerKey = triggerKey;
+  }
+
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    currentCount++;
+    if (currentCount == triggerCount) {
+      LOG.trace("count trigger fired for {}", message);
+      shouldFire = true;
+    }
+  }
+
+  @Override
+  public void cancel() {
+    //no-op
+  }
+
+  @Override
+  public boolean shouldFire() {
+    return shouldFire;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java
new file mode 100644
index 0000000..39b9b40
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link RepeatingTrigger}
+ */
+public class RepeatingTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+  private static final Logger LOG = LoggerFactory.getLogger(RepeatingTriggerImpl.class);
+
+  private final Trigger<M> repeatingTrigger;
+  private final Clock clock;
+  private final TriggerKey<WK> triggerKey;
+
+  private TriggerImpl<M, WK> currentTriggerImpl;
+
+  public RepeatingTriggerImpl(RepeatingTrigger<M> repeatingTrigger, Clock clock, TriggerKey<WK> key) {
+    this.repeatingTrigger = repeatingTrigger.getTrigger();
+    this.clock = clock;
+    this.triggerKey = key;
+    this.currentTriggerImpl = TriggerImpls.createTriggerImpl(this.repeatingTrigger, clock, triggerKey);
+  }
+
+  @Override
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    currentTriggerImpl.onMessage(message, context);
+  }
+
+  @Override
+  public void cancel() {
+    currentTriggerImpl.cancel();
+  }
+
+  public void clear() {
+    LOG.trace("Clearing state for repeating trigger");
+    currentTriggerImpl.cancel();
+    currentTriggerImpl = TriggerImpls.createTriggerImpl(repeatingTrigger, clock, triggerKey);
+  }
+
+  @Override
+  public boolean shouldFire() {
+    return currentTriggerImpl.shouldFire();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java
new file mode 100644
index 0000000..32bf988
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link TimeSinceFirstMessageTrigger}
+ * @param <M> the type of the incoming message
+ */
+public class TimeSinceFirstMessageTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TimeSinceFirstMessageTriggerImpl.class);
+
+  private final TimeSinceFirstMessageTrigger<M> trigger;
+  private final Clock clock;
+  private final TriggerKey<WK> triggerKey;
+  private Cancellable cancellable;
+  private boolean shouldFire = false;
+
+  public TimeSinceFirstMessageTriggerImpl(TimeSinceFirstMessageTrigger<M> trigger, Clock clock, TriggerKey<WK> key) {
+    this.trigger = trigger;
+    this.clock = clock;
+    this.triggerKey = key;
+  }
+
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    if (cancellable == null && !shouldFire) {
+      final long now = clock.currentTimeMillis();
+      long triggerDurationMs = trigger.getDuration().toMillis();
+      Long callbackTime = now + triggerDurationMs;
+      cancellable =  context.scheduleCallback(() -> {
+          LOG.trace("Time since first message trigger fired");
+          shouldFire = true;
+        }, callbackTime, triggerKey);
+    }
+  }
+
+  @Override
+  public void cancel() {
+    if (cancellable != null) {
+      cancellable.cancel();
+    }
+  }
+
+  @Override
+  public boolean shouldFire() {
+    return shouldFire;
+  }
+}