You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/09/27 18:09:11 UTC

apex-malhar git commit: APEXMALHAR-2130 Spillable implementation for WindowedOperator

Repository: apex-malhar
Updated Branches:
  refs/heads/master 763d14fca -> c19c80d88


APEXMALHAR-2130 Spillable implementation for WindowedOperator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c19c80d8
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c19c80d8
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c19c80d8

Branch: refs/heads/master
Commit: c19c80d8882fd530926093d8ce6b81c0503febc3
Parents: 763d14f
Author: David Yan <da...@datatorrent.com>
Authored: Mon Aug 15 14:19:08 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Tue Sep 27 09:47:49 2016 -0700

----------------------------------------------------------------------
 .../spillable/SpillableSetMultimapImpl.java     |   8 +-
 .../apex/malhar/lib/window/Accumulation.java    |  22 +-
 .../apex/malhar/lib/window/ControlTuple.java    |   2 +-
 .../lib/window/SessionWindowedStorage.java      |   8 +-
 .../apex/malhar/lib/window/TriggerOption.java   |  32 +--
 .../apache/apex/malhar/lib/window/Tuple.java    |   5 +-
 .../apache/apex/malhar/lib/window/Window.java   |   5 +-
 .../apex/malhar/lib/window/WindowOption.java    |   2 +-
 .../malhar/lib/window/WindowedOperator.java     |  27 +-
 .../apex/malhar/lib/window/WindowedStorage.java |  32 +--
 .../window/impl/AbstractWindowedOperator.java   |  27 +-
 .../impl/InMemorySessionWindowedStorage.java    |   4 +-
 .../impl/SpillableSessionWindowedStorage.java   | 123 +++++++++
 .../impl/SpillableWindowedKeyedStorage.java     | 224 ++++++++++++++++
 .../impl/SpillableWindowedPlainStorage.java     | 146 +++++++++++
 .../lib/helper/OperatorContextTestHelper.java   |   5 +
 .../window/SpillableWindowedStorageTest.java    | 150 +++++++++++
 .../malhar/lib/window/WindowedOperatorTest.java | 261 ++++++++++++-------
 18 files changed, 904 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
index 951ef76..c227ed7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -167,7 +167,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     if (spillableSet != null) {
       cache.remove((K)key);
       Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
-      map.remove(keySlice);
+      map.put(keySlice, new ImmutablePair<>(0, spillableSet.getHead()));
       spillableSet.clear();
       removedSets.add(spillableSet);
     }
@@ -200,7 +200,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
       return true;
     }
     Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
-    return map.containsKey(keySlice);
+    Pair<Integer, V> meta = map.get(keySlice);
+    return meta != null && meta.getLeft() > 0;
   }
 
   @Override
@@ -230,8 +231,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
       spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
       cache.put(key, spillableSet);
     }
-    spillableSet.add(value);
-    return true;
+    return spillableSet.add(value);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
index 03f7ff7..44814d0 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
@@ -37,33 +37,33 @@ public interface Accumulation<InputT, AccumT, OutputT>
   /**
    * Returns the default accumulated value when nothing has been accumulated
    *
-   * @return
+   * @return the default accumulated value
    */
   AccumT defaultAccumulatedValue();
 
   /**
    * Accumulates the input to the accumulated value
    *
-   * @param accumulatedValue
-   * @param input
-   * @return
+   * @param accumulatedValue the accumulated value
+   * @param input the input value
+   * @return the resulting accumulated value
    */
   AccumT accumulate(AccumT accumulatedValue, InputT input);
 
   /**
    * Merges two accumulated values into one
    *
-   * @param accumulatedValue1
-   * @param accumulatedValue2
-   * @return
+   * @param accumulatedValue1 the first accumulated value
+   * @param accumulatedValue2 the second accumulated value
+   * @return the resulting accumulated value
    */
   AccumT merge(AccumT accumulatedValue1, AccumT accumulatedValue2);
 
   /**
    * Gets the output of the accumulated value. This is used for generating the data for triggers
    *
-   * @param accumulatedValue
-   * @return
+   * @param accumulatedValue the accumulated value
+   * @return the output
    */
   OutputT getOutput(AccumT accumulatedValue);
 
@@ -71,8 +71,8 @@ public interface Accumulation<InputT, AccumT, OutputT>
    * Gets the retraction of the value. This is used for retracting previous panes in
    * ACCUMULATING_AND_RETRACTING accumulation mode
    *
-   * @param value
-   * @return
+   * @param value the value to be retracted
+   * @return the retracted value
    */
   OutputT getRetraction(OutputT value);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java b/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java
index 3288398..69093e5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java
@@ -37,7 +37,7 @@ public interface ControlTuple
     /**
      * Gets the timestamp associated with this watermark
      *
-     * @return
+     * @return the timestamp
      */
     long getTimestamp();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
index 4cb2b1a..3e25d15 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
@@ -38,8 +38,8 @@ public interface SessionWindowedStorage<K, V> extends WindowedStorage.WindowedKe
    * Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the
    * data to toWindow, and overwrite any existing data in toWindow
    *
-   * @param fromWindow
-   * @param toWindow
+   * @param fromWindow the window we want to migrate from
+   * @param toWindow the window we want to migrate to
    */
   void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow);
 
@@ -51,8 +51,8 @@ public interface SessionWindowedStorage<K, V> extends WindowedStorage.WindowedKe
    *
    * @param key the key
    * @param timestamp the timestamp
-   * @param gap
-   * @return
+   * @param gap the minimum gap
+   * @return the matching session windows together with their associated values
    */
   Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java b/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java
