You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/07/12 22:52:01 UTC

[3/4] apex-malhar git commit: APEXMALHAR-2085 windowed operator interfaces and implementation

APEXMALHAR-2085 windowed operator interfaces and implementation

addressing PR comments

Added trigger unit tests

Fixed sliding window bug, added unit tests

Fixed session window bug; added more unit tests

Gives window a chance to trigger before purging because of lateness

added more unit tests

Process watermark only at end window

Renamed WatermarkOpt to Type, and fixed bug when window in retraction storage is not dropped when it's too late

added copyright

rat check

changed WindowOption from an abstract class to an interface

Use mutable objects for accumulated types

Retraction storage needs to be based on the output, not accumulated type

Added more unit tests

added support of fixed lateness in case watermark is not available from upstream

changed name from fixed lateness to fixed watermark

support inheriting windowed tuple from upstream when window option is not given


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7a77274a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7a77274a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7a77274a

Branch: refs/heads/master
Commit: 7a77274a2bc3147d9a72886649bfe904c77a903f
Parents: 719cf95
Author: David Yan <da...@datatorrent.com>
Authored: Thu Jun 23 14:37:29 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Mon Jul 11 15:27:09 2016 -0700

----------------------------------------------------------------------
 .../lib/testbench/CollectorTestSink.java        |    10 +-
 .../apex/malhar/lib/window/Accumulation.java    |    76 +
 .../apex/malhar/lib/window/ControlTuple.java    |    42 +
 .../malhar/lib/window/JoinAccumulation.java     |    65 +
 .../lib/window/SessionWindowedStorage.java      |    47 +
 .../apex/malhar/lib/window/TriggerOption.java   |   278 +
 .../apache/apex/malhar/lib/window/Tuple.java    |   136 +
 .../apache/apex/malhar/lib/window/Window.java   |   194 +
 .../apex/malhar/lib/window/WindowOption.java    |   137 +
 .../apex/malhar/lib/window/WindowState.java     |    45 +
 .../malhar/lib/window/WindowedKeyedStorage.java |    79 +
 .../malhar/lib/window/WindowedOperator.java     |   133 +
 .../apex/malhar/lib/window/WindowedStorage.java |    91 +
 .../window/impl/AbstractWindowedOperator.java   |   512 +
 .../impl/InMemoryWindowedKeyedStorage.java      |   101 +
 .../window/impl/InMemoryWindowedStorage.java    |    89 +
 .../window/impl/KeyedWindowedOperatorImpl.java  |   174 +
 .../malhar/lib/window/impl/WatermarkImpl.java   |    53 +
 .../lib/window/impl/WindowedOperatorImpl.java   |    80 +
 .../apex/malhar/lib/window/SumAccumulation.java |    59 +
 .../malhar/lib/window/WindowedOperatorTest.java |   550 +
 .../lib/window/sample/pi/Application.java       |   127 +
 .../window/sample/wordcount/Application.java    |   157 +
 library/src/test/resources/wordcount.txt        | 16271 +++++++++++++++++
 24 files changed, 19505 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java b/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java