index 266577f..bd9cd9c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java
@@ -138,7 +138,7 @@ public class TriggerOption
   /**
    * Creates a TriggerOption with an initial trigger that should be fired at the watermark
    *
-   * @return
+   * @return the TriggerOption
    */
   public static TriggerOption AtWatermark()
   {
@@ -151,8 +151,8 @@ public class TriggerOption
   /**
    * A trigger should be fired before the watermark once for every specified duration
    *
-   * @param duration
-   * @return
+   * @param duration the duration
+   * @return the TriggerOption
    */
   public TriggerOption withEarlyFiringsAtEvery(Duration duration)
   {
@@ -164,8 +164,8 @@ public class TriggerOption
   /**
    * A trigger should be fired before the watermark once for every n tuple(s)
    *
-   * @param count
-   * @return
+   * @param count the count
+   * @return the TriggerOption
    */
   public TriggerOption withEarlyFiringsAtEvery(long count)
   {
@@ -177,8 +177,8 @@ public class TriggerOption
   /**
    * A trigger should be fired after the watermark once for every specified duration
    *
-   * @param duration
-   * @return
+   * @param duration the duration
+   * @return the TriggerOption
    */
   public TriggerOption withLateFiringsAtEvery(Duration duration)
   {
@@ -190,8 +190,8 @@ public class TriggerOption
   /**
    * A trigger should be fired after the watermark once for every n late tuple(s)
    *
-   * @param count
-   * @return
+   * @param count the count
+   * @return the TriggerOption
    */
   public TriggerOption withLateFiringsAtEvery(long count)
   {
@@ -203,7 +203,7 @@ public class TriggerOption
   /**
    * With discarding mode, the state is discarded after each trigger
    *
-   * @return
+   * @return the TriggerOption
    */
   public TriggerOption discardingFiredPanes()
   {
@@ -214,7 +214,7 @@ public class TriggerOption
   /**
    * With accumulating mode, the state is kept
    *
-   * @return
+   * @return the TriggerOption
    */
   public TriggerOption accumulatingFiredPanes()
   {
@@ -227,7 +227,7 @@ public class TriggerOption
    * so when new values come in that change the state, a retraction trigger can be fired with the snapshot of the state
    * when the last trigger was fired
    *
-   * @return
+   * @return the TriggerOption
    */
   public TriggerOption accumulatingAndRetractingFiredPanes()
   {
@@ -239,7 +239,7 @@ public class TriggerOption
    * Only fire triggers for data that has changed from the last trigger. This only applies to ACCUMULATING and
    * ACCUMULATING_AND_RETRACTING accumulation modes.
    *
-   * @return
+   * @return the TriggerOption
    */
   public TriggerOption firingOnlyUpdatedPanes()
   {
@@ -250,7 +250,7 @@ public class TriggerOption
   /**
    * Gets the accumulation mode
    *
-   * @return
+   * @return the AccumulationMode
    */
   public AccumulationMode getAccumulationMode()
   {
@@ -260,7 +260,7 @@ public class TriggerOption
   /**
    * Gets the trigger list
    *
-   * @return
+   * @return the trigger list
    */
   public List<Trigger> getTriggerList()
   {
@@ -271,7 +271,7 @@ public class TriggerOption
    * Returns whether we should only fire panes that have been updated since the last trigger.
    * When this option is set, DISCARDING accumulation mode must not be used.
    *
-   * @return
+   * @return whether we want to fire only updated panes
    */
   public boolean isFiringOnlyUpdatedPanes()
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
index aea6bf6..c7eba4e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
@@ -35,7 +35,7 @@ public interface Tuple<T>
   /**
    * Gets the value of the tuple
    *
-   * @return
+   * @return the value
    */
   T getValue();
 
@@ -81,6 +81,7 @@ public interface Tuple<T>
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public boolean equals(Object obj)
     {
       if (obj instanceof PlainTuple) {
@@ -133,6 +134,7 @@ public interface Tuple<T>
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public boolean equals(Object obj)
     {
       if (obj instanceof TimestampedTuple && super.equals(obj)) {
@@ -186,6 +188,7 @@ public interface Tuple<T>
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public boolean equals(Object obj)
     {
       if (obj instanceof WindowedTuple && super.equals(obj)) {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
index 50d6445..1d7681d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
@@ -105,7 +105,7 @@ public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WI
     /**
      * Gets the beginning timestamp of this window
      *
-     * @return
+     * @return the begin timestamp
      */
     @Override
     public long getBeginTimestamp()
@@ -116,7 +116,7 @@ public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WI
     /**
      * Gets the duration millis of this window
      *
-     * @return
+     * @return the duration
      */
     @Override
     public long getDurationMillis()
@@ -198,6 +198,7 @@ public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WI
         return false;
       }
       if (other instanceof SessionWindow) {
+        @SuppressWarnings("unchecked")
         SessionWindow<K> otherSessionWindow = (SessionWindow<K>)other;
         if (key == null) {
           return otherSessionWindow.key == null;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
index 099709d..a88250e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
@@ -73,7 +73,7 @@ public interface WindowOption
     /**
      * The time window should be a sliding window with the given slide duration
      *
-     * @param duration
+     * @param duration the slide-by duration
      * @return the SlidingTimeWindows
      */
     public SlidingTimeWindows slideBy(Duration duration)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
index ccc7ae1..400a97b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
@@ -42,36 +42,36 @@ public interface WindowedOperator<InputT>
   /**
    * Sets the WindowOption of this operator
    *
-   * @param windowOption
+   * @param windowOption the window option
    */
   void setWindowOption(WindowOption windowOption);
 
   /**
    * Sets the TriggerOption of this operator
    *
-   * @param triggerOption
+   * @param triggerOption the trigger option
    */
   void setTriggerOption(TriggerOption triggerOption);
 
   /**
    * Sets the allowed lateness of this operator
    *
-   * @param allowedLateness
+   * @param allowedLateness the allowed lateness
    */
   void setAllowedLateness(Duration allowedLateness);
 
   /**
    * This sets the function that extracts the timestamp from the input tuple
    *
-   * @param timestampExtractor
+   * @param timestampExtractor the timestamp extractor
    */
   void setTimestampExtractor(Function<InputT, Long> timestampExtractor);
 
   /**
    * Assign window(s) for this input tuple
    *
-   * @param input
-   * @return
+   * @param input the input tuple
+   * @return the windowed tuple
    */
   Tuple.WindowedTuple<InputT> getWindowedValue(Tuple<InputT> input);
 
@@ -80,8 +80,8 @@ public interface WindowedOperator<InputT>
    * The implementation of this operator should look at the allowed lateness in the WindowOption.
    * It should also call this function and if it returns true, it should drop the associated tuple.
    *
-   * @param timestamp
-   * @return
+   * @param timestamp the timestamp
+   * @return whether the timestamp is considered too late
    */
   boolean isTooLate(long timestamp);
 
@@ -89,14 +89,14 @@ public interface WindowedOperator<InputT>
    * This method is supposed to drop the tuple because it has passed the allowed lateness. But an implementation
    * of this method has the chance to do something different (e.g. emit it to another port)
    *
-   * @param input
+   * @param input the input tuple
    */
   void dropTuple(Tuple<InputT> input);
 
   /**
    * This method accumulates the incoming tuple (with the Accumulation interface)
    *
-   * @param tuple
+   * @param tuple the input tuple
    */
   void accumulateTuple(Tuple.WindowedTuple<InputT> tuple);
 
@@ -106,7 +106,7 @@ public interface WindowedOperator<InputT>
    * and change the state of each of those windows. All tuples for those windows arriving after
    * the watermark will be considered late.
    *
-   * @param watermark
+   * @param watermark the watermark tuple
    */
   void processWatermark(ControlTuple.Watermark watermark);
 
@@ -114,14 +114,15 @@ public interface WindowedOperator<InputT>
    * This method fires the trigger for the given window, and possibly retraction trigger. The implementation should clear
    * the window data in the storage if the accumulation mode is DISCARDING
    *
-   * @param window
+   * @param window the window
+   * @param windowState the window state
    */
   void fireTrigger(Window window, WindowState windowState);
 
   /**
    * This method clears the window data in the storage.
    *
-   * @param window
+   * @param window the window
    */
   void clearWindowData(Window window);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
index 42ecdae..e2874ba 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
@@ -37,21 +37,21 @@ public interface WindowedStorage extends Component<Context.OperatorContext>
   /**
    * Returns true if the storage contains this window
    *
-   * @param window
+   * @param window the window
    */
   boolean containsWindow(Window window);
 
   /**
    * Returns the number of windows in the storage
    *
-   * @return
+   * @return the number of windows
    */
   long size();
 
   /**
    * Removes all the data associated with the given window. This does NOT mean removing the window in checkpointed state
    *
-   * @param window
+   * @param window the window
    */
   void remove(Window window);
 
@@ -68,23 +68,23 @@ public interface WindowedStorage extends Component<Context.OperatorContext>
     /**
      * Sets the data associated with the given window
      *
-     * @param window
-     * @param value
+     * @param window the window
+     * @param value the value
      */
     void put(Window window, T value);
 
     /**
      * Gets the value associated with the given window
      *
-     * @param window
-     * @return
+     * @param window the window
+     * @return the value
      */
     T get(Window window);
 
     /**
      * Returns the iterable of the entries in the storage
      *
-     * @return
+     * @return the entries
      */
     Iterable<Map.Entry<Window, T>> entries();
   }
@@ -100,26 +100,26 @@ public interface WindowedStorage extends Component<Context.OperatorContext>
     /**
      * Sets the data associated with the given window and the key
      *
-     * @param window
-     * @param key
-     * @param value
+     * @param window the window
+     * @param key the key
+     * @param value the value
      */
     void put(Window window, K key, V value);
 
     /**
      * Gets an iterable object over the key/value pairs associated with the given window
      *
-     * @param window
-     * @return
+     * @param window the window
+     * @return the entries
      */
     Iterable<Map.Entry<K, V>> entries(Window window);
 
     /**
      * Gets the data associated with the given window and the key
      *
-     * @param window
-     * @param key
-     * @return
+     * @param window the window
+     * @param key the key
+     * @return the value
      */
     V get(Window window, K key);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index f90d47d..c778523 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -131,7 +131,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   /**
    * Process the incoming data tuple
    *
-   * @param tuple
+   * @param tuple the incoming tuple
    */
   public void processTuple(Tuple<InputT> tuple)
   {
@@ -206,21 +206,21 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   /**
    * This method sets the storage for the data for each window
    *
-   * @param storageAgent
+   * @param dataStorage the data storage
    */
-  public void setDataStorage(DataStorageT storageAgent)
+  public void setDataStorage(DataStorageT dataStorage)
   {
-    this.dataStorage = storageAgent;
+    this.dataStorage = dataStorage;
   }
 
   /**
    * This method sets the storage for the retraction data for each window. Only used when the accumulation mode is ACCUMULATING_AND_RETRACTING
    *
-   * @param storageAgent
+   * @param retractionStorage the retraction storage
    */
-  public void setRetractionStorage(RetractionStorageT storageAgent)
+  public void setRetractionStorage(RetractionStorageT retractionStorage)
   {
-    this.retractionStorage = storageAgent;
+    this.retractionStorage = retractionStorage;
   }
 
   public void addComponent(String key, Component<Context.OperatorContext> component)
@@ -232,7 +232,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
    * Sets the accumulation, which basically tells the WindowedOperator what to do if a new tuple comes in and what
    * to put in the pane when a trigger is fired
    *
-   * @param accumulation
+   * @param accumulation the accumulation
    */
   public void setAccumulation(AccumulationT accumulation)
   {
@@ -345,8 +345,8 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
    * If we are doing sliding windows, this will return multiple windows. Otherwise, only one window will be returned.
    * Note that this method does not apply to SessionWindows.
    *
-   * @param timestamp
-   * @return
+   * @param timestamp the timestamp
+   * @return the windows this timestamp belongs to
    */
   private Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp)
   {
@@ -378,7 +378,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   @Override
   public boolean isTooLate(long timestamp)
   {
-    return allowedLatenessMillis < 0 ? false : (timestamp < currentWatermark - allowedLatenessMillis);
+    return allowedLatenessMillis >= 0 && (timestamp < currentWatermark - allowedLatenessMillis);
   }
 
   @Override
@@ -395,6 +395,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void setup(Context.OperatorContext context)
   {
     this.timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
@@ -537,7 +538,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   /**
    * This method fires the normal trigger for the given window.
    *
-   * @param window
+   * @param window the window to fire trigger on
    * @param fireOnlyUpdatedPanes Do not fire trigger if the old value is the same as the new value. If true, retraction storage is required.
    */
   public abstract void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes);
@@ -546,7 +547,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
    * This method fires the retraction trigger for the given window. This should only be valid if the accumulation
    * mode is ACCUMULATING_AND_RETRACTING
    *
-   * @param window
+   * @param window the window to fire the retraction trigger on
    */
   public abstract void fireRetractionTrigger(Window window);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
index fdceb4d..906b1b9 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
@@ -45,13 +45,15 @@ public class InMemorySessionWindowedStorage<K, V> extends InMemoryWindowedKeyedS
   @Override
   public void put(Window window, K key, V value)
   {
+    @SuppressWarnings("unchecked")
+    Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window;
     super.put(window, key, value);
     TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key);
     if (sessionWindows == null) {
       sessionWindows = new TreeSet<>();
       keyToWindows.put(key, sessionWindows);
     }
-    sessionWindows.add((Window.SessionWindow<K>)window);
+    sessionWindows.add(sessionWindow);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
new file mode 100644
index 0000000..8779739
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Session windowed storage backed by {@link Spillable} data structures, with a key-to-session-windows index.
+ */
+@InterfaceStability.Evolving
+public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeyedStorage<K, V> implements SessionWindowedStorage<K, V>
+{
+  // Additional key-to-windows multimap so that the session windows of a key can be looked up directly by that key
+  private Spillable.SpillableSetMultimap<K, Window.SessionWindow<K>> keyToWindowsMap;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    if (keyToWindowsMap == null) {
+      // NOTE: this will pose difficulties when we try to assign the entries to a time bucket later on.
+      // This is tracked in APEXMALHAR-2271
+      keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>, Slice>)(Serde)windowSerde);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void remove(Window window)
+  {
+    super.remove(window);
+    Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window;
+    keyToWindowsMap.remove(sessionWindow.getKey(), sessionWindow); // also drop the window from the per-key index
+  }
+
+  @Override
+  public void put(Window window, K key, V value)
+  {
+    super.put(window, key, value);
+    @SuppressWarnings("unchecked")
+    Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window;
+    if (!keyToWindowsMap.containsEntry(key, sessionWindow)) { // index the window under the key only if not already there
+      keyToWindowsMap.put(key, sessionWindow);
+    }
+  }
+
+  @Override
+  public void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow)
+  {
+    Set<K> keys = windowToKeysMap.get(fromWindow);
+    if (keys == null) { // no keys recorded for fromWindow, so there is nothing to migrate
+      return;
+    }
+    windowKeyToValueMap.remove(toWindow); // NOTE(review): this map is accessed with (window, key) pairs below; removing by a bare window may be a no-op -- confirm
+    for (K key : keys) {
+      windowToKeysMap.put(toWindow, key);
+      ImmutablePair<Window, K> oldKey = new ImmutablePair<Window, K>(fromWindow, key);
+      ImmutablePair<Window, K> newKey = new ImmutablePair<Window, K>(toWindow, key);
+
+      V value = windowKeyToValueMap.get(oldKey);
+      windowKeyToValueMap.remove(oldKey);
+      windowKeyToValueMap.put(newKey, value);
+      keyToWindowsMap.remove(key, fromWindow); // re-point the per-key index from the old window ...
+      keyToWindowsMap.put(key, toWindow); // ... to the new window
+    }
+    windowToKeysMap.removeAll(fromWindow);
+  }
+
+  @Override
+  public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap)
+  {
+    List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>();
+    Set<Window.SessionWindow<K>> sessionWindows = keyToWindowsMap.get(key);
+    if (sessionWindows != null) {
+      for (Window.SessionWindow<K> window : sessionWindows) {
+        if (timestamp > window.getBeginTimestamp()) { // after the window begin: match if before window end plus gap
+          if (window.getBeginTimestamp() + window.getDurationMillis() + gap > timestamp) {
+            results.add(new AbstractMap.SimpleEntry<>(window, windowKeyToValueMap.get(new ImmutablePair<Window, K>(window, key))));
+          }
+        } else if (timestamp < window.getBeginTimestamp()) { // before the window begin: match if within gap of the begin
+          if (window.getBeginTimestamp() - gap <= timestamp) {
+            results.add(new AbstractMap.SimpleEntry<>(window, windowKeyToValueMap.get(new ImmutablePair<Window, K>(window, key))));
+          }
+        } else { // timestamp equals the window begin: always a match
+          results.add(new AbstractMap.SimpleEntry<>(window, windowKeyToValueMap.get(new ImmutablePair<Window, K>(window, key))));
+        }
+      }
+    }
+    return results;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
new file mode 100644
index 0000000..ac77d1b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.AbstractMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Implementation of WindowedKeyedStorage using {@link Spillable} data structures.
+ * <p>
+ * State is held in two spillable structures that are updated together: a byte map from
+ * (window, key) pairs to values, and a set multimap from each window to the keys stored under it.
+ * Both are obtained from the {@link SpillableComplexComponent} during
+ * {@link #setup(Context.OperatorContext)}, so the component must be set before setup is called.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
+{
+  @NotNull
+  protected SpillableComplexComponent scc;
+  // 0 means "not set"; setup() then derives a bucket from the application name and operator id
+  protected long bucket;
+  // any serde left null is replaced by a SerdeKryoSlice in setup()
+  protected Serde<Window, Slice> windowSerde;
+  protected Serde<Pair<Window, K>, Slice> windowKeyPairSerde;
+  protected Serde<K, Slice> keySerde;
+  protected Serde<V, Slice> valueSerde;
+
+  // (window, key) -> value
+  protected Spillable.SpillableByteMap<Pair<Window, K>, V> windowKeyToValueMap;
+  // window -> keys that have been put for that window
+  protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap;
+
+  /**
+   * Iterates over the (key, value) entries of a single window. The keys are taken from
+   * windowToKeysMap and each value is looked up in windowKeyToValueMap as the iteration advances.
+   * Removal through the iterator is not supported.
+   */
+  private class KVIterator implements Iterator<Map.Entry<K, V>>
+  {
+    final Window window;
+    final Set<K> keys;
+    // stays null when the multimap returns no key set for the window
+    Iterator<K> iterator;
+
+    KVIterator(Window window)
+    {
+      this.window = window;
+      this.keys = windowToKeysMap.get(window);
+      if (this.keys != null) {
+        this.iterator = this.keys.iterator();
+      }
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return iterator != null && iterator.hasNext();
+    }
+
+    @Override
+    public Map.Entry<K, V> next()
+    {
+      if (iterator == null) {
+        // no key set for this window: honor the Iterator contract rather than fail with an NPE
+        throw new NoSuchElementException();
+      }
+      K key = iterator.next();
+      return new AbstractMap.SimpleEntry<>(key, windowKeyToValueMap.get(new ImmutablePair<>(window, key)));
+    }
+
+    @Override
+    public void remove()
+    {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * Creates an unconfigured storage. The component must be supplied through
+   * {@link #setSpillableComplexComponent(SpillableComplexComponent)}; bucket and serdes are optional
+   * and fall back to the defaults chosen in {@link #setup(Context.OperatorContext)}.
+   */
+  public SpillableWindowedKeyedStorage()
+  {
+  }
+
+  /**
+   * Creates a storage with an explicit bucket and serdes. The spillable component still has to be set
+   * separately before {@link #setup(Context.OperatorContext)} is called.
+   *
+   * @param bucket the bucket passed to the spillable data structures; 0 lets setup() choose one
+   * @param windowSerde serde for windows; null lets setup() choose a default
+   * @param windowKeyPairSerde serde for (window, key) pairs; null lets setup() choose a default
+   * @param keySerde serde for keys; null lets setup() choose a default
+   * @param valueSerde serde for values; null lets setup() choose a default
+   */
+  public SpillableWindowedKeyedStorage(long bucket,
+      Serde<Window, Slice> windowSerde, Serde<Pair<Window, K>, Slice> windowKeyPairSerde, Serde<K, Slice> keySerde, Serde<V, Slice> valueSerde)
+  {
+    this.bucket = bucket;
+    this.windowSerde = windowSerde;
+    this.windowKeyPairSerde = windowKeyPairSerde;
+    this.keySerde = keySerde;
+    this.valueSerde = valueSerde;
+  }
+
+  public void setSpillableComplexComponent(SpillableComplexComponent scc)
+  {
+    this.scc = scc;
+  }
+
+  public SpillableComplexComponent getSpillableComplexComponent()
+  {
+    return this.scc;
+  }
+
+  public void setBucket(long bucket)
+  {
+    this.bucket = bucket;
+  }
+
+  public void setWindowSerde(Serde<Window, Slice> windowSerde)
+  {
+    this.windowSerde = windowSerde;
+  }
+
+  public void setWindowKeyPairSerde(Serde<Pair<Window, K>, Slice> windowKeyPairSerde)
+  {
+    this.windowKeyPairSerde = windowKeyPairSerde;
+  }
+
+  /**
+   * Sets the serde used for keys in the window-to-keys multimap. Without this setter the key serde
+   * could only be supplied through the five-argument constructor.
+   *
+   * @param keySerde serde for keys; null lets setup() choose a default
+   */
+  public void setKeySerde(Serde<K, Slice> keySerde)
+  {
+    this.keySerde = keySerde;
+  }
+
+  public void setValueSerde(Serde<V, Slice> valueSerde)
+  {
+    this.valueSerde = valueSerde;
+  }
+
+  @Override
+  public boolean containsWindow(Window window)
+  {
+    return windowToKeysMap.containsKey(window);
+  }
+
+  /**
+   * Returns the size reported by the window-to-keys multimap.
+   * NOTE(review): whether that counts windows or (window, key) entries depends on
+   * SpillableSetMultimap.size() - confirm against its implementation.
+   */
+  @Override
+  public long size()
+  {
+    return windowToKeysMap.size();
+  }
+
+  /**
+   * Removes every value stored for the given window and then the window's key set itself.
+   *
+   * @param window the window to purge
+   */
+  @Override
+  public void remove(Window window)
+  {
+    Set<K> keys = windowToKeysMap.get(window);
+    if (keys != null) {
+      // the values must be removed first, while the key set needed to address them still exists
+      for (K key : keys) {
+        windowKeyToValueMap.remove(new ImmutablePair<>(window, key));
+      }
+    }
+    windowToKeysMap.removeAll(window);
+  }
+
+  /**
+   * Fills in defaults for anything not configured (bucket, serdes) and creates the two spillable
+   * maps from the spillable component if they do not exist yet.
+   *
+   * @param context the operator context, used for the application name and operator id
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    if (bucket == 0) {
+      // derive a default bucket from the application name and operator id; this is a String hash,
+      // so it is almost, but not strictly, guaranteed to be unique
+      bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode();
+    }
+    // set default serdes
+    if (windowSerde == null) {
+      windowSerde = new SerdeKryoSlice<>();
+    }
+    if (windowKeyPairSerde == null) {
+      windowKeyPairSerde = new SerdeKryoSlice<>();
+    }
+    if (keySerde == null) {
+      keySerde = new SerdeKryoSlice<>();
+    }
+    if (valueSerde == null) {
+      valueSerde = new SerdeKryoSlice<>();
+    }
+
+    // only create the maps when absent; they are fields of this object and may already be present
+    if (windowKeyToValueMap == null) {
+      windowKeyToValueMap = scc.newSpillableByteMap(bucket, windowKeyPairSerde, valueSerde);
+    }
+    if (windowToKeysMap == null) {
+      windowToKeysMap = scc.newSpillableSetMultimap(bucket, windowSerde, keySerde);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  /**
+   * Stores the value for the given window and key, replacing any previous value, and records the key
+   * under the window if it is not recorded yet.
+   */
+  @Override
+  public void put(Window window, K key, V value)
+  {
+    if (!windowToKeysMap.containsEntry(window, key)) {
+      windowToKeysMap.put(window, key);
+    }
+    windowKeyToValueMap.put(new ImmutablePair<>(window, key), value);
+  }
+
+  /**
+   * Returns a view over the (key, value) entries of the given window. Every call to
+   * {@code iterator()} reads the window's key set afresh.
+   */
+  @Override
+  public Iterable<Map.Entry<K, V>> entries(final Window window)
+  {
+    return new Iterable<Map.Entry<K, V>>()
+    {
+      @Override
+      public Iterator<Map.Entry<K, V>> iterator()
+      {
+        return new KVIterator(window);
+      }
+    };
+  }
+
+  /**
+   * Looks up the value stored for the given window and key in the (window, key) map.
+   */
+  @Override
+  public V get(Window window, K key)
+  {
+    return windowKeyToValueMap.get(new ImmutablePair<>(window, key));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
new file mode 100644
index 0000000..81f5dbb
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures
+ *
+ * @param <T> Type of the value per window
+ */
+public class SpillableWindowedPlainStorage<T> implements WindowedStorage.WindowedPlainStorage<T>
+{
+  @NotNull
+  private SpillableComplexComponent scc;
+  private long bucket;
+  private Serde<Window, Slice> windowSerde;
+  private Serde<T, Slice> valueSerde;
+
+  protected Spillable.SpillableByteMap<Window, T> windowToDataMap;
+
+  public SpillableWindowedPlainStorage()
+  {
+  }
+
+  public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde)
+  {
+    this.bucket = bucket;
+    this.windowSerde = windowSerde;
+    this.valueSerde = valueSerde;
+  }
+
+  public void setSpillableComplexComponent(SpillableComplexComponent scc)
+  {
+    this.scc = scc;
+  }
+
+  public SpillableComplexComponent getSpillableComplexComponent()
+  {
+    return scc;
+  }
+
+  public void setBucket(long bucket)
+  {
+    this.bucket = bucket;
+  }
+
+  public void setWindowSerde(Serde<Window, Slice> windowSerde)
+  {
+    this.windowSerde = windowSerde;
+  }
+
+  public void setValueSerde(Serde<T, Slice> valueSerde)
+  {
+    this.valueSerde = valueSerde;
+  }
+
+  @Override
+  public void put(Window window, T value)
+  {
+    windowToDataMap.put(window, value);
+  }
+
+  @Override
+  public T get(Window window)
+  {
+    return windowToDataMap.get(window);
+  }
+
+  @Override
+  public Iterable<Map.Entry<Window, T>> entries()
+  {
+    return windowToDataMap.entrySet();
+  }
+
+  @Override
+  public boolean containsWindow(Window window)
+  {
+    return windowToDataMap.containsKey(window);
+  }
+
+  @Override
+  public long size()
+  {
+    return windowToDataMap.size();
+  }
+
+  @Override
+  public void remove(Window window)
+  {
+    windowToDataMap.remove(window);
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    if (bucket == 0) {
+      // choose a bucket that is almost guaranteed to be unique
+      bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode();
+    }
+    // set default serdes
+    if (windowSerde == null) {
+      windowSerde = new SerdeKryoSlice<>();
+    }
+    if (valueSerde == null) {
+      valueSerde = new SerdeKryoSlice<>();
+    }
+    if (windowToDataMap == null) {
+      windowToDataMap = scc.newSpillableByteMap(bucket, windowSerde, valueSerde);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
index 2ece6b2..8fa814d 100644
--- a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
+++ b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
@@ -60,6 +60,11 @@ public class OperatorContextTestHelper
       this.attributes = map;
     }
 
+    /**
+     * Returns the attribute map held by this context. The map itself is returned, not a copy.
+     */
+    public com.datatorrent.api.Attribute.AttributeMap getAttributes()
+    {
+      return this.attributes;
+    }
+
     @Override
     public int getId()
     {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
new file mode 100644
index 0000000..3b7789c
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+/**
+ * Unit tests for Spillable Windowed Storage.
+ *
+ * Both tests follow the same script: write values across several streaming windows, take a Kryo
+ * clone of the storage between beforeCheckpoint and checkpointed, keep writing, then swap in the
+ * clone to simulate recovery and verify that only the values present at the checkpoint are visible.
+ */
+public class SpillableWindowedStorageTest
+{
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  /**
+   * Checkpoint/recovery round trip for {@link SpillableWindowedPlainStorage} (one value per window).
+   */
+  @Test
+  public void testWindowedPlainStorage()
+  {
+    SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+    SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>();
+    Window window1 = new Window.TimeWindow<>(1000, 10);
+    Window window2 = new Window.TimeWindow<>(1010, 10);
+    Window window3 = new Window.TimeWindow<>(1020, 10);
+    storage.setSpillableComplexComponent(sccImpl);
+    // the component is set up before the storage, which creates its map from it
+    storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+    storage.setup(testMeta.operatorContext);
+
+    // streaming window 1000: initial value for each of the three windows
+    sccImpl.beginWindow(1000);
+    storage.put(window1, 1);
+    storage.put(window2, 2);
+    storage.put(window3, 3);
+    sccImpl.endWindow();
+    // streaming window 1001: overwrite window1 and window2; window3 keeps its value
+    sccImpl.beginWindow(1001);
+    storage.put(window1, 4);
+    storage.put(window2, 5);
+    sccImpl.endWindow();
+    // checkpoint after 1001; the clone taken here stands in for the checkpointed operator state
+    sccImpl.beforeCheckpoint(1001);
+    SpillableWindowedPlainStorage<Integer> clonedStorage = KryoCloneUtils.cloneObject(storage);
+    sccImpl.checkpointed(1001);
+
+
+    // streaming window 1002: writes made after the checkpoint
+    sccImpl.beginWindow(1002);
+    storage.put(window1, 6);
+    storage.put(window2, 7);
+    sccImpl.endWindow();
+
+    // the live storage sees the latest writes
+    Assert.assertEquals(6L, storage.get(window1).longValue());
+    Assert.assertEquals(7L, storage.get(window2).longValue());
+    Assert.assertEquals(3L, storage.get(window3).longValue());
+
+    sccImpl.beginWindow(1003);
+    storage.put(window1, 8);
+    storage.put(window2, 9);
+    sccImpl.endWindow();
+
+    // simulating crash here
+    storage.teardown();
+    storage.getSpillableComplexComponent().teardown();
+
+    // continue with the clone and tell the context that the last checkpointed window was 1001
+    storage = clonedStorage;
+    testMeta.operatorContext.getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1001L);
+    storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+    storage.setup(testMeta.operatorContext);
+
+    // recovery at window 1002
+    // NOTE(review): beginWindow is invoked on the original sccImpl, which was torn down above, while
+    // setup ran on the clone's component - confirm whether the clone's component was intended here.
+    sccImpl.beginWindow(1002);
+    // only the values as of the checkpoint (window 1001) may be visible after recovery
+    Assert.assertEquals(4L, storage.get(window1).longValue());
+    Assert.assertEquals(5L, storage.get(window2).longValue());
+    Assert.assertEquals(3L, storage.get(window3).longValue());
+  }
+
+  /**
+   * Checkpoint/recovery round trip for {@link SpillableWindowedKeyedStorage} (values per window and
+   * key), including a key that is first written after the checkpoint.
+   */
+  @Test
+  public void testWindowedKeyedStorage()
+  {
+    SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+    SpillableWindowedKeyedStorage<String, Integer> storage = new SpillableWindowedKeyedStorage<>();
+    Window window1 = new Window.TimeWindow<>(1000, 10);
+    Window window2 = new Window.TimeWindow<>(1010, 10);
+    Window window3 = new Window.TimeWindow<>(1020, 10);
+    storage.setSpillableComplexComponent(sccImpl);
+    // the component is set up before the storage, which creates its maps from it
+    storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+    storage.setup(testMeta.operatorContext);
+
+    // streaming window 1000: initial value for key "x" in each of the three windows
+    sccImpl.beginWindow(1000);
+    storage.put(window1, "x", 1);
+    storage.put(window2, "x", 2);
+    storage.put(window3, "x", 3);
+    sccImpl.endWindow();
+    // streaming window 1001: overwrite "x" in window1 and window2
+    sccImpl.beginWindow(1001);
+    storage.put(window1, "x", 4);
+    storage.put(window2, "x", 5);
+    sccImpl.endWindow();
+    // checkpoint after 1001; the clone taken here stands in for the checkpointed operator state
+    sccImpl.beforeCheckpoint(1001);
+    SpillableWindowedKeyedStorage<String, Integer> clonedStorage = KryoCloneUtils.cloneObject(storage);
+    sccImpl.checkpointed(1001);
+
+    // streaming window 1002: writes made after the checkpoint, including the new key "y"
+    sccImpl.beginWindow(1002);
+    storage.put(window1, "x", 6);
+    storage.put(window2, "x", 7);
+    storage.put(window2, "y", 8);
+    sccImpl.endWindow();
+
+    // the live storage sees the latest writes
+    Assert.assertEquals(6L, storage.get(window1, "x").longValue());
+    Assert.assertEquals(7L, storage.get(window2, "x").longValue());
+    Assert.assertEquals(3L, storage.get(window3, "x").longValue());
+    Assert.assertEquals(8L, storage.get(window2, "y").longValue());
+
+    // simulating crash here
+    storage.teardown();
+    storage.getSpillableComplexComponent().teardown();
+
+    // continue with the clone and tell the context that the last checkpointed window was 1001
+    storage = clonedStorage;
+    testMeta.operatorContext.getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1001L);
+    storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+    storage.setup(testMeta.operatorContext);
+
+    // recovery at window 1002
+    // NOTE(review): beginWindow is invoked on the original sccImpl, which was torn down above, while
+    // setup ran on the clone's component - confirm whether the clone's component was intended here.
+    sccImpl.beginWindow(1002);
+    // only the values as of the checkpoint may be visible; "y" was written later and must be absent
+    Assert.assertEquals(4L, storage.get(window1, "x").longValue());
+    Assert.assertEquals(5L, storage.get(window2, "x").longValue());
+    Assert.assertEquals(3L, storage.get(window3, "x").longValue());
+    Assert.assertNull(storage.get(window2, "y"));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index 7396994..4edbcd0 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -18,35 +18,64 @@
  */
 package org.apache.apex.malhar.lib.window;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.TreeMap;
 
 import javax.validation.ValidationException;
 
 import org.joda.time.Duration;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
 import org.apache.apex.malhar.lib.window.impl.InMemorySessionWindowedStorage;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
 import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.SpillableSessionWindowedStorage;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
 import org.apache.apex.malhar.lib.window.impl.WatermarkImpl;
 import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
 import org.apache.commons.lang3.mutable.MutableLong;
 
-import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Sink;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.KeyValPair;
 
 /**
  * Unit tests for WindowedOperator
  */
+@RunWith(Parameterized.class)
 public class WindowedOperatorTest
 {
+  @Parameterized.Parameters
+  public static Collection<Object[]> testParameters()
+  {
+    return Arrays.asList(new Object[][]{{false}, {true}});
+  }
+
+  @Parameterized.Parameter
+  public Boolean useSpillable;
+
+  private WindowedStorage.WindowedPlainStorage<WindowState> windowStateStorage;
+  private WindowedStorage.WindowedPlainStorage<MutableLong> plainDataStorage;
+  private WindowedStorage.WindowedPlainStorage<Long> plainRetractionStorage;
+  private WindowedStorage.WindowedKeyedStorage<String, MutableLong> keyedDataStorage;
+  private WindowedStorage.WindowedKeyedStorage<String, Long> keyedRetractionStorage;
+  private SpillableComplexComponentImpl sccImpl;
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
   private void verifyValidationFailure(WindowedOperatorImpl windowedOperator, String message)
   {
     try {
@@ -60,19 +89,61 @@ public class WindowedOperatorTest
   private WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator()
   {
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>();
-    windowedOperator.setDataStorage(new InMemoryWindowedStorage<MutableLong>());
-    windowedOperator.setRetractionStorage(new InMemoryWindowedStorage<Long>());
-    windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+    if (useSpillable) {
+      sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+      // TODO: We don't yet support Spillable data structures for window state storage because SpillableByteMapImpl does not yet support iterating over all keys.
+      windowStateStorage = new InMemoryWindowedStorage<>();
+      SpillableWindowedPlainStorage<MutableLong> pds = new SpillableWindowedPlainStorage<>();
+      pds.setSpillableComplexComponent(sccImpl);
+      plainDataStorage = pds;
+      SpillableWindowedPlainStorage<Long> prs = new SpillableWindowedPlainStorage<>();
+      prs.setSpillableComplexComponent(sccImpl);
+      plainRetractionStorage = prs;
+      windowedOperator.addComponent("SpillableComplexComponent", sccImpl);
+    } else {
+      windowStateStorage = new InMemoryWindowedStorage<>();
+      plainDataStorage = new InMemoryWindowedStorage<>();
+      plainRetractionStorage = new InMemoryWindowedStorage<>();
+    }
+    windowedOperator.setDataStorage(plainDataStorage);
+    windowedOperator.setRetractionStorage(plainRetractionStorage);
+    windowedOperator.setWindowStateStorage(windowStateStorage);
     windowedOperator.setAccumulation(new SumAccumulation());
     return windowedOperator;
   }
 
-  private KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator()
+  private KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator(boolean forSession)
   {
     KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>();
-    windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableLong>());
-    windowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, Long>());
-    windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+    if (useSpillable) {
+      sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+      // TODO: We don't yet support Spillable data structures for window state storage because SpillableByteMapImpl does not yet support iterating over all keys.
+      windowStateStorage = new InMemoryWindowedStorage<>();
+      if (forSession) {
+        SpillableSessionWindowedStorage<String, MutableLong> sws = new SpillableSessionWindowedStorage<>();
+        sws.setSpillableComplexComponent(sccImpl);
+        keyedDataStorage = sws;
+      } else {
+        SpillableWindowedKeyedStorage<String, MutableLong> kds = new SpillableWindowedKeyedStorage<>();
+        kds.setSpillableComplexComponent(sccImpl);
+        keyedDataStorage = kds;
+      }
+      SpillableWindowedKeyedStorage<String, Long> krs = new SpillableWindowedKeyedStorage<>();
+      krs.setSpillableComplexComponent(sccImpl);
+      keyedRetractionStorage = krs;
+      windowedOperator.addComponent("SpillableComplexComponent", sccImpl);
+    } else {
+      windowStateStorage = new InMemoryWindowedStorage<>();
+      if (forSession) {
+        keyedDataStorage = new InMemorySessionWindowedStorage<>();
+      } else {
+        keyedDataStorage = new InMemoryWindowedKeyedStorage<>();
+      }
+      keyedRetractionStorage = new InMemoryWindowedKeyedStorage<>();
+    }
+    windowedOperator.setDataStorage(keyedDataStorage);
+    windowedOperator.setRetractionStorage(keyedRetractionStorage);
+    windowedOperator.setWindowStateStorage(windowStateStorage);
     windowedOperator.setAccumulation(new SumAccumulation());
     return windowedOperator;
   }
@@ -102,8 +173,6 @@ public class WindowedOperatorTest
   @Test
   public void testWatermarkAndAllowedLateness()
   {
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
-        new Attribute.AttributeMap.DefaultAttributeMap());
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
     CollectorTestSink controlSink = new CollectorTestSink();
 
@@ -112,26 +181,20 @@ public class WindowedOperatorTest
     windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
     windowedOperator.setAllowedLateness(Duration.millis(1000));
 
-    WindowedStorage.WindowedPlainStorage<MutableLong> dataStorage = new InMemoryWindowedStorage<>();
-    WindowedStorage.WindowedPlainStorage<WindowState> windowStateStorage = new InMemoryWindowedStorage<>();
-
-    windowedOperator.setDataStorage(dataStorage);
-    windowedOperator.setWindowStateStorage(windowStateStorage);
-
-    windowedOperator.setup(context);
+    windowedOperator.setup(testMeta.operatorContext);
     windowedOperator.beginWindow(1);
     windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
-    Assert.assertEquals("There should be exactly one window in the storage", 1, dataStorage.size());
+    Assert.assertEquals("There should be exactly one window in the storage", 1, plainDataStorage.size());
     Assert.assertEquals("There should be exactly one window in the storage", 1, windowStateStorage.size());
 
     Map.Entry<Window, WindowState> entry = windowStateStorage.entries().iterator().next();
     Window window = entry.getKey();
     WindowState windowState = entry.getValue();
     Assert.assertEquals(-1, windowState.watermarkArrivalTime);
-    Assert.assertEquals(2L, dataStorage.get(window).longValue());
+    Assert.assertEquals(2L, plainDataStorage.get(window).longValue());
 
     windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
-    Assert.assertEquals(5L, dataStorage.get(window).longValue());
+    Assert.assertEquals(5L, plainDataStorage.get(window).longValue());
 
     windowedOperator.processWatermark(new WatermarkImpl(1200));
     windowedOperator.endWindow();
@@ -140,15 +203,16 @@ public class WindowedOperatorTest
 
     windowedOperator.beginWindow(2);
     windowedOperator.processTuple(new Tuple.TimestampedTuple<>(900L, 4L));
-    Assert.assertEquals("Late but not too late", 9L, dataStorage.get(window).longValue());
+    Assert.assertEquals("Late but not too late", 9L, plainDataStorage.get(window).longValue());
     windowedOperator.processWatermark(new WatermarkImpl(3000));
     windowedOperator.endWindow();
     Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false));
     windowedOperator.beginWindow(3);
     windowedOperator.processTuple(new Tuple.TimestampedTuple<>(120L, 5L)); // this tuple should be dropped
-    Assert.assertEquals("The window should be dropped because it's too late", 0, dataStorage.size());
+    Assert.assertEquals("The window should be dropped because it's too late", 0, plainDataStorage.size());
     Assert.assertEquals("The window should be dropped because it's too late", 0, windowStateStorage.size());
     windowedOperator.endWindow();
+    windowedOperator.teardown();
   }
 
   private void testTrigger(TriggerOption.AccumulationMode accumulationMode)
@@ -172,9 +236,7 @@ public class WindowedOperatorTest
     windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
     CollectorTestSink sink = new CollectorTestSink();
     windowedOperator.output.setSink(sink);
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
-        new Attribute.AttributeMap.DefaultAttributeMap());
-    windowedOperator.setup(context);
+    windowedOperator.setup(testMeta.operatorContext);
     windowedOperator.beginWindow(1);
     windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
     windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
@@ -212,6 +274,7 @@ public class WindowedOperatorTest
       default:
         throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
     }
+    windowedOperator.teardown();
   }
 
   @Test
@@ -233,71 +296,77 @@ public class WindowedOperatorTest
   }
 
   @Test
+  public void testTriggerWithAccumulatingModeFiringAllPanes()
+  {
+    testTriggerWithAccumulatingModeHelper(false);
+  }
+
+  @Test
   public void testTriggerWithAccumulatingModeFiringOnlyUpdatedPanes()
   {
-    for (boolean firingOnlyUpdatedPanes : new boolean[]{true, false}) {
-      WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
-      TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000))
-          .accumulatingFiredPanes();
-      if (firingOnlyUpdatedPanes) {
-        triggerOption.firingOnlyUpdatedPanes();
-      }
-      windowedOperator.setTriggerOption(triggerOption);
-      windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
-      CollectorTestSink sink = new CollectorTestSink();
-      windowedOperator.output.setSink(sink);
-      OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
-          new Attribute.AttributeMap.DefaultAttributeMap());
-      windowedOperator.setup(context);
-      windowedOperator.beginWindow(1);
-      windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
-      windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
-      windowedOperator.endWindow();
-      Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
-      windowedOperator.beginWindow(2);
-      windowedOperator.endWindow();
-      Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
-      windowedOperator.beginWindow(3);
-      windowedOperator.endWindow();
+    testTriggerWithAccumulatingModeHelper(true);
+  }
+
+  public void testTriggerWithAccumulatingModeHelper(boolean firingOnlyUpdatedPanes)
+  {
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000))
+        .accumulatingFiredPanes();
+    if (firingOnlyUpdatedPanes) {
+      triggerOption.firingOnlyUpdatedPanes();
+    }
+    windowedOperator.setTriggerOption(triggerOption);
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    CollectorTestSink sink = new CollectorTestSink();
+    windowedOperator.output.setSink(sink);
+    windowedOperator.setup(testMeta.operatorContext);
+    windowedOperator.beginWindow(1);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(2);
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(3);
+    windowedOperator.endWindow();
+    Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+    Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+    sink.collectedTuples.clear();
+    windowedOperator.beginWindow(4);
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(5);
+    windowedOperator.endWindow();
+    if (firingOnlyUpdatedPanes) {
+      Assert.assertTrue("There should not be any trigger since no panes have been updated", sink.collectedTuples
+          .isEmpty());
+    } else {
       Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
       Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
-      sink.collectedTuples.clear();
-      windowedOperator.beginWindow(4);
-      windowedOperator.endWindow();
-      Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
-      windowedOperator.beginWindow(5);
-      windowedOperator.endWindow();
-      if (firingOnlyUpdatedPanes) {
-        Assert.assertTrue("There should not be any trigger since no panes have been updated", sink.collectedTuples.isEmpty());
-      } else {
-        Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
-        Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
-      }
     }
+    windowedOperator.teardown();
   }
 
   @Test
   public void testGlobalWindowAssignment()
   {
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
-        new Attribute.AttributeMap.DefaultAttributeMap());
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
     windowedOperator.setWindowOption(new WindowOption.GlobalWindow());
-    windowedOperator.setup(context);
+    windowedOperator.setup(testMeta.operatorContext);
     Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
     Collection<? extends Window> windows = windowedValue.getWindows();
     Assert.assertEquals(1, windows.size());
     Assert.assertEquals(Window.GlobalWindow.INSTANCE, windows.iterator().next());
+    windowedOperator.teardown();
   }
 
   @Test
   public void testTimeWindowAssignment()
   {
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
-        new Attribute.AttributeMap.DefaultAttributeMap());
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
     windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
-    windowedOperator.setup(context);
+    windowedOperator.setup(testMeta.operatorContext);
     Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
     Collection<? extends Window> windows = windowedValue.getWindows();
     Assert.assertEquals(1, windows.size());
@@ -309,11 +378,9 @@ public class WindowedOperatorTest
   @Test
   public void testSlidingWindowAssignment()
   {
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
-        new Attribute.AttributeMap.DefaultAttributeMap());
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
     windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200)));
-    windowedOperator.setup(context);
+    windowedOperator.setup(testMeta.operatorContext);
     Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1600L, 2L));
     Collection<? extends Window> windows = windowedValue.getWindows();
     Window[] winArray = windows.toArray(new Window[]{});
@@ -328,21 +395,18 @@ public class WindowedOperatorTest
     Assert.assertEquals(1000, winArray[3].getDurationMillis());
     Assert.assertEquals(1600, winArray[4].getBeginTimestamp());
     Assert.assertEquals(1000, winArray[4].getDurationMillis());
+    windowedOperator.teardown();
   }
 
   @Test
   public void testSessionWindows()
   {
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
-        new Attribute.AttributeMap.DefaultAttributeMap());
-    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
-    windowedOperator.setDataStorage(new InMemorySessionWindowedStorage<String, MutableLong>());
-    windowedOperator.setRetractionStorage(new InMemorySessionWindowedStorage<String, Long>());
+    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(true);
     windowedOperator.setWindowOption(new WindowOption.SessionWindows(Duration.millis(2000)));
     windowedOperator.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingAndRetractingFiredPanes().firingOnlyUpdatedPanes());
     CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink();
     windowedOperator.output.setSink((Sink<Object>)(Sink)sink);
-    windowedOperator.setup(context);
+    windowedOperator.setup(testMeta.operatorContext);
     windowedOperator.beginWindow(1);
     Tuple<KeyValPair<String, Long>> tuple = new Tuple.TimestampedTuple<>(1100L, new KeyValPair<>("a", 2L));
     windowedOperator.processTuple(tuple);
@@ -400,16 +464,24 @@ public class WindowedOperatorTest
     Assert.assertEquals(3, sink.getCount(false));
 
     // retraction of the two old windows
+    Map<Window, KeyValPair<String, Long>> tuples = new TreeMap<>();
     out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
     Assert.assertEquals(1, out.getWindows().size());
-    Assert.assertEquals(window2, out.getWindows().iterator().next());
-    Assert.assertEquals("a", out.getValue().getKey());
-    Assert.assertEquals(-5L, out.getValue().getValue().longValue());
+    tuples.put(out.getWindows().iterator().next(), out.getValue());
+
     out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1);
     Assert.assertEquals(1, out.getWindows().size());
-    Assert.assertEquals(window3, out.getWindows().iterator().next());
-    Assert.assertEquals("a", out.getValue().getKey());
-    Assert.assertEquals(-4L, out.getValue().getValue().longValue());
+    tuples.put(out.getWindows().iterator().next(), out.getValue());
+
+    Iterator<Map.Entry<Window, KeyValPair<String, Long>>> iterator = tuples.entrySet().iterator();
+    Map.Entry<Window, KeyValPair<String, Long>> entry = iterator.next();
+    Assert.assertEquals(window2, entry.getKey());
+    Assert.assertEquals("a", entry.getValue().getKey());
+    Assert.assertEquals(-5L, entry.getValue().getValue().longValue());
+    entry = iterator.next();
+    Assert.assertEquals(window3, entry.getKey());
+    Assert.assertEquals("a", entry.getValue().getKey());
+    Assert.assertEquals(-4L, entry.getValue().getValue().longValue());
 
     // normal trigger
     out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(2);
@@ -421,32 +493,30 @@ public class WindowedOperatorTest
     Assert.assertEquals(12L, out.getValue().getValue().longValue());
 
     windowedOperator.endWindow();
+    windowedOperator.teardown();
   }
 
   @Test
   public void testKeyedAccumulation()
   {
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
-        new Attribute.AttributeMap.DefaultAttributeMap());
-    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
+    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(false);
     windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
-    WindowedStorage.WindowedKeyedStorage<String, MutableLong> dataStorage = new InMemoryWindowedKeyedStorage<>();
-    windowedOperator.setDataStorage(dataStorage);
-    windowedOperator.setup(context);
+    windowedOperator.setup(testMeta.operatorContext);
     windowedOperator.beginWindow(1);
     windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L)));
     windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("a", 3L)));
     windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 4L)));
     windowedOperator.processTuple(new Tuple.TimestampedTuple<>(150L, new KeyValPair<>("b", 5L)));
     windowedOperator.endWindow();