index 2fd776e..b9562da 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java
@@ -67,6 +67,14 @@ public class CollectorTestSink<T> implements Sink<T>
   @Override
   public int getCount(boolean reset)
   {
-    throw new UnsupportedOperationException("Not supported yet.");
+    synchronized (collectedTuples) {
+      try {
+        return collectedTuples.size();
+      } finally {
+        if (reset) {
+          collectedTuples.clear();
+        }
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
new file mode 100644
index 0000000..17971bb
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This interface is for the processing part of the WindowedOperator.
+ * We can assume that all stateful processing of the WindowedOperator is a form of accumulation.
+ *
+ * In most cases, AccumT is the same as OutputT. But in some cases, the accumulated type and the output type may be
+ * different. For example, if we are doing the AVERAGE of doubles, InputT will be double, and we need the SUM and the
+ * COUNT stored as type AccumT, and AccumT will be a pair of double and long, in which double is the sum of the inputs,
+ * and long is the number of inputs. OutputT will be double, because it represents the average of the inputs.
+ */
+@InterfaceStability.Evolving
+public interface Accumulation<InputT, AccumT, OutputT>
+{
+  /**
+   * Returns the default accumulated value when nothing has been accumulated
+   *
+   * @return
+   */
+  AccumT defaultAccumulatedValue();
+
+  /**
+   * Accumulates the input to the accumulated value
+   *
+   * @param accumulatedValue
+   * @param input
+   * @return
+   */
+  AccumT accumulate(AccumT accumulatedValue, InputT input);
+
+  /**
+   * Merges two accumulated value into one
+   *
+   * @param accumulatedValue1
+   * @param accumulatedValue2
+   * @return
+   */
+  AccumT merge(AccumT accumulatedValue1, AccumT accumulatedValue2);
+
+  /**
+   * Gets the output of the accumulated value. This is used for generating the data for triggers
+   *
+   * @param accumulatedValue
+   * @return
+   */
+  OutputT getOutput(AccumT accumulatedValue);
+
+  /**
+   * Gets the retraction of the value. This is used for retracting previous panes in
+   * ACCUMULATING_AND_RETRACTING accumulation mode
+   *
+   * @param value
+   * @return
+   */
+  OutputT getRetraction(OutputT value);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java b/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java
new file mode 100644
index 0000000..d4fca11
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Control tuple interface.
+ * TODO: This should be removed or moved to Apex Core when Apex Core has native support for custom control tuples.
+ */
+@InterfaceStability.Evolving
+public interface ControlTuple
+{
+  /**
+   * Watermark control tuple
+   */
+  interface Watermark extends ControlTuple
+  {
+    /**
+     * Gets the timestamp associated with this watermark
+     *
+     * @return
+     */
+    long getTimestamp();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java
new file mode 100644
index 0000000..b485dd2
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is the interface for accumulation when joining multiple streams.
+ */
+@InterfaceStability.Evolving
+public interface JoinAccumulation<InputT1, InputT2, InputT3, InputT4, InputT5, AccumT, OutputT> extends Accumulation<InputT1, AccumT, OutputT>
+{
+  /**
+   * Accumulate the second input type to the accumulated value
+   *
+   * @param accumulatedValue
+   * @param input
+   * @return
+   */
+  AccumT accumulate2(AccumT accumulatedValue, InputT2 input);
+
+  /**
+   * Accumulate the third input type to the accumulated value
+   *
+   * @param accumulatedValue
+   * @param input
+   * @return
+   */
+  AccumT accumulate3(AccumT accumulatedValue, InputT3 input);
+
+  /**
+   * Accumulate the fourth input type to the accumulated value
+   *
+   * @param accumulatedValue
+   * @param input
+   * @return
+   */
+  AccumT accumulate4(AccumT accumulatedValue, InputT4 input);
+
+  /**
+   * Accumulate the fifth input type to the accumulated value
+   *
+   * @param accumulatedValue
+   * @param input
+   * @return
+   */
+  AccumT accumulate5(AccumT accumulatedValue, InputT5 input);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
new file mode 100644
index 0000000..c31885a
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This interface is for storing data for session windowed streams.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+@InterfaceStability.Evolving
+public interface SessionWindowedStorage<K, V> extends WindowedKeyedStorage<K, V>
+{
+  /**
+   * Given the key, the timestamp and the gap, gets the data that falls into timestamp +/- gap.
+   * This is used for getting the entry the data given the timestamp belongs to, and for determining whether to merge
+   * session windows.
+   * This should only return at most two entries if sessions have been merged appropriately.
+   *
+   * @param key the key
+   * @param timestamp the timestamp
+   * @param gap
+   * @return
+   */
+  Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java b/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java
new file mode 100644
index 0000000..6727f7b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * This class describes how triggers should be fired for each window.
+ * For each window, a trigger can be fired before the watermark (EARLY), at the watermark (ON_TIME), or after the watermark (LATE).
+ * If a LATE trigger is specified and the accumulation mode is ACCUMULATING, it is important for the WindowOption to
+ * specify the allowed lateness because otherwise, all states must be kept in storage.
+ *
+ */
+@InterfaceStability.Evolving
+public class TriggerOption
+{
+
+  public enum AccumulationMode
+  {
+    DISCARDING,
+    ACCUMULATING,
+    ACCUMULATING_AND_RETRACTING
+  }
+
+  private AccumulationMode accumulationMode = AccumulationMode.DISCARDING;
+  private boolean firingOnlyUpdatedPanes = false;
+
+  /**
+   * Whether the trigger should be fired before the watermark, at the watermark, or after the watermark
+   */
+  public enum Type
+  {
+    EARLY,
+    ON_TIME,
+    LATE
+  }
+
+  /**
+   * This class represents the individual trigger spec.
+   */
+  public static class Trigger
+  {
+    protected Type type;
+
+    private Trigger()
+    {
+      // for kryo
+    }
+
+    Trigger(Type type)
+    {
+      this.type = type;
+    }
+
+    public Type getType()
+    {
+      return type;
+    }
+  }
+
+  /**
+   * This class represents a trigger spec in which triggers are fired at regular time intervals.
+   */
+  public static class TimeTrigger extends Trigger
+  {
+    @FieldSerializer.Bind(JavaSerializer.class)
+    Duration duration;
+
+    private TimeTrigger()
+    {
+      // for kryo
+    }
+
+    public TimeTrigger(Type type, Duration duration)
+    {
+      super(type);
+      this.duration = duration;
+    }
+
+    public Duration getDuration()
+    {
+      return duration;
+    }
+  }
+
+  /**
+   * This class represents a trigger spec in which triggers are fired every n tuples
+   */
+  public static class CountTrigger extends Trigger
+  {
+    private long count;
+    private CountTrigger()
+    {
+      //for kryo
+    }
+
+    public CountTrigger(Type type, long count)
+    {
+      super(type);
+      this.count = count;
+    }
+
+    public long getCount()
+    {
+      return count;
+    }
+  }
+
+  List<Trigger> triggerList = new ArrayList<>();
+
+  /**
+   * Creates a TriggerOption with an initial trigger that should be fired at the watermark
+   *
+   * @return
+   */
+  public static TriggerOption AtWatermark()
+  {
+    TriggerOption triggerOption = new TriggerOption();
+    Trigger trigger = new Trigger(Type.ON_TIME);
+    triggerOption.triggerList.add(trigger);
+    return triggerOption;
+  }
+
+  /**
+   * A trigger should be fired before the watermark once for every specified duration
+   *
+   * @param duration
+   * @return
+   */
+  public TriggerOption withEarlyFiringsAtEvery(Duration duration)
+  {
+    TimeTrigger trigger = new TimeTrigger(Type.EARLY, duration);
+    triggerList.add(trigger);
+    return this;
+  }
+
+  /**
+   * A trigger should be fired before the watermark once for every n tuple(s)
+   *
+   * @param count
+   * @return
+   */
+  public TriggerOption withEarlyFiringsAtEvery(long count)
+  {
+    CountTrigger trigger = new CountTrigger(Type.EARLY, count);
+    triggerList.add(trigger);
+    return this;
+  }
+
+  /**
+   * A trigger should be fired after the watermark once for every specified duration
+   *
+   * @param duration
+   * @return
+   */
+  public TriggerOption withLateFiringsAtEvery(Duration duration)
+  {
+    TimeTrigger trigger = new TimeTrigger(Type.LATE, duration);
+    triggerList.add(trigger);
+    return this;
+  }
+
+  /**
+   * A trigger should be fired after the watermark once for every n late tuple(s)
+   *
+   * @param count
+   * @return
+   */
+  public TriggerOption withLateFiringsAtEvery(long count)
+  {
+    CountTrigger trigger = new CountTrigger(Type.LATE, count);
+    triggerList.add(trigger);
+    return this;
+  }
+
+  /**
+   * With discarding mode, the state is discarded after each trigger
+   *
+   * @return
+   */
+  public TriggerOption discardingFiredPanes()
+  {
+    this.accumulationMode = AccumulationMode.DISCARDING;
+    return this;
+  }
+
+  /**
+   * With accumulating mode, the state is kept
+   *
+   * @return
+   */
+  public TriggerOption accumulatingFiredPanes()
+  {
+    this.accumulationMode = AccumulationMode.ACCUMULATING;
+    return this;
+  }
+
+  /**
+   * With accumulating and retracting mode, the state is kept, and the snapshot of the state is saved after each trigger
+   * so when new values come in that change the state, a retraction trigger can be fired with the snapshot of the state
+   * when the last trigger was fired
+   *
+   * @return
+   */
+  public TriggerOption accumulatingAndRetractingFiredPanes()
+  {
+    this.accumulationMode = AccumulationMode.ACCUMULATING_AND_RETRACTING;
+    return this;
+  }
+
+  /**
+   * Only fire triggers for data that has changed from the last trigger. This only applies to ACCUMULATING and
+   * ACCUMULATING_AND_RETRACTING accumulation modes.
+   *
+   * @return
+   */
+  public TriggerOption firingOnlyUpdatedPanes()
+  {
+    this.firingOnlyUpdatedPanes = true;
+    return this;
+  }
+
+  /**
+   * Gets the accumulation mode
+   *
+   * @return
+   */
+  public AccumulationMode getAccumulationMode()
+  {
+    return accumulationMode;
+  }
+
+  /**
+   * Gets the trigger list
+   *
+   * @return
+   */
+  public List<Trigger> getTriggerList()
+  {
+    return Collections.unmodifiableList(triggerList);
+  }
+
+  /**
+   * Returns whether we should only fire panes that have been updated since the last trigger.
+   * When this option is set, DISCARDING accumulation mode must not be used.
+   *
+   * @return
+   */
+  public boolean isFiringOnlyUpdatedPanes()
+  {
+    return this.firingOnlyUpdatedPanes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
new file mode 100644
index 0000000..af0b1ac
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * All tuples that use the WindowedOperator must be an implementation of this interface
+ */
+@InterfaceStability.Evolving
+public interface Tuple<T>
+{
+  /**
+   * Gets the value of the tuple
+   *
+   * @return
+   */
+  T getValue();
+
+  /**
+   * Plain tuple class
+   *
+   * @param <T>
+   */
+  class PlainTuple<T> implements Tuple<T>
+  {
+    private T value;
+
+    private PlainTuple()
+    {
+      // for kryo
+    }
+
+    public PlainTuple(T value)
+    {
+      this.value = value;
+    }
+
+    public T getValue()
+    {
+      return value;
+    }
+
+    public void setValue(T value)
+    {
+      this.value = value;
+    }
+
+    @Override
+    public String toString()
+    {
+      return value.toString();
+    }
+  }
+
+  /**
+   * Tuple that is wrapped by a timestamp
+   *
+   * @param <T>
+   */
+  class TimestampedTuple<T> extends PlainTuple<T>
+  {
+    private long timestamp;
+
+    private TimestampedTuple()
+    {
+      // for kryo
+    }
+
+    public TimestampedTuple(long timestamp, T value)
+    {
+      super(value);
+      this.timestamp = timestamp;
+    }
+
+    public long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public void setTimestamp(long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+  }
+
+  /**
+   * Tuple that is wrapped by a timestamp and one or more windows
+   *
+   * @param <T>
+   */
+  class WindowedTuple<T> extends TimestampedTuple<T>
+  {
+    private List<Window> windows = new ArrayList<>();
+
+    public WindowedTuple()
+    {
+    }
+
+    public WindowedTuple(Window window, T value)
+    {
+      super(window.getBeginTimestamp(), value);
+      this.windows.add(window);
+    }
+
+    public List<Window> getWindows()
+    {
+      return windows;
+    }
+
+    public void addWindow(Window window)
+    {
+      this.windows.add(window);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
new file mode 100644
index 0000000..1ba9acd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This interface describes the individual window.
+ */
+@InterfaceStability.Evolving
+public interface Window
+{
+  long getBeginTimestamp();
+
+  long getDurationMillis();
+
+  /**
+   * Global window means there is only one window, or no window depending on how you look at it.
+   */
+  class GlobalWindow implements Window
+  {
+    private GlobalWindow()
+    {
+    }
+
+    @Override
+    public long getBeginTimestamp()
+    {
+      return 0;
+    }
+
+    public long getDurationMillis()
+    {
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+      return (other instanceof GlobalWindow);
+    }
+  }
+
+  class DefaultComparator implements Comparator<Window>
+  {
+    private DefaultComparator()
+    {
+    }
+
+    @Override
+    public int compare(Window o1, Window o2)
+    {
+      if (o1.getBeginTimestamp() < o2.getBeginTimestamp()) {
+        return -1;
+      } else if (o1.getBeginTimestamp() > o2.getBeginTimestamp()) {
+        return 1;
+      } else if (o1.getDurationMillis() < o2.getDurationMillis()) {
+        return -1;
+      } else if (o1.getDurationMillis() > o2.getDurationMillis()) {
+        return 1;
+      } else if (o1 instanceof SessionWindow && o2 instanceof SessionWindow) {
+        return Long.compare(((SessionWindow)o1).getKey().hashCode(), ((SessionWindow)o2).getKey().hashCode());
+      } else {
+        return 0;
+      }
+    }
+  }
+
+  /**
+   * The singleton global window
+   */
+  GlobalWindow GLOBAL_WINDOW = new GlobalWindow();
+
+  /**
+   * The singleton default comparator of windows
+   */
+  Comparator<Window> DEFAULT_COMPARATOR = new DefaultComparator();
+
+  /**
+   * TimeWindow is a window that represents a time slice
+   */
+  class TimeWindow implements Window
+  {
+    protected long beginTimestamp;
+    protected long durationMillis;
+
+    private TimeWindow()
+    {
+      // for kryo
+    }
+
+    public TimeWindow(long beginTimestamp, long durationMillis)
+    {
+      this.beginTimestamp = beginTimestamp;
+      this.durationMillis = durationMillis;
+    }
+
+    /**
+     * Gets the beginning timestamp of this window
+     *
+     * @return
+     */
+    @Override
+    public long getBeginTimestamp()
+    {
+      return beginTimestamp;
+    }
+
+    /**
+     * Gets the duration millis of this window
+     *
+     * @return
+     */
+    @Override
+    public long getDurationMillis()
+    {
+      return durationMillis;
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+      if (other instanceof TimeWindow) {
+        TimeWindow otherWindow = (TimeWindow)other;
+        return this.beginTimestamp == otherWindow.beginTimestamp && this.durationMillis == otherWindow.durationMillis;
+      } else {
+        return false;
+      }
+    }
+
+  }
+
+  /**
+   * SessionWindow is a window that represents a time slice for a key, with the time slice being variable length.
+   *
+   * @param <K>
+   */
+  class SessionWindow<K> extends TimeWindow
+  {
+    private K key;
+
+    private SessionWindow()
+    {
+      // for kryo
+    }
+
+    public SessionWindow(K key, long beginTimestamp, long duration)
+    {
+      super(beginTimestamp, duration);
+      this.key = key;
+    }
+
+    public K getKey()
+    {
+      return key;
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+      if (!super.equals(other)) {
+        return false;
+      }
+      if (other instanceof SessionWindow) {
+        SessionWindow<K> otherSessionWindow = (SessionWindow<K>)other;
+        if (key == null) {
+          return otherSessionWindow.key == null;
+        } else {
+          return key.equals(otherSessionWindow.key);
+        }
+      } else {
+        return false;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
new file mode 100644
index 0000000..525a0d6
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.joda.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * This class describes how windowing is done
+ *
+ *  This is used by both the high level API and by the WindowedOperator
+ */
+@InterfaceStability.Evolving
+public interface WindowOption
+{
+  /**
+   * The windowing specification that says there is only one window for the entire time of the application
+   */
+  class GlobalWindow implements WindowOption
+  {
+  }
+
+  /**
+   * The windowing specification that divides the time into slices with the same width
+   */
+  class TimeWindows implements WindowOption
+  {
+    @FieldSerializer.Bind(JavaSerializer.class)
+    private Duration duration;
+
+    private TimeWindows()
+    {
+      // for kryo
+    }
+
+    public TimeWindows(Duration duration)
+    {
+      this.duration = duration;
+    }
+
+    /**
+     * Gets the duration of the time window
+     *
+     * @return
+     */
+    public Duration getDuration()
+    {
+      return duration;
+    }
+
+    /**
+     * The time window should be a sliding window with the given slide duration
+     *
+     * @param duration
+     * @return
+     */
+    public SlidingTimeWindows slideBy(Duration duration)
+    {
+      return new SlidingTimeWindows(this.duration, duration);
+    }
+  }
+
+  /**
+   * The window specification that represents sliding windows
+   *
+   */
+  class SlidingTimeWindows extends TimeWindows
+  {
+    @FieldSerializer.Bind(JavaSerializer.class)
+    private Duration slideByDuration;
+
+    private SlidingTimeWindows()
+    {
+      // for kryo
+    }
+
+    public SlidingTimeWindows(Duration size, Duration slideByDuration)
+    {
+      super(size);
+      if (size.getMillis() % slideByDuration.getMillis() != 0) {
+        throw new IllegalArgumentException("Window size must be divisible by the slide-by duration");
+      }
+      this.slideByDuration = slideByDuration;
+    }
+
+    public Duration getSlideByDuration()
+    {
+      return slideByDuration;
+    }
+  }
+
+  /**
+   * The window specification that represents session windows, with a minimum gap duration between two windows with the
+   * same key.
+   */
+  class SessionWindows implements WindowOption
+  {
+    @FieldSerializer.Bind(JavaSerializer.class)
+    private Duration minGap;
+
+    private SessionWindows()
+    {
+      // for kryo
+    }
+
+    public SessionWindows(Duration minGap)
+    {
+      this.minGap = minGap;
+    }
+
+    public Duration getMinGap()
+    {
+      return minGap;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowState.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowState.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowState.java
new file mode 100644
index 0000000..315ad8d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowState.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * The state that needs to be stored for each window. The state helps determine whether to throw away a window
+ * (with allowed lateness in WindowOption), and whether to fire a trigger (with TriggerOption)
+ */
+@InterfaceStability.Evolving
+public class WindowState
+{
+  /**
+   * The timestamp when the watermark arrives. If it has not arrived, -1.
+   */
+  public long watermarkArrivalTime = -1;
+
+  /**
+   * The timestamp when the last trigger was fired
+   */
+  public long lastTriggerFiredTime = -1;
+
+  /**
+   * The tuple count. Should be incremented for every tuple that belongs to the associated window
+   */
+  public long tupleCount = 0;
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java
new file mode 100644
index 0000000..30ac3e7
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This interface is for a key/value store for storing data for windowed streams.
+ * The key to this key/value store is a pair of (Window, K).
+ * Also, this class may go away soon as there are plans to incorporate {@link Spillable} data structures
+ * in the near future.
+ *
+ * Note that this interface expects that the implementation takes care of checkpoint recovery.
+ *
+ */
+@InterfaceStability.Unstable
+public interface WindowedKeyedStorage<K, V> extends WindowedStorage<Map<K, V>>
+{
+  /**
+   * Sets the data associated with the given window and the key
+   *
+   * @param window
+   * @param key
+   * @param value
+   */
+  void put(Window window, K key, V value);
+
+  /**
+   * Gets the key/value pairs associated with the given window
+   *
+   * @param window
+   * @return
+   */
+  Iterable<Map.Entry<K, V>> entrySet(Window window);
+
+  /**
+   * Gets the data associated with the given window and the key
+   *
+   * @param window
+   * @param key
+   * @return
+   */
+  V get(Window window, K key);
+
+  /**
+   * Removes all the data associated with the given window
+   *
+   * @param window
+   */
+  void remove(Window window);
+
+  /**
+   * Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the
+   * data to toWindow, and overwrite any existing data in toWindow
+   *
+   * @param fromWindow
+   * @param toWindow
+   */
+  void migrateWindow(Window fromWindow, Window toWindow);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
new file mode 100644
index 0000000..e9ee404
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.joda.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Function;
+
+/**
+ * This interface describes what needs to be implemented for the operator that supports the Apache Beam model of
+ * windowing and triggering
+ *
+ * TODO: We may not need this interface at all since there are no components that make use of these methods generically.
+ * TODO: We may wanna just use the abstract class {@link org.apache.apex.malhar.lib.window.impl.AbstractWindowedOperator}
+ *
+ * @param <InputT> The type of the input tuple
+ */
+@InterfaceStability.Evolving
+public interface WindowedOperator<InputT>
+{
+
+  /**
+   * Sets the WindowOption of this operator
+   *
+   * @param windowOption
+   */
+  void setWindowOption(WindowOption windowOption);
+
+  /**
+   * Sets the TriggerOption of this operator
+   *
+   * @param triggerOption
+   */
+  void setTriggerOption(TriggerOption triggerOption);
+
+  /**
+   * Sets the allowed lateness of this operator
+   *
+   * @param allowedLateness
+   */
+  void setAllowedLateness(Duration allowedLateness);
+
+  /**
+   * This methods sets the storage for the meta data for each window
+   *
+   * @param storageAgent
+   */
+  void setWindowStateStorage(WindowedStorage<WindowState> storageAgent);
+
+  /**
+   * This sets the function that extracts the timestamp from the input tuple
+   *
+   * @param timestampExtractor
+   */
+  void setTimestampExtractor(Function<InputT, Long> timestampExtractor);
+
+  /**
+   * Assign window(s) for this input tuple
+   *
+   * @param input
+   * @return
+   */
+  Tuple.WindowedTuple<InputT> getWindowedValue(Tuple<InputT> input);
+
+  /**
+   * This method returns whether the given timestamp is too late for processing.
+   * The implementation of this operator should look at the allowed lateness in the WindowOption.
+   * It should also call this function and if it returns true, it should drop the associated tuple.
+   *
+   * @param timestamp
+   * @return
+   */
+  boolean isTooLate(long timestamp);
+
+  /**
+   * This method is supposed to drop the tuple because it has passed the allowed lateness. But an implementation
+   * of this method has the chance to do something different (e.g. emit it to another port)
+   *
+   * @param input
+   */
+  void dropTuple(Tuple<InputT> input);
+
+  /**
+   * This method accumulates the incoming tuple (with the Accumulation interface)
+   *
+   * @param tuple
+   */
+  void accumulateTuple(Tuple.WindowedTuple<InputT> tuple);
+
+  /**
+   * This method should be called when the watermark for the given timestamp arrives
+   * The implementation should retrieve all valid windows in its state that lies completely before this watermark,
+   * and change the state of each of those windows. All tuples for those windows arriving after
+   * the watermark will be considered late.
+   *
+   * @param watermark
+   */
+  void processWatermark(ControlTuple.Watermark watermark);
+
+  /**
+   * This method fires the trigger for the given window, and possibly retraction trigger. The implementation should clear
+   * the window data in the storage if the accumulation mode is DISCARDING
+   *
+   * @param window
+   */
+  void fireTrigger(Window window, WindowState windowState);
+
+  /**
+   * This method clears the window data in the storage.
+   *
+   * @param window
+   */
+  void clearWindowData(Window window);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
new file mode 100644
index 0000000..8cd045c
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * WindowedStorage is a key-value store with the key being the window. The implementation of this interface should
+ * make sure checkpointing and recovery will be done correctly.
+ * Note that this interface may go away soon as there are plans to incorporate {@link Spillable} data structures in the
+ * near future.
+ *
+ * @param <T> The type of the data that is stored per window
+ *
+ * TODO: Look at the possibility of integrating spillable data structure: https://issues.apache.org/jira/browse/APEXMALHAR-2026
+ */
+@InterfaceStability.Unstable
+public interface WindowedStorage<T> extends Iterable<Map.Entry<Window, T>>
+{
+  /**
+   * Returns true if the storage contains this window
+   *
+   * @param window
+   */
+  boolean containsWindow(Window window);
+
+  /**
+   * Returns the number of windows in the storage
+   *
+   * @return
+   */
+  long size();
+
+  /**
+   * Sets the data associated with the given window
+   *
+   * @param window
+   * @param value
+   */
+  void put(Window window, T value);
+
+  /**
+   * Gets the value associated with the given window
+   *
+   * @param window
+   * @return
+   */
+  T get(Window window);
+
+  /**
+   * Removes all the data associated with the given window. This does NOT mean removing the window in checkpointed state
+   *
+   * @param window
+   */
+  void remove(Window window);
+
+  /**
+   * Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the
+   * data to toWindow, and overwrite any existing data in toWindow
+   *
+   * @param fromWindow
+   * @param toWindow
+   */
+  void migrateWindow(Window fromWindow, Window toWindow);
+
+  /**
+   * Returns the iterable of the entries in the storage
+   *
+   * @return
+   */
+  Iterable<Map.Entry<Window, T>> entrySet();
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
new file mode 100644
index 0000000..354738d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedOperator;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating
+ * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
+ * values are stored in the storage.
+ *
+ * @param <InputT> The type of the input tuple
+ * @param <OutputT> The type of the output tuple
+ * @param <DataStorageT> The type of the data storage
+ * @param <RetractionStorageT> The type of the retraction storage
+ * @param <AccumulationT> The type of the accumulation
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT extends WindowedStorage, RetractionStorageT extends WindowedStorage, AccumulationT extends Accumulation>
+    extends BaseOperator implements WindowedOperator<InputT>
+{
+
+  protected WindowOption windowOption;
+  protected TriggerOption triggerOption;
+  protected long allowedLatenessMillis = -1;
+  protected WindowedStorage<WindowState> windowStateMap;
+
+  private Function<InputT, Long> timestampExtractor;
+
+  private long currentWatermark = -1;
+  private long watermarkTimestamp = -1;
+  private boolean triggerAtWatermark;
+  private long earlyTriggerCount;
+  private long earlyTriggerMillis;
+  private long lateTriggerCount;
+  private long lateTriggerMillis;
+  private long currentDerivedTimestamp = -1;
+  private long windowWidthMillis;
+  private long fixedWatermarkMillis = -1;
+  protected DataStorageT dataStorage;
+  protected RetractionStorageT retractionStorage;
+  protected AccumulationT accumulation;
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
+
+  public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()
+  {
+    @Override
+    public void process(Tuple<InputT> tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<ControlTuple> controlInput = new DefaultInputPort<ControlTuple>()
+  {
+    @Override
+    public void process(ControlTuple tuple)
+    {
+      if (tuple instanceof ControlTuple.Watermark) {
+        processWatermark((ControlTuple.Watermark)tuple);
+      }
+    }
+  };
+
+  // TODO: multiple input ports for join operations
+
+  public final transient DefaultOutputPort<Tuple.WindowedTuple<OutputT>> output = new DefaultOutputPort<>();
+
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  public final transient DefaultOutputPort<ControlTuple> controlOutput = new DefaultOutputPort<>();
+
+  /**
+   * Process the incoming data tuple
+   *
+   * @param tuple
+   */
+  public void processTuple(Tuple<InputT> tuple)
+  {
+    long timestamp = extractTimestamp(tuple);
+    if (isTooLate(timestamp)) {
+      dropTuple(tuple);
+    } else {
+      Tuple.WindowedTuple<InputT> windowedTuple = getWindowedValue(tuple);
+      // do the accumulation
+      accumulateTuple(windowedTuple);
+
+      for (Window window : windowedTuple.getWindows()) {
+        WindowState windowState = windowStateMap.get(window);
+        windowState.tupleCount++;
+        // process any count based triggers
+        if (windowState.watermarkArrivalTime == -1) {
+          // watermark has not arrived yet, check for early count based trigger
+          if (earlyTriggerCount > 0 && (windowState.tupleCount % earlyTriggerCount) == 0) {
+            fireTrigger(window, windowState);
+          }
+        } else {
+          // watermark has arrived, check for late count based trigger
+          if (lateTriggerCount > 0 && (windowState.tupleCount % lateTriggerCount) == 0) {
+            fireTrigger(window, windowState);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void setWindowOption(WindowOption windowOption)
+  {
+    this.windowOption = windowOption;
+    if (this.windowOption instanceof WindowOption.GlobalWindow) {
+      windowStateMap.put(Window.GLOBAL_WINDOW, new WindowState());
+    }
+  }
+
+  @Override
+  public void setTriggerOption(TriggerOption triggerOption)
+  {
+    this.triggerOption = triggerOption;
+    for (TriggerOption.Trigger trigger : triggerOption.getTriggerList()) {
+      switch (trigger.getType()) {
+        case ON_TIME:
+          triggerAtWatermark = true;
+          break;
+        case EARLY:
+          if (trigger instanceof TriggerOption.TimeTrigger) {
+            earlyTriggerMillis = ((TriggerOption.TimeTrigger)trigger).getDuration().getMillis();
+          } else if (trigger instanceof TriggerOption.CountTrigger) {
+            earlyTriggerCount = ((TriggerOption.CountTrigger)trigger).getCount();
+          }
+          break;
+        case LATE:
+          if (trigger instanceof TriggerOption.TimeTrigger) {
+            lateTriggerMillis = ((TriggerOption.TimeTrigger)trigger).getDuration().getMillis();
+          } else if (trigger instanceof TriggerOption.CountTrigger) {
+            lateTriggerCount = ((TriggerOption.CountTrigger)trigger).getCount();
+          }
+          break;
+        default:
+          throw new RuntimeException("Unknown trigger type: " + trigger.getType());
+      }
+    }
+  }
+
+  @Override
+  public void setAllowedLateness(Duration allowedLateness)
+  {
+    this.allowedLatenessMillis = allowedLateness.getMillis();
+  }
+
+  /**
+   * This method sets the storage for the data for each window
+   *
+   * @param storageAgent
+   */
+  public void setDataStorage(DataStorageT storageAgent)
+  {
+    this.dataStorage = storageAgent;
+  }
+
+  /**
+   * This method sets the storage for the retraction data for each window. Only used when the accumulation mode is ACCUMULATING_AND_RETRACTING
+   *
+   * @param storageAgent
+   */
+  public void setRetractionStorage(RetractionStorageT storageAgent)
+  {
+    this.retractionStorage = storageAgent;
+  }
+
+  /**
+   * Sets the accumulation, which basically tells the WindowedOperator what to do if a new tuple comes in and what
+   * to put in the pane when a trigger is fired
+   *
+   * @param accumulation
+   */
+  public void setAccumulation(AccumulationT accumulation)
+  {
+    this.accumulation = accumulation;
+  }
+
+  @Override
+  public void setWindowStateStorage(WindowedStorage<WindowState> storageAgent)
+  {
+    this.windowStateMap = storageAgent;
+  }
+
+  @Override
+  public void setTimestampExtractor(Function<InputT, Long> timestampExtractor)
+  {
+    this.timestampExtractor = timestampExtractor;
+  }
+
+  /**
+   * Sets the fixed watermark with respect to the processing time derived from the Apex window ID. This is useful if we
+   * don't have watermark tuples from upstream. However, using this means whether a tuple is considered late totally
+   * depends on the Apex window ID of this operator.
+   *
+   * Note that setting this value will make incoming watermark tuples useless.
+   */
+  public void setFixedWatermark(long millis)
+  {
+    this.fixedWatermarkMillis = millis;
+  }
+
+  public void validate() throws ValidationException
+  {
+    if (accumulation == null) {
+      throw new ValidationException("Accumulation must be set");
+    }
+    if (dataStorage == null) {
+      throw new ValidationException("Data storage must be set");
+    }
+    if (windowStateMap == null) {
+      throw new ValidationException("Window state storage must be set");
+    }
+    if (triggerOption != null) {
+      if (triggerOption.isFiringOnlyUpdatedPanes()) {
+        if (retractionStorage == null) {
+          throw new ValidationException("A retraction storage is required for firingOnlyUpdatedPanes option");
+        }
+        if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.DISCARDING) {
+          throw new ValidationException("DISCARDING accumulation mode is not valid for firingOnlyUpdatedPanes option");
+        }
+      }
+      if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING &&
+          retractionStorage == null) {
+        throw new ValidationException("A retraction storage is required for ACCUMULATING_AND_RETRACTING accumulation mode");
+      }
+    }
+  }
+
+  @Override
+  public Tuple.WindowedTuple<InputT> getWindowedValue(Tuple<InputT> input)
+  {
+    if (windowOption == null && input instanceof Tuple.WindowedTuple) {
+      // inherit the windows from upstream
+      return (Tuple.WindowedTuple<InputT>)input;
+    }
+    Tuple.WindowedTuple<InputT> windowedTuple = new Tuple.WindowedTuple<>();
+    windowedTuple.setValue(input.getValue());
+    windowedTuple.setTimestamp(extractTimestamp(input));
+    assignWindows(windowedTuple.getWindows(), input);
+    return windowedTuple;
+  }
+
+  private long extractTimestamp(Tuple<InputT> tuple)
+  {
+    if (timestampExtractor == null) {
+      if (tuple instanceof Tuple.TimestampedTuple) {
+        return ((Tuple.TimestampedTuple)tuple).getTimestamp();
+      } else {
+        return 0;
+      }
+    } else {
+      return timestampExtractor.apply(tuple.getValue());
+    }
+  }
+
+  private void assignWindows(List<Window> windows, Tuple<InputT> inputTuple)
+  {
+    if (windowOption instanceof WindowOption.GlobalWindow) {
+      windows.add(Window.GLOBAL_WINDOW);
+    } else {
+      long timestamp = extractTimestamp(inputTuple);
+      if (windowOption instanceof WindowOption.TimeWindows) {
+
+        for (Window.TimeWindow window : getTimeWindowsForTimestamp(timestamp)) {
+          if (!windowStateMap.containsWindow(window)) {
+            windowStateMap.put(window, new WindowState());
+          }
+          windows.add(window);
+        }
+      } else if (windowOption instanceof WindowOption.SessionWindows) {
+        assignSessionWindows(windows, timestamp, inputTuple);
+      }
+    }
+  }
+
+  protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<InputT> inputTuple)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns the list of windows TimeWindows for the given timestamp.
+   * If we are doing sliding windows, this will return multiple windows. Otherwise, only one window will be returned.
+   * Note that this method does not apply to SessionWindows.
+   *
+   * @param timestamp
+   * @return
+   */
+  private List<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp)
+  {
+    List<Window.TimeWindow> windows = new ArrayList<>();
+    if (windowOption instanceof WindowOption.TimeWindows) {
+      long durationMillis = ((WindowOption.TimeWindows)windowOption).getDuration().getMillis();
+      long beginTimestamp = timestamp - timestamp % durationMillis;
+      windows.add(new Window.TimeWindow(beginTimestamp, durationMillis));
+      if (windowOption instanceof WindowOption.SlidingTimeWindows) {
+        long slideBy = ((WindowOption.SlidingTimeWindows)windowOption).getSlideByDuration().getMillis();
+        // add the sliding windows front and back
+        // Note: this messes up the order of the window and we might want to revisit this if the order of the windows
+        // matter
+        for (long slideBeginTimestamp = beginTimestamp - slideBy;
+            slideBeginTimestamp <= timestamp && timestamp < slideBeginTimestamp + durationMillis;
+            slideBeginTimestamp -= slideBy) {
+          windows.add(new Window.TimeWindow(slideBeginTimestamp, durationMillis));
+        }
+        for (long slideBeginTimestamp = beginTimestamp + slideBy;
+            slideBeginTimestamp <= timestamp && timestamp < slideBeginTimestamp + durationMillis;
+            slideBeginTimestamp += slideBy) {
+          windows.add(new Window.TimeWindow(slideBeginTimestamp, durationMillis));
+        }
+      }
+    } else {
+      throw new IllegalStateException("Unexpected WindowOption");
+    }
+    return windows;
+  }
+
+  @Override
+  public boolean isTooLate(long timestamp)
+  {
+    return allowedLatenessMillis < 0 ? false : (timestamp < currentWatermark - allowedLatenessMillis);
+  }
+
+  @Override
+  public void dropTuple(Tuple<InputT> input)
+  {
+    // do nothing
+    LOG.debug("Dropping late tuple {}", input);
+  }
+
+  @Override
+  public void processWatermark(ControlTuple.Watermark watermark)
+  {
+    this.watermarkTimestamp = watermark.getTimestamp();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    this.windowWidthMillis = context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+    validate();
+  }
+
+  /**
+   * This is for the Apex streaming/application window. Do not confuse this with the windowing concept in this operator
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    if (currentDerivedTimestamp == -1) {
+      currentDerivedTimestamp = ((windowId >> 32) * 1000) + (windowId & 0xffffffffL);
+    } else {
+      currentDerivedTimestamp += windowWidthMillis;
+    }
+    watermarkTimestamp = -1;
+  }
+
+  /**
+   * This is for the Apex streaming/application window. Do not confuse this with the windowing concept in this operator
+   */
+  @Override
+  public void endWindow()
+  {
+    // We only do actual processing of watermark at window boundary so that it will not break idempotency.
+    // TODO: May want to revisit this if the application cares more about latency than idempotency
+    processWatermarkAtEndWindow();
+    fireTimeTriggers();
+  }
+
+  private void processWatermarkAtEndWindow()
+  {
+    if (fixedWatermarkMillis > 0) {
+      watermarkTimestamp = currentDerivedTimestamp - fixedWatermarkMillis;
+    }
+    if (watermarkTimestamp > 0) {
+      this.currentWatermark = watermarkTimestamp;
+
+      long horizon = watermarkTimestamp - allowedLatenessMillis;
+
+      for (Iterator<Map.Entry<Window, WindowState>> it = windowStateMap.iterator(); it.hasNext(); ) {
+        Map.Entry<Window, WindowState> entry = it.next();
+        Window window = entry.getKey();
+        WindowState windowState = entry.getValue();
+        if (window.getBeginTimestamp() + window.getDurationMillis() < watermarkTimestamp) {
+          // watermark has not arrived for this window before, marking this window late
+          if (windowState.watermarkArrivalTime == -1) {
+            windowState.watermarkArrivalTime = currentDerivedTimestamp;
+            if (triggerAtWatermark) {
+              // fire trigger at watermark if applicable
+              fireTrigger(window, windowState);
+            }
+          }
+          if (allowedLatenessMillis >= 0 && window.getBeginTimestamp() + window.getDurationMillis() < horizon) {
+            // discard this window because it's too late now
+            it.remove();
+            dataStorage.remove(window);
+            retractionStorage.remove(window);
+          }
+        }
+      }
+      controlOutput.emit(new WatermarkImpl(watermarkTimestamp));
+    }
+  }
+
+  private void fireTimeTriggers()
+  {
+    if (earlyTriggerMillis > 0 || lateTriggerMillis > 0) {
+      for (Map.Entry<Window, WindowState> entry : windowStateMap.entrySet()) {
+        Window window = entry.getKey();
+        WindowState windowState = entry.getValue();
+        if (windowState.watermarkArrivalTime == -1) {
+          if (earlyTriggerMillis > 0 && windowState.lastTriggerFiredTime + earlyTriggerMillis <= currentDerivedTimestamp) {
+            // fire early time triggers
+            fireTrigger(window, windowState);
+          }
+        } else {
+          if (lateTriggerMillis > 0 && windowState.lastTriggerFiredTime + lateTriggerMillis <= currentDerivedTimestamp) {
+            // fire late time triggers
+            fireTrigger(window, windowState);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void fireTrigger(Window window, WindowState windowState)
+  {
+    if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
+      fireRetractionTrigger(window);
+    }
+    fireNormalTrigger(window, triggerOption.isFiringOnlyUpdatedPanes());
+    windowState.lastTriggerFiredTime = currentDerivedTimestamp;
+    if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.DISCARDING) {
+      clearWindowData(window);
+    }
+  }
+
+  /**
+   * This method fires the normal trigger for the given window.
+   *
+   * @param window
+   * @param fireOnlyUpdatedPanes Do not fire trigger if the old value is the same as the new value. If true, retraction storage is required.
+   */
+  public abstract void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes);
+
+  /**
+   * This method fires the retraction trigger for the given window. This should only be valid if the accumulation
+   * mode is ACCUMULATING_AND_RETRACTING
+   *
+   * @param window
+   */
+  public abstract void fireRetractionTrigger(Window window);
+
+
+  @Override
+  public void clearWindowData(Window window)
+  {
+    dataStorage.remove(window);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java
new file mode 100644
index 0000000..65cc595
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedKeyedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is the in-memory implementation of {@link WindowedKeyedStorage}. Do not use this class if you have a large state that
+ * can't be fit in memory. Also, this class may go away soon as there are plans to incorporate {@link Spillable} data structures
+ * in the near future.
+ */
+@InterfaceStability.Unstable
+public class InMemoryWindowedKeyedStorage<K, V> extends InMemoryWindowedStorage<Map<K, V>>
+    implements WindowedKeyedStorage<K, V>, SessionWindowedStorage<K, V>
+{
+  @Override
+  public void put(Window window, K key, V value)
+  {
+    Map<K, V> kvMap;
+    if (map.containsKey(window)) {
+      kvMap = map.get(window);
+    } else {
+      kvMap = new HashMap<K, V>();
+      map.put(window, kvMap);
+    }
+    kvMap.put(key, value);
+  }
+
+  @Override
+  public Set<Map.Entry<K, V>> entrySet(Window window)
+  {
+    if (map.containsKey(window)) {
+      return map.get(window).entrySet();
+    } else {
+      return Collections.emptySet();
+    }
+  }
+
+  @Override
+  public V get(Window window, K key)
+  {
+    if (map.containsKey(window)) {
+      return map.get(window).get(key);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap)
+  {
+    List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>();
+    // TODO: this is inefficient, but this is usually not used in a real use case since it's in memory
+    for (Map.Entry<Window, Map<K, V>> entry : map.entrySet()) {
+      Window.SessionWindow<K> window = (Window.SessionWindow<K>)entry.getKey();
+      if (key.equals(window.getKey())) {
+        if (timestamp > window.getBeginTimestamp()) {
+          if (window.getBeginTimestamp() + window.getDurationMillis() + gap > timestamp) {
+            results.add(new AbstractMap.SimpleEntry<>(window, entry.getValue().get(key)));
+          }
+        } else if (timestamp < window.getBeginTimestamp()) {
+          if (window.getBeginTimestamp() - gap <= timestamp) {
+            results.add(new AbstractMap.SimpleEntry<>(window, entry.getValue().get(key)));
+          }
+        } else {
+          results.add(new AbstractMap.SimpleEntry<>(window, entry.getValue().get(key)));
+        }
+      }
+    }
+    return results;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
new file mode 100644
index 0000000..a78a327
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is the in-memory implementation of {@link WindowedStorage}. Do not use this class if you have a large state that
+ * can't be fit in memory. Also, this class may go away soon as there are plans to incorporate {@link Spillable} data
+ * structures in the near future.
+ */
+@InterfaceStability.Unstable
+public class InMemoryWindowedStorage<T> implements WindowedStorage<T>
+{
+  protected final TreeMap<Window, T> map = new TreeMap<>(Window.DEFAULT_COMPARATOR);
+
+  @Override
+  public long size()
+  {
+    return map.size();
+  }
+
+  @Override
+  public void put(Window window, T value)
+  {
+    map.put(window, value);
+  }
+
+  @Override
+  public boolean containsWindow(Window window)
+  {
+    return map.containsKey(window);
+  }
+
+  @Override
+  public T get(Window window)
+  {
+    return map.get(window);
+  }
+
+  @Override
+  public void remove(Window window)
+  {
+    map.remove(window);
+  }
+
+  @Override
+  public void migrateWindow(Window fromWindow, Window toWindow)
+  {
+    if (containsWindow(fromWindow)) {
+      map.put(toWindow, map.remove(fromWindow));
+    }
+  }
+
+  @Override
+  public Iterable<Map.Entry<Window, T>> entrySet()
+  {
+    return map.entrySet();
+  }
+
+  @Override
+  public Iterator<Map.Entry<Window, T>> iterator()
+  {
+    return map.entrySet().iterator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
new file mode 100644
index 0000000..2b71fc0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedKeyedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This is an implementation of WindowedOperator that takes in key value pairs as input and gives out key value pairs
+ * as output. If your operation is not key based, please use {@link WindowedOperatorImpl}.
+ *
+ * @param <KeyT> The type of the key of both the input and the output tuple
+ * @param <InputValT> The type of the value of the keyed input tuple
+ * @param <AccumT> The type of the accumulated value in the operator state per key per window
+ * @param <OutputValT> The type of the value of the keyed output tuple
+ */
+@InterfaceStability.Evolving
+public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
+    extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedKeyedStorage<KeyT, AccumT>, WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<InputValT, AccumT, OutputValT>>
+{
+
+  @Override
+  protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<KeyValPair<KeyT, InputValT>> inputTuple)
+  {
+    KeyT key = inputTuple.getValue().getKey();
+    WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows)windowOption;
+    SessionWindowedStorage<KeyT, AccumT> sessionStorage = (SessionWindowedStorage<KeyT, AccumT>)dataStorage;
+    Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries = sessionStorage.getSessionEntries(key, timestamp, sessionWindowOption.getMinGap().getMillis());
+    Window.SessionWindow<KeyT> sessionWindowToAssign;
+    switch (sessionEntries.size()) {
+      case 0: {
+        // There are no existing windows within the minimum gap. Create a new session window
+        Window.SessionWindow<KeyT> sessionWindow = new Window.SessionWindow<>(key, timestamp, 1);
+        windowStateMap.put(sessionWindow, new WindowState());
+        sessionWindowToAssign = sessionWindow;
+        break;
+      }
+      case 1: {
+        // There is already one existing window within the minimum gap. See whether we need to extend the time of that window
+        Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry = sessionEntries.iterator().next();
+        Window.SessionWindow<KeyT> sessionWindow = sessionWindowEntry.getKey();
+        if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp < sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) {
+          // The session window already covers the event
+          sessionWindowToAssign = sessionWindow;
+        } else {
+          // The session window does not cover the event but is within the min gap
+          if (triggerOption != null &&
+              triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
+            // fire a retraction trigger because the session window will be enlarged
+            fireRetractionTrigger(sessionWindow);
+          }
+          // create a new session window that covers the timestamp
+          long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), timestamp);
+          long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis(), timestamp + 1);
+          Window.SessionWindow<KeyT> newSessionWindow =
+              new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp);
+          windowStateMap.remove(sessionWindow);
+          sessionStorage.migrateWindow(sessionWindow, newSessionWindow);
+          windowStateMap.put(newSessionWindow, new WindowState());
+          sessionWindowToAssign = newSessionWindow;
+        }
+        break;
+      }
+      case 2: {
+        // There are two windows that fall within the minimum gap of the timestamp. We need to merge the two windows
+        Iterator<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> iterator = sessionEntries.iterator();
+        Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry1 = iterator.next();
+        Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry2 = iterator.next();
+        Window.SessionWindow<KeyT> sessionWindow1 = sessionWindowEntry1.getKey();
+        Window.SessionWindow<KeyT> sessionWindow2 = sessionWindowEntry2.getKey();
+        AccumT sessionData1 = sessionWindowEntry1.getValue();
+        AccumT sessionData2 = sessionWindowEntry2.getValue();
+        if (triggerOption != null &&
+            triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
+          // fire a retraction trigger because the two session windows will be merged to a new window
+          fireRetractionTrigger(sessionWindow1);
+          fireRetractionTrigger(sessionWindow2);
+        }
+        long newBeginTimestamp = Math.min(sessionWindow1.getBeginTimestamp(), sessionWindow2.getBeginTimestamp());
+        long newEndTimestamp = Math.max(sessionWindow1.getBeginTimestamp() + sessionWindow1.getDurationMillis(),
+            sessionWindow2.getBeginTimestamp() + sessionWindow2.getDurationMillis());
+
+        Window.SessionWindow<KeyT> newSessionWindow = new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp);
+        AccumT newSessionData = accumulation.merge(sessionData1, sessionData2);
+        sessionStorage.remove(sessionWindow1);
+        sessionStorage.remove(sessionWindow2);
+        sessionStorage.put(newSessionWindow, key, newSessionData);
+        windowStateMap.remove(sessionWindow1);
+        windowStateMap.remove(sessionWindow2);
+        windowStateMap.put(newSessionWindow, new WindowState());
+        sessionWindowToAssign = newSessionWindow;
+        break;
+      }
+      default:
+        throw new IllegalStateException("There are more than two sessions matching one timestamp");
+    }
+    windows.add(sessionWindowToAssign);
+  }
+
+  @Override
+  public void accumulateTuple(Tuple.WindowedTuple<KeyValPair<KeyT, InputValT>> tuple)
+  {
+    KeyValPair<KeyT, InputValT> kvData = tuple.getValue();
+    KeyT key = kvData.getKey();
+    for (Window window : tuple.getWindows()) {
+      // process each window
+      AccumT accum = dataStorage.get(window, key);
+      if (accum == null) {
+        accum = accumulation.defaultAccumulatedValue();
+      }
+      dataStorage.put(window, key, accumulation.accumulate(accum, kvData.getValue()));
+    }
+  }
+
+  @Override
+  public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
+  {
+    for (Map.Entry<KeyT, AccumT> entry : dataStorage.entrySet(window)) {
+      OutputValT outputVal = accumulation.getOutput(entry.getValue());
+      if (fireOnlyUpdatedPanes) {
+        OutputValT oldValue = retractionStorage.get(window, entry.getKey());
+        if (oldValue != null && oldValue.equals(outputVal)) {
+          continue;
+        }
+      }
+      output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal)));
+      if (retractionStorage != null) {
+        retractionStorage.put(window, entry.getKey(), outputVal);
+      }
+    }
+  }
+
+  @Override
+  public void fireRetractionTrigger(Window window)
+  {
+    if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
+      throw new UnsupportedOperationException();
+    }
+    for (Map.Entry<KeyT, OutputValT> entry : retractionStorage.entrySet(window)) {
+      output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue()))));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WatermarkImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WatermarkImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WatermarkImpl.java
new file mode 100644
index 0000000..7dcd527
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WatermarkImpl.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * The implementation of the Watermark tuple
+ */
+@InterfaceStability.Evolving
+public class WatermarkImpl implements ControlTuple.Watermark
+{
+  private long timestamp;
+
+  private WatermarkImpl()
+  {
+    // for kryo
+  }
+
+  public WatermarkImpl(long timestamp)
+  {
+    this.timestamp = timestamp;
+  }
+
+  @Override
+  public long getTimestamp()
+  {
+    return timestamp;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "[Watermark " + getTimestamp() + "]";
+  }
+}