-    Assert.assertEquals(1, dataStorage.size());
-    Assert.assertEquals(5L, dataStorage.get(new Window.TimeWindow(0, 1000), "a").longValue());
-    Assert.assertEquals(9L, dataStorage.get(new Window.TimeWindow(0, 1000), "b").longValue());
+    Assert.assertEquals(1, keyedDataStorage.size());
+    Assert.assertEquals(5L, keyedDataStorage.get(new Window.TimeWindow(0, 1000), "a").longValue());
+    Assert.assertEquals(9L, keyedDataStorage.get(new Window.TimeWindow(0, 1000), "b").longValue());
+    windowedOperator.teardown();
   }
 
   private void testKeyedTrigger(TriggerOption.AccumulationMode accumulationMode)
   {
-    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
+    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(false);
     TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000));
     switch (accumulationMode) {
       case ACCUMULATING:
@@ -465,9 +535,7 @@ public class WindowedOperatorTest
     windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
     CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink();
     windowedOperator.output.setSink((Sink<Object>)(Sink)sink);
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
-        new Attribute.AttributeMap.DefaultAttributeMap());
-    windowedOperator.setup(context);
+    windowedOperator.setup(testMeta.operatorContext);
     windowedOperator.beginWindow(1);
     windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L)));
     windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("b", 3L)));
@@ -530,6 +598,7 @@ public class WindowedOperatorTest
       default:
         throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
     }
+    windowedOperator.teardown();
   }
 
   @Test