You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/09/14 18:44:59 UTC

apex-malhar git commit: APEXMALHAR-2130 WindowStorage interface changes in preparation of incorporating spillable data structures

Repository: apex-malhar
Updated Branches:
  refs/heads/master b811e3356 -> 9f9da0ee1


APEXMALHAR-2130 WindowStorage interface changes in preparation of incorporating spillable data structures


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9f9da0ee
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9f9da0ee
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9f9da0ee

Branch: refs/heads/master
Commit: 9f9da0ee15e00b51d57725c75119c145697bfd64
Parents: b811e33
Author: David Yan <da...@datatorrent.com>
Authored: Sun Sep 11 00:30:20 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Wed Sep 14 11:24:56 2016 -0700

----------------------------------------------------------------------
 .../stream/sample/complete/AutoComplete.java    |   2 +-
 .../sample/complete/TopWikipediaSessions.java   |  14 +-
 .../sample/complete/TwitterAutoComplete.java    |   2 +-
 .../sample/cookbook/MaxPerKeyExamples.java      |   2 +-
 .../apex/malhar/lib/window/Accumulation.java    |   2 +-
 .../lib/window/SessionWindowedStorage.java      |  11 +-
 .../apache/apex/malhar/lib/window/Tuple.java    |  70 +++++++++-
 .../apache/apex/malhar/lib/window/Window.java   | 125 +++++++++++------
 .../apex/malhar/lib/window/WindowOption.java    |   4 +-
 .../malhar/lib/window/WindowedKeyedStorage.java |  81 -----------
 .../malhar/lib/window/WindowedOperator.java     |   7 -
 .../apex/malhar/lib/window/WindowedStorage.java |  98 +++++++++-----
 .../window/impl/AbstractWindowedOperator.java   | 135 ++++++++++++++-----
 .../impl/InMemorySessionWindowedStorage.java    |  87 ++++++++++++
 .../impl/InMemoryWindowedKeyedStorage.java      |  42 +-----
 .../window/impl/InMemoryWindowedStorage.java    |  22 ++-
 .../window/impl/KeyedWindowedOperatorImpl.java  |  14 +-
 .../lib/window/impl/WindowedOperatorImpl.java   |   2 +-
 .../malhar/lib/window/WindowedOperatorTest.java |  44 +++---
 .../apex/malhar/stream/api/util/TupleUtil.java  |  13 +-
 20 files changed, 481 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
index 6b208aa..2db59b6 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
@@ -208,7 +208,7 @@ public class AutoComplete implements StreamingApplication
           public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
           {
             // TODO: Should be removed after Auto-wrapping is supported.
-            return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple);
+            return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
           }
         });
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
index 5ac3e7f..68ec733 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -28,6 +28,7 @@ import org.joda.time.Duration;
 
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
 import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.lib.window.accumulation.TopN;
 import org.apache.apex.malhar.stream.api.ApexStream;
@@ -269,7 +270,8 @@ public class TopWikipediaSessions implements StreamingApplication
           @Override
           public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
           {
-            return new TempWrapper(input.getValue(), input.getWindows().get(0).getBeginTimestamp());
+            Window window = input.getWindows().iterator().next();
+            return new TempWrapper(input.getValue(), window.getBeginTimestamp());
           }
         }, name("TempWrapper"))
 
@@ -290,14 +292,15 @@ public class TopWikipediaSessions implements StreamingApplication
     @Override
     public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
     {
-      return new Tuple.WindowedTuple<KeyValPair<String, Long>>(input.getWindows().get(0), new KeyValPair<String, Long>(
-        input.getValue().getKey()  + " : " + input.getWindows().get(0).getBeginTimestamp() + " : " + input.getWindows().get(0).getDurationMillis(),
+      Window window = input.getWindows().iterator().next();
+      return new Tuple.WindowedTuple<KeyValPair<String, Long>>(window, new KeyValPair<String, Long>(
+        input.getValue().getKey()  + " : " + window.getBeginTimestamp() + " : " + window.getDurationMillis(),
         input.getValue().getValue()));
     }
   }
 
   /**
-   * A flapmap function that turns the result into readable format.
+   * A flatmap function that turns the result into readable format.
    */
   static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String>
   {
@@ -308,7 +311,8 @@ public class TopWikipediaSessions implements StreamingApplication
       for (TempWrapper item : input.getValue()) {
         String session = item.getValue().getKey();
         long count = item.getValue().getValue();
-        result.add(session + " + " + count + " : " + input.getWindows().get(0).getBeginTimestamp());
+        Window window = input.getWindows().iterator().next();
+        result.add(session + " + " + count + " : " + window.getBeginTimestamp());
       }
       return result;
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
index ffd2a03..4fc80ea 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
@@ -125,7 +125,7 @@ public class TwitterAutoComplete implements StreamingApplication
           public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
           {
             // TODO: Should be removed after Auto-wrapping is supported.
-            return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple);
+            return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
           }
         }, name("TopNByKey"));
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
index 4fafa5a..9fd9495 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@ -110,7 +110,7 @@ public class MaxPerKeyExamples implements StreamingApplication
               @Override
               public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> input)
               {
-                return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GLOBAL_WINDOW, input);
+                return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GlobalWindow.INSTANCE, input);
               }
             }, name("MaxPerMonth"));
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
index 89215a1..03f7ff7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
@@ -51,7 +51,7 @@ public interface Accumulation<InputT, AccumT, OutputT>
   AccumT accumulate(AccumT accumulatedValue, InputT input);
 
   /**
-   * Merges two accumulated value into one
+   * Merges two accumulated values into one
    *
    * @param accumulatedValue1
    * @param accumulatedValue2

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
index 404e591..4cb2b1a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
@@ -32,9 +32,18 @@ import org.apache.hadoop.classification.InterfaceStability;
  * @since 3.5.0
  */
 @InterfaceStability.Evolving
-public interface SessionWindowedStorage<K, V> extends WindowedKeyedStorage<K, V>
+public interface SessionWindowedStorage<K, V> extends WindowedStorage.WindowedKeyedStorage<K, V>
 {
   /**
+   * Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the
+   * data to toWindow, and overwrite any existing data in toWindow
+   *
+   * @param fromWindow
+   * @param toWindow
+   */
+  void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow);
+
+  /**
    * Given the key, the timestamp and the gap, gets the data that falls into timestamp +/- gap.
    * This is used for getting the entry the data given the timestamp belongs to, and for determining whether to merge
    * session windows.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
index eaf4d29..aea6bf6 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
@@ -18,8 +18,9 @@
  */
 package org.apache.apex.malhar.lib.window;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -72,6 +73,27 @@ public interface Tuple<T>
     {
       return value.toString();
     }
+
+    @Override
+    public int hashCode()
+    {
+      return value.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (obj instanceof PlainTuple) {
+        PlainTuple<T> other = (PlainTuple<T>)obj;
+        if (this.value == null) {
+          return other.value == null;
+        } else {
+          return this.value.equals(other.value);
+        }
+      } else {
+        return false;
+      }
+    }
   }
 
   /**
@@ -103,6 +125,23 @@ public interface Tuple<T>
     {
       this.timestamp = timestamp;
     }
+
+    @Override
+    public int hashCode()
+    {
+      return super.hashCode() ^ (int)(timestamp & 0xffffff);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (obj instanceof TimestampedTuple && super.equals(obj)) {
+        TimestampedTuple<T> other = (TimestampedTuple<T>)obj;
+        return (this.timestamp == other.timestamp);
+      } else {
+        return false;
+      }
+    }
   }
 
   /**
@@ -112,7 +151,7 @@ public interface Tuple<T>
    */
   class WindowedTuple<T> extends TimestampedTuple<T>
   {
-    private List<Window> windows = new ArrayList<>();
+    private Set<Window> windows = new TreeSet<>();
 
     public WindowedTuple()
     {
@@ -124,7 +163,13 @@ public interface Tuple<T>
       this.windows.add(window);
     }
 
-    public List<Window> getWindows()
+    public WindowedTuple(Collection<? extends Window> windows, long timestamp, T value)
+    {
+      super(timestamp, value);
+      this.windows.addAll(windows);
+    }
+
+    public Collection<Window> getWindows()
     {
       return windows;
     }
@@ -133,6 +178,23 @@ public interface Tuple<T>
     {
       this.windows.add(window);
     }
+
+    @Override
+    public int hashCode()
+    {
+      return super.hashCode() ^ windows.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (obj instanceof WindowedTuple && super.equals(obj)) {
+        WindowedTuple<T> other = (WindowedTuple<T>)obj;
+        return this.windows.equals(other.windows);
+      } else {
+        return false;
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
index 32a028a..50d6445 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
@@ -18,27 +18,33 @@
  */
 package org.apache.apex.malhar.lib.window;
 
-import java.util.Comparator;
-
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * This interface describes the individual window.
+ * This interface describes the classes that represent individual windows.
+ *
+ * @param <WINDOW> the window type that objects of this class can be compared to via compareTo
  *
  * @since 3.5.0
  */
 @InterfaceStability.Evolving
-public interface Window
+public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WINDOW>
 {
   long getBeginTimestamp();
 
   long getDurationMillis();
 
   /**
-   * Global window means there is only one window, or no window depending on how you look at it.
+   * Global window means there is only one window that spans the entire lifetime of the application
    */
-  class GlobalWindow implements Window
+  class GlobalWindow implements Window<GlobalWindow>
   {
+
+    /**
+     * The singleton global window
+     */
+    public static final GlobalWindow INSTANCE = new GlobalWindow();
+
     private GlobalWindow()
     {
     }
@@ -57,56 +63,37 @@ public interface Window
     @Override
     public boolean equals(Object other)
     {
-      return (other instanceof GlobalWindow);
+      return this == other;
     }
-  }
 
-  class DefaultComparator implements Comparator<Window>
-  {
-    private DefaultComparator()
+    @Override
+    public int compareTo(GlobalWindow o)
     {
+      return 0;
     }
 
     @Override
-    public int compare(Window o1, Window o2)
+    public String toString()
     {
-      if (o1.getBeginTimestamp() < o2.getBeginTimestamp()) {
-        return -1;
-      } else if (o1.getBeginTimestamp() > o2.getBeginTimestamp()) {
-        return 1;
-      } else if (o1.getDurationMillis() < o2.getDurationMillis()) {
-        return -1;
-      } else if (o1.getDurationMillis() > o2.getDurationMillis()) {
-        return 1;
-      } else if (o1 instanceof SessionWindow && o2 instanceof SessionWindow) {
-        return Long.compare(((SessionWindow)o1).getKey().hashCode(), ((SessionWindow)o2).getKey().hashCode());
-      } else {
-        return 0;
-      }
+      return "[GlobalWindow]";
     }
   }
 
   /**
-   * The singleton global window
-   */
-  GlobalWindow GLOBAL_WINDOW = new GlobalWindow();
-
-  /**
-   * The singleton default comparator of windows
-   */
-  Comparator<Window> DEFAULT_COMPARATOR = new DefaultComparator();
-
-  /**
    * TimeWindow is a window that represents a time slice
+   *
+   * @param <WINDOW> the window type that objects of this class can be compared to via compareTo
    */
-  class TimeWindow implements Window
+  class TimeWindow<WINDOW extends TimeWindow<WINDOW>> implements Window<WINDOW>
   {
-    protected long beginTimestamp;
-    protected long durationMillis;
+    protected final long beginTimestamp;
+    protected final long durationMillis;
 
     private TimeWindow()
     {
       // for kryo
+      this.beginTimestamp = -1;
+      this.durationMillis = 0;
     }
 
     public TimeWindow(long beginTimestamp, long durationMillis)
@@ -148,20 +135,49 @@ public interface Window
       }
     }
 
+    @Override
+    public int hashCode()
+    {
+      int result = (int)(beginTimestamp ^ (beginTimestamp >>> 32));
+      result = 31 * result + (int)(durationMillis ^ (durationMillis >>> 32));
+      return result;
+    }
+
+    @Override
+    public int compareTo(TimeWindow o)
+    {
+      if (this.getBeginTimestamp() < o.getBeginTimestamp()) {
+        return -1;
+      } else if (this.getBeginTimestamp() > o.getBeginTimestamp()) {
+        return 1;
+      } else if (this.getDurationMillis() < o.getDurationMillis()) {
+        return -1;
+      } else if (this.getDurationMillis() > o.getDurationMillis()) {
+        return 1;
+      }
+      return 0;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "[TimeWindow " + getBeginTimestamp() + "(" + getDurationMillis() + ")]";
+    }
   }
 
   /**
    * SessionWindow is a window that represents a time slice for a key, with the time slice being variable length.
    *
-   * @param <K>
+   * @param <K> the key type for the session
    */
-  class SessionWindow<K> extends TimeWindow
+  class SessionWindow<K> extends TimeWindow<SessionWindow<K>>
   {
-    private K key;
+    private final K key;
 
     private SessionWindow()
     {
       // for kryo
+      this.key = null;
     }
 
     public SessionWindow(K key, long beginTimestamp, long duration)
@@ -192,5 +208,32 @@ public interface Window
         return false;
       }
     }
+
+    @Override
+    public int hashCode()
+    {
+      return (key.hashCode() << 16) | super.hashCode();
+    }
+
+    @Override
+    public int compareTo(SessionWindow<K> o)
+    {
+      int val = super.compareTo(o);
+      if (val == 0) {
+        if (this.getKey() instanceof Comparable) {
+          return ((Comparable<K>)this.getKey()).compareTo(o.getKey());
+        } else {
+          return Long.compare(this.getKey().hashCode(), o.getKey().hashCode());
+        }
+      } else {
+        return val;
+      }
+    }
+
+    @Override
+    public String toString()
+    {
+      return "[SessionWindow key=" + getKey() + " " + getBeginTimestamp() + "(" + getDurationMillis() + ")]";
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
index de244fb..099709d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
@@ -63,7 +63,7 @@ public interface WindowOption
     /**
      * Gets the duration of the time window
      *
-     * @return
+     * @return the duration of the time window
      */
     public Duration getDuration()
     {
@@ -74,7 +74,7 @@ public interface WindowOption
      * The time window should be a sliding window with the given slide duration
      *
      * @param duration
-     * @return
+     * @return the SlidingTimeWindows
      */
     public SlidingTimeWindows slideBy(Duration duration)
     {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java
deleted file mode 100644
index d59ee40..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window;
-
-import java.util.Map;
-
-import org.apache.apex.malhar.lib.state.spillable.Spillable;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * This interface is for a key/value store for storing data for windowed streams.
- * The key to this key/value store is a pair of (Window, K).
- * Also, this class may go away soon as there are plans to incorporate {@link Spillable} data structures
- * in the near future.
- *
- * Note that this interface expects that the implementation takes care of checkpoint recovery.
- *
- *
- * @since 3.5.0
- */
-@InterfaceStability.Unstable
-public interface WindowedKeyedStorage<K, V> extends WindowedStorage<Map<K, V>>
-{
-  /**
-   * Sets the data associated with the given window and the key
-   *
-   * @param window
-   * @param key
-   * @param value
-   */
-  void put(Window window, K key, V value);
-
-  /**
-   * Gets the key/value pairs associated with the given window
-   *
-   * @param window
-   * @return
-   */
-  Iterable<Map.Entry<K, V>> entrySet(Window window);
-
-  /**
-   * Gets the data associated with the given window and the key
-   *
-   * @param window
-   * @param key
-   * @return
-   */
-  V get(Window window, K key);
-
-  /**
-   * Removes all the data associated with the given window
-   *
-   * @param window
-   */
-  void remove(Window window);
-
-  /**
-   * Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the
-   * data to toWindow, and overwrite any existing data in toWindow
-   *
-   * @param fromWindow
-   * @param toWindow
-   */
-  void migrateWindow(Window fromWindow, Window toWindow);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
index 5da531c..ccc7ae1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
@@ -61,13 +61,6 @@ public interface WindowedOperator<InputT>
   void setAllowedLateness(Duration allowedLateness);
 
   /**
-   * This methods sets the storage for the meta data for each window
-   *
-   * @param storageAgent
-   */
-  void setWindowStateStorage(WindowedStorage<WindowState> storageAgent);
-
-  /**
    * This sets the function that extracts the timestamp from the input tuple
    *
    * @param timestampExtractor

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
index c2b3f08..42ecdae 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
@@ -20,23 +20,19 @@ package org.apache.apex.malhar.lib.window;
 
 import java.util.Map;
 
-import org.apache.apex.malhar.lib.state.spillable.Spillable;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+
 /**
  * WindowedStorage is a key-value store with the key being the window. The implementation of this interface should
  * make sure checkpointing and recovery will be done correctly.
- * Note that this interface may go away soon as there are plans to incorporate {@link Spillable} data structures in the
- * near future.
- *
- * @param <T> The type of the data that is stored per window
- *
- * TODO: Look at the possibility of integrating spillable data structure: https://issues.apache.org/jira/browse/APEXMALHAR-2026
  *
  * @since 3.5.0
  */
 @InterfaceStability.Unstable
-public interface WindowedStorage<T> extends Iterable<Map.Entry<Window, T>>
+public interface WindowedStorage extends Component<Context.OperatorContext>
 {
   /**
    * Returns true if the storage contains this window
@@ -53,22 +49,6 @@ public interface WindowedStorage<T> extends Iterable<Map.Entry<Window, T>>
   long size();
 
   /**
-   * Sets the data associated with the given window
-   *
-   * @param window
-   * @param value
-   */
-  void put(Window window, T value);
-
-  /**
-   * Gets the value associated with the given window
-   *
-   * @param window
-   * @return
-   */
-  T get(Window window);
-
-  /**
    * Removes all the data associated with the given window. This does NOT mean removing the window in checkpointed state
    *
    * @param window
@@ -76,18 +56,72 @@ public interface WindowedStorage<T> extends Iterable<Map.Entry<Window, T>>
   void remove(Window window);
 
   /**
-   * Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the
-   * data to toWindow, and overwrite any existing data in toWindow
+   * This interface handles plain value per window. If there is a key/value map for each window, use
+   * {@link WindowedKeyedStorage}. Also note that a single T object is assumed to fit in memory
    *
-   * @param fromWindow
-   * @param toWindow
+   * Note that this interface expects that the implementation takes care of checkpoint recovery.
+   *
+   * @param <T> The type of the data that is stored per window
    */
-  void migrateWindow(Window fromWindow, Window toWindow);
+  interface WindowedPlainStorage<T> extends WindowedStorage
+  {
+    /**
+     * Sets the data associated with the given window
+     *
+     * @param window
+     * @param value
+     */
+    void put(Window window, T value);
+
+    /**
+     * Gets the value associated with the given window
+     *
+     * @param window
+     * @return
+     */
+    T get(Window window);
+
+    /**
+     * Returns the iterable of the entries in the storage
+     *
+     * @return
+     */
+    Iterable<Map.Entry<Window, T>> entries();
+  }
 
   /**
-   * Returns the iterable of the entries in the storage
+   * This interface is for a store that maps Windows to maps of key value pairs.
+   *
+   * Note that this interface expects that the implementation takes care of checkpoint recovery.
    *
-   * @return
    */
-  Iterable<Map.Entry<Window, T>> entrySet();
+  interface WindowedKeyedStorage<K, V> extends WindowedStorage
+  {
+    /**
+     * Sets the data associated with the given window and the key
+     *
+     * @param window
+     * @param key
+     * @param value
+     */
+    void put(Window window, K key, V value);
+
+    /**
+     * Gets an iterable object over the key/value pairs associated with the given window
+     *
+     * @param window
+     * @return
+     */
+    Iterable<Map.Entry<K, V>> entries(Window window);
+
+    /**
+     * Gets the data associated with the given window and the key
+     *
+     * @param window
+     * @param key
+     * @return
+     */
+    V get(Window window, K key);
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index 7abe9b6..f90d47d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -19,6 +19,9 @@
 package org.apache.apex.malhar.lib.window.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +32,7 @@ import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.state.spillable.WindowListener;
 import org.apache.apex.malhar.lib.window.Accumulation;
 import org.apache.apex.malhar.lib.window.ControlTuple;
 import org.apache.apex.malhar.lib.window.TriggerOption;
@@ -42,9 +46,11 @@ import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.base.Function;
 
+import com.datatorrent.api.Component;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.common.util.BaseOperator;
 
@@ -63,13 +69,13 @@ import com.datatorrent.common.util.BaseOperator;
  */
 @InterfaceStability.Evolving
 public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT extends WindowedStorage, RetractionStorageT extends WindowedStorage, AccumulationT extends Accumulation>
-    extends BaseOperator implements WindowedOperator<InputT>
+    extends BaseOperator implements WindowedOperator<InputT>, Operator.CheckpointNotificationListener
 {
 
   protected WindowOption windowOption;
   protected TriggerOption triggerOption;
   protected long allowedLatenessMillis = -1;
-  protected WindowedStorage<WindowState> windowStateMap;
+  protected WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap;
 
   private Function<InputT, Long> timestampExtractor;
 
@@ -81,12 +87,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   private long lateTriggerCount;
   private long lateTriggerMillis;
   private long currentDerivedTimestamp = -1;
-  private long windowWidthMillis;
+  private long timeIncrement;
   private long fixedWatermarkMillis = -1;
+
+  private Map<String, Component<Context.OperatorContext>> components = new HashMap<>();
+
   protected DataStorageT dataStorage;
   protected RetractionStorageT retractionStorage;
   protected AccumulationT accumulation;
 
+  private static final transient Collection<? extends Window> GLOBAL_WINDOW_SINGLETON_SET = Collections.singleton(Window.GlobalWindow.INSTANCE);
   private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
 
   public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()
@@ -156,9 +166,6 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   public void setWindowOption(WindowOption windowOption)
   {
     this.windowOption = windowOption;
-    if (this.windowOption instanceof WindowOption.GlobalWindow) {
-      windowStateMap.put(Window.GLOBAL_WINDOW, new WindowState());
-    }
   }
 
   @Override
@@ -216,6 +223,11 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     this.retractionStorage = storageAgent;
   }
 
+  public void addComponent(String key, Component<Context.OperatorContext> component)
+  {
+    components.put(key, component);
+  }
+
   /**
    * Sets the accumulation, which basically tells the WindowedOperator what to do if a new tuple comes in and what
    * to put in the pane when a trigger is fired
@@ -227,8 +239,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     this.accumulation = accumulation;
   }
 
-  @Override
-  public void setWindowStateStorage(WindowedStorage<WindowState> storageAgent)
+  public void setWindowStateStorage(WindowedStorage.WindowedPlainStorage<WindowState> storageAgent)
   {
     this.windowStateMap = storageAgent;
   }
@@ -284,12 +295,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     if (windowOption == null && input instanceof Tuple.WindowedTuple) {
       // inherit the windows from upstream
       return (Tuple.WindowedTuple<InputT>)input;
+    } else {
+      return new Tuple.WindowedTuple<>(assignWindows(input), extractTimestamp(input), input.getValue());
     }
-    Tuple.WindowedTuple<InputT> windowedTuple = new Tuple.WindowedTuple<>();
-    windowedTuple.setValue(input.getValue());
-    windowedTuple.setTimestamp(extractTimestamp(input));
-    assignWindows(windowedTuple.getWindows(), input);
-    return windowedTuple;
   }
 
   private long extractTimestamp(Tuple<InputT> tuple)
@@ -305,29 +313,31 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     }
   }
 
-  private void assignWindows(List<Window> windows, Tuple<InputT> inputTuple)
+  private Collection<? extends Window> assignWindows(Tuple<InputT> inputTuple)
   {
     if (windowOption instanceof WindowOption.GlobalWindow) {
-      windows.add(Window.GLOBAL_WINDOW);
+      return GLOBAL_WINDOW_SINGLETON_SET;
     } else {
       long timestamp = extractTimestamp(inputTuple);
       if (windowOption instanceof WindowOption.TimeWindows) {
-
-        for (Window.TimeWindow window : getTimeWindowsForTimestamp(timestamp)) {
+        Collection<? extends Window> windows = getTimeWindowsForTimestamp(timestamp);
+        for (Window window : windows) {
           if (!windowStateMap.containsWindow(window)) {
             windowStateMap.put(window, new WindowState());
           }
-          windows.add(window);
         }
+        return windows;
       } else if (windowOption instanceof WindowOption.SessionWindows) {
-        assignSessionWindows(windows, timestamp, inputTuple);
+        return assignSessionWindows(timestamp, inputTuple);
+      } else {
+        throw new IllegalStateException("Unsupported Window Option: " + windowOption.getClass());
       }
     }
   }
 
-  protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<InputT> inputTuple)
+  protected Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<InputT> inputTuple)
   {
-    throw new UnsupportedOperationException();
+    throw new UnsupportedOperationException("Session window require keyed tuples");
   }
 
   /**
@@ -338,7 +348,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
    * @param timestamp
    * @return
    */
-  private List<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp)
+  private Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp)
   {
     List<Window.TimeWindow> windows = new ArrayList<>();
     if (windowOption instanceof WindowOption.TimeWindows) {
@@ -348,8 +358,6 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
       if (windowOption instanceof WindowOption.SlidingTimeWindows) {
         long slideBy = ((WindowOption.SlidingTimeWindows)windowOption).getSlideByDuration().getMillis();
         // add the sliding windows front and back
-        // Note: this messes up the order of the window and we might want to revisit this if the order of the windows
-        // matter
         for (long slideBeginTimestamp = beginTimestamp - slideBy;
             slideBeginTimestamp <= timestamp && timestamp < slideBeginTimestamp + durationMillis;
             slideBeginTimestamp -= slideBy) {
@@ -389,8 +397,32 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   @Override
   public void setup(Context.OperatorContext context)
   {
-    this.windowWidthMillis = context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+    this.timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
     validate();
+    windowStateMap.setup(context);
+    dataStorage.setup(context);
+    if (retractionStorage != null) {
+      retractionStorage.setup(context);
+    }
+    for (Component component : components.values()) {
+      component.setup(context);
+    }
+    if (this.windowOption instanceof WindowOption.GlobalWindow) {
+      windowStateMap.put(Window.GlobalWindow.INSTANCE, new WindowState());
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    windowStateMap.teardown();
+    dataStorage.teardown();
+    if (retractionStorage != null) {
+      retractionStorage.teardown();
+    }
+    for (Component component : components.values()) {
+      component.teardown();
+    }
   }
 
   /**
@@ -399,10 +431,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   @Override
   public void beginWindow(long windowId)
   {
+    for (Component component : components.values()) {
+      if (component instanceof WindowListener) {
+        ((WindowListener)component).beginWindow(windowId);
+      }
+    }
     if (currentDerivedTimestamp == -1) {
-      currentDerivedTimestamp = ((windowId >> 32) * 1000) + (windowId & 0xffffffffL);
+      // TODO: once we are able to get the firstWindowMillis from Apex Core API, we should use that instead
+      currentDerivedTimestamp = (windowId >> 32) * 1000;
     } else {
-      currentDerivedTimestamp += windowWidthMillis;
+      currentDerivedTimestamp += timeIncrement;
     }
     watermarkTimestamp = -1;
   }
@@ -417,6 +455,12 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     // TODO: May want to revisit this if the application cares more about latency than idempotency
     processWatermarkAtEndWindow();
     fireTimeTriggers();
+
+    for (Component component : components.values()) {
+      if (component instanceof WindowListener) {
+        ((WindowListener)component).endWindow();
+      }
+    }
   }
 
   private void processWatermarkAtEndWindow()
@@ -429,7 +473,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
 
       long horizon = watermarkTimestamp - allowedLatenessMillis;
 
-      for (Iterator<Map.Entry<Window, WindowState>> it = windowStateMap.iterator(); it.hasNext(); ) {
+      for (Iterator<Map.Entry<Window, WindowState>> it = windowStateMap.entries().iterator(); it.hasNext(); ) {
         Map.Entry<Window, WindowState> entry = it.next();
         Window window = entry.getKey();
         WindowState windowState = entry.getValue();
@@ -446,7 +490,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
             // discard this window because it's too late now
             it.remove();
             dataStorage.remove(window);
-            retractionStorage.remove(window);
+            if (retractionStorage != null) {
+              retractionStorage.remove(window);
+            }
           }
         }
       }
@@ -457,7 +503,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   private void fireTimeTriggers()
   {
     if (earlyTriggerMillis > 0 || lateTriggerMillis > 0) {
-      for (Map.Entry<Window, WindowState> entry : windowStateMap.entrySet()) {
+      for (Map.Entry<Window, WindowState> entry : windowStateMap.entries()) {
         Window window = entry.getKey();
         WindowState windowState = entry.getValue();
         if (windowState.watermarkArrivalTime == -1) {
@@ -511,4 +557,33 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     dataStorage.remove(window);
   }
 
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    for (Component component : components.values()) {
+      if (component instanceof CheckpointNotificationListener) {
+        ((CheckpointNotificationListener)component).beforeCheckpoint(windowId);
+      }
+    }
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+    for (Component component : components.values()) {
+      if (component instanceof CheckpointNotificationListener) {
+        ((CheckpointNotificationListener)component).checkpointed(windowId);
+      }
+    }
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    for (Component component : components.values()) {
+      if (component instanceof CheckpointNotificationListener) {
+        ((CheckpointNotificationListener)component).committed(windowId);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
new file mode 100644
index 0000000..fdceb4d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is the in-memory implementation of {@link SessionWindowedStorage}. Do not use this class if you have a large
+ * state that cannot fit in memory.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Unstable
+public class InMemorySessionWindowedStorage<K, V> extends InMemoryWindowedKeyedStorage<K, V>
+    implements SessionWindowedStorage<K, V>
+{
+  private Map<K, TreeSet<Window.SessionWindow<K>>> keyToWindows = new HashMap<>();
+
+  @Override
+  public void put(Window window, K key, V value)
+  {
+    super.put(window, key, value);
+    TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key);
+    if (sessionWindows == null) {
+      sessionWindows = new TreeSet<>();
+      keyToWindows.put(key, sessionWindows);
+    }
+    sessionWindows.add((Window.SessionWindow<K>)window);
+  }
+
+  @Override
+  public void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow)
+  {
+    if (containsWindow(fromWindow)) {
+      map.put(toWindow, map.remove(fromWindow));
+    }
+  }
+
+  @Override
+  public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap)
+  {
+    List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>();
+    TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key);
+    if (sessionWindows != null) {
+      Window.SessionWindow<K> refWindow = new Window.SessionWindow<>(key, timestamp, 1);
+      Window.SessionWindow<K> floor = sessionWindows.floor(refWindow);
+      if (floor != null) {
+        if (floor.getBeginTimestamp() + floor.getDurationMillis() + gap > timestamp) {
+          results.add(new AbstractMap.SimpleEntry<>(floor, map.get(floor).get(key)));
+        }
+      }
+      Window.SessionWindow<K> higher = sessionWindows.higher(refWindow);
+      if (higher != null) {
+        if (higher.getBeginTimestamp() - gap <= timestamp) {
+          results.add(new AbstractMap.SimpleEntry<>(higher, map.get(higher).get(key)));
+        }
+      }
+    }
+    return results;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java
index 7dbfbb1..4b47edc 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java
@@ -18,31 +18,23 @@
  */
 package org.apache.apex.malhar.lib.window.impl;
 
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import org.apache.apex.malhar.lib.state.spillable.Spillable;
-import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
 import org.apache.apex.malhar.lib.window.Window;
-import org.apache.apex.malhar.lib.window.WindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * This is the in-memory implementation of {@link WindowedKeyedStorage}. Do not use this class if you have a large state that
- * can't be fit in memory. Also, this class may go away soon as there are plans to incorporate {@link Spillable} data structures
- * in the near future.
+ * can't be fit in memory.
  *
  * @since 3.5.0
  */
 @InterfaceStability.Unstable
 public class InMemoryWindowedKeyedStorage<K, V> extends InMemoryWindowedStorage<Map<K, V>>
-    implements WindowedKeyedStorage<K, V>, SessionWindowedStorage<K, V>
+    implements WindowedStorage.WindowedKeyedStorage<K, V>
 {
   @Override
   public void put(Window window, K key, V value)
@@ -51,14 +43,13 @@ public class InMemoryWindowedKeyedStorage<K, V> extends InMemoryWindowedStorage<
     if (map.containsKey(window)) {
       kvMap = map.get(window);
     } else {
-      kvMap = new HashMap<K, V>();
+      kvMap = new HashMap<>();
       map.put(window, kvMap);
     }
     kvMap.put(key, value);
   }
 
-  @Override
-  public Set<Map.Entry<K, V>> entrySet(Window window)
+  public Iterable<Map.Entry<K, V>> entries(Window window)
   {
     if (map.containsKey(window)) {
       return map.get(window).entrySet();
@@ -77,27 +68,4 @@ public class InMemoryWindowedKeyedStorage<K, V> extends InMemoryWindowedStorage<
     }
   }
 
-  @Override
-  public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap)
-  {
-    List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>();
-    // TODO: this is inefficient, but this is usually not used in a real use case since it's in memory
-    for (Map.Entry<Window, Map<K, V>> entry : map.entrySet()) {
-      Window.SessionWindow<K> window = (Window.SessionWindow<K>)entry.getKey();
-      if (key.equals(window.getKey())) {
-        if (timestamp > window.getBeginTimestamp()) {
-          if (window.getBeginTimestamp() + window.getDurationMillis() + gap > timestamp) {
-            results.add(new AbstractMap.SimpleEntry<>(window, entry.getValue().get(key)));
-          }
-        } else if (timestamp < window.getBeginTimestamp()) {
-          if (window.getBeginTimestamp() - gap <= timestamp) {
-            results.add(new AbstractMap.SimpleEntry<>(window, entry.getValue().get(key)));
-          }
-        } else {
-          results.add(new AbstractMap.SimpleEntry<>(window, entry.getValue().get(key)));
-        }
-      }
-    }
-    return results;
-  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
index f6de894..db18a40 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
@@ -18,7 +18,6 @@
  */
 package org.apache.apex.malhar.lib.window.impl;
 
-import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -27,17 +26,19 @@ import org.apache.apex.malhar.lib.window.Window;
 import org.apache.apex.malhar.lib.window.WindowedStorage;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.datatorrent.api.Context;
+
 /**
- * This is the in-memory implementation of {@link WindowedStorage}. Do not use this class if you have a large state that
+ * This is the in-memory implementation of {@link WindowedStorage.WindowedPlainStorage}. Do not use this class if you have a large state that
  * can't be fit in memory. Also, this class may go away soon as there are plans to incorporate {@link Spillable} data
  * structures in the near future.
  *
  * @since 3.5.0
  */
 @InterfaceStability.Unstable
-public class InMemoryWindowedStorage<T> implements WindowedStorage<T>
+public class InMemoryWindowedStorage<T> implements WindowedStorage.WindowedPlainStorage<T>
 {
-  protected final TreeMap<Window, T> map = new TreeMap<>(Window.DEFAULT_COMPARATOR);
+  protected final TreeMap<Window, T> map = new TreeMap<>();
 
   @Override
   public long size()
@@ -70,22 +71,19 @@ public class InMemoryWindowedStorage<T> implements WindowedStorage<T>
   }
 
   @Override
-  public void migrateWindow(Window fromWindow, Window toWindow)
+  public Iterable<Map.Entry<Window, T>> entries()
   {
-    if (containsWindow(fromWindow)) {
-      map.put(toWindow, map.remove(fromWindow));
-    }
+    return map.entrySet();
   }
 
   @Override
-  public Iterable<Map.Entry<Window, T>> entrySet()
+  public void setup(Context.OperatorContext context)
   {
-    return map.entrySet();
   }
 
   @Override
-  public Iterator<Map.Entry<Window, T>> iterator()
+  public void teardown()
   {
-    return map.entrySet().iterator();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index 7077c96..a38207a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -19,8 +19,8 @@
 package org.apache.apex.malhar.lib.window.impl;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.apex.malhar.lib.window.Accumulation;
@@ -30,7 +30,7 @@ import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.Window;
 import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.lib.window.WindowState;
-import org.apache.apex.malhar.lib.window.WindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.lib.util.KeyValPair;
@@ -48,11 +48,11 @@ import com.datatorrent.lib.util.KeyValPair;
  */
 @InterfaceStability.Evolving
 public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
-    extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedKeyedStorage<KeyT, AccumT>, WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<InputValT, AccumT, OutputValT>>
+    extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<InputValT, AccumT, OutputValT>>
 {
 
   @Override
-  protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<KeyValPair<KeyT, InputValT>> inputTuple)
+  protected Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<KeyValPair<KeyT, InputValT>> inputTuple)
   {
     KeyT key = inputTuple.getValue().getKey();
     WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows)windowOption;
@@ -126,7 +126,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
       default:
         throw new IllegalStateException("There are more than two sessions matching one timestamp");
     }
-    windows.add(sessionWindowToAssign);
+    return Collections.<Window.SessionWindow>singleton(sessionWindowToAssign);
   }
 
   @Override
@@ -147,7 +147,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
   @Override
   public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
   {
-    for (Map.Entry<KeyT, AccumT> entry : dataStorage.entrySet(window)) {
+    for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
       OutputValT outputVal = accumulation.getOutput(entry.getValue());
       if (fireOnlyUpdatedPanes) {
         OutputValT oldValue = retractionStorage.get(window, entry.getKey());
@@ -168,7 +168,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
     if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
       throw new UnsupportedOperationException();
     }
-    for (Map.Entry<KeyT, OutputValT> entry : retractionStorage.entrySet(window)) {
+    for (Map.Entry<KeyT, OutputValT> entry : retractionStorage.entries(window)) {
       output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue()))));
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
index d195004..7275d88 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability;
  */
 @InterfaceStability.Evolving
 public class WindowedOperatorImpl<InputT, AccumT, OutputT>
-    extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage<AccumT>, WindowedStorage<OutputT>, Accumulation<InputT, AccumT, OutputT>>
+    extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, Accumulation<InputT, AccumT, OutputT>>
 {
   @Override
   public void accumulateTuple(Tuple.WindowedTuple<InputT> tuple)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index f8f9d8a..7396994 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -18,9 +18,8 @@
  */
 package org.apache.apex.malhar.lib.window;
 
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import javax.validation.ValidationException;
@@ -29,6 +28,7 @@ import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.window.impl.InMemorySessionWindowedStorage;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
 import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
@@ -112,8 +112,8 @@ public class WindowedOperatorTest
     windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
     windowedOperator.setAllowedLateness(Duration.millis(1000));
 
-    WindowedStorage<MutableLong> dataStorage = new InMemoryWindowedStorage<>();
-    WindowedStorage<WindowState> windowStateStorage = new InMemoryWindowedStorage<>();
+    WindowedStorage.WindowedPlainStorage<MutableLong> dataStorage = new InMemoryWindowedStorage<>();
+    WindowedStorage.WindowedPlainStorage<WindowState> windowStateStorage = new InMemoryWindowedStorage<>();
 
     windowedOperator.setDataStorage(dataStorage);
     windowedOperator.setWindowStateStorage(windowStateStorage);
@@ -124,7 +124,7 @@ public class WindowedOperatorTest
     Assert.assertEquals("There should be exactly one window in the storage", 1, dataStorage.size());
     Assert.assertEquals("There should be exactly one window in the storage", 1, windowStateStorage.size());
 
-    Map.Entry<Window, WindowState> entry = windowStateStorage.entrySet().iterator().next();
+    Map.Entry<Window, WindowState> entry = windowStateStorage.entries().iterator().next();
     Window window = entry.getKey();
     WindowState windowState = entry.getValue();
     Assert.assertEquals(-1, windowState.watermarkArrivalTime);
@@ -135,7 +135,7 @@ public class WindowedOperatorTest
 
     windowedOperator.processWatermark(new WatermarkImpl(1200));
     windowedOperator.endWindow();
-    Assert.assertTrue(windowState.watermarkArrivalTime > 0);
+    Assert.assertTrue(windowState.watermarkArrivalTime >= 0);
     Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false));
 
     windowedOperator.beginWindow(2);
@@ -285,9 +285,9 @@ public class WindowedOperatorTest
     windowedOperator.setWindowOption(new WindowOption.GlobalWindow());
     windowedOperator.setup(context);
     Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
-    List<Window> windows = windowedValue.getWindows();
+    Collection<? extends Window> windows = windowedValue.getWindows();
     Assert.assertEquals(1, windows.size());
-    Assert.assertEquals(Window.GLOBAL_WINDOW, windows.get(0));
+    Assert.assertEquals(Window.GlobalWindow.INSTANCE, windows.iterator().next());
   }
 
   @Test
@@ -299,10 +299,11 @@ public class WindowedOperatorTest
     windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
     windowedOperator.setup(context);
     Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
-    List<Window> windows = windowedValue.getWindows();
+    Collection<? extends Window> windows = windowedValue.getWindows();
     Assert.assertEquals(1, windows.size());
-    Assert.assertEquals(1000, windows.get(0).getBeginTimestamp());
-    Assert.assertEquals(1000, windows.get(0).getDurationMillis());
+    Window window = windows.iterator().next();
+    Assert.assertEquals(1000, window.getBeginTimestamp());
+    Assert.assertEquals(1000, window.getDurationMillis());
   }
 
   @Test
@@ -314,9 +315,8 @@ public class WindowedOperatorTest
     windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200)));
     windowedOperator.setup(context);
     Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1600L, 2L));
-    List<Window> windows = windowedValue.getWindows();
+    Collection<? extends Window> windows = windowedValue.getWindows();
     Window[] winArray = windows.toArray(new Window[]{});
-    Arrays.sort(winArray, Window.DEFAULT_COMPARATOR);
     Assert.assertEquals(5, winArray.length);
     Assert.assertEquals(800, winArray[0].getBeginTimestamp());
     Assert.assertEquals(1000, winArray[0].getDurationMillis());
@@ -336,6 +336,8 @@ public class WindowedOperatorTest
     OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
         new Attribute.AttributeMap.DefaultAttributeMap());
     KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
+    windowedOperator.setDataStorage(new InMemorySessionWindowedStorage<String, MutableLong>());
+    windowedOperator.setRetractionStorage(new InMemorySessionWindowedStorage<String, Long>());
     windowedOperator.setWindowOption(new WindowOption.SessionWindows(Duration.millis(2000)));
     windowedOperator.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingAndRetractingFiredPanes().firingOnlyUpdatedPanes());
     CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink();
@@ -348,7 +350,7 @@ public class WindowedOperatorTest
     Assert.assertEquals(1, sink.getCount(false));
     Tuple.WindowedTuple<KeyValPair<String, Long>> out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
     Assert.assertEquals(1, out.getWindows().size());
-    Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().get(0);
+    Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
     Assert.assertEquals(1100L, window1.getBeginTimestamp());
     Assert.assertEquals(1, window1.getDurationMillis());
     Assert.assertEquals("a", window1.getKey());
@@ -364,13 +366,13 @@ public class WindowedOperatorTest
     // retraction trigger
     out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
     Assert.assertEquals(1, out.getWindows().size());
-    Assert.assertEquals(window1, out.getWindows().get(0));
+    Assert.assertEquals(window1, out.getWindows().iterator().next());
     Assert.assertEquals("a", out.getValue().getKey());
     Assert.assertEquals(-2L, out.getValue().getValue().longValue());
 
     // normal trigger
     out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1);
-    Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().get(0);
+    Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
 
     Assert.assertEquals(1100L, window2.getBeginTimestamp());
     Assert.assertEquals(901, window2.getDurationMillis());
@@ -384,7 +386,7 @@ public class WindowedOperatorTest
     Assert.assertEquals(1, sink.getCount(false));
     out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
     Assert.assertEquals(1, out.getWindows().size());
-    Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().get(0);
+    Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
     Assert.assertEquals(5000L, window3.getBeginTimestamp());
     Assert.assertEquals(1, window3.getDurationMillis());
     Assert.assertEquals("a", out.getValue().getKey());
@@ -400,19 +402,19 @@ public class WindowedOperatorTest
     // retraction of the two old windows
     out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
     Assert.assertEquals(1, out.getWindows().size());
-    Assert.assertEquals(window2, out.getWindows().get(0));
+    Assert.assertEquals(window2, out.getWindows().iterator().next());
     Assert.assertEquals("a", out.getValue().getKey());
     Assert.assertEquals(-5L, out.getValue().getValue().longValue());
     out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1);
     Assert.assertEquals(1, out.getWindows().size());
-    Assert.assertEquals(window3, out.getWindows().get(0));
+    Assert.assertEquals(window3, out.getWindows().iterator().next());
     Assert.assertEquals("a", out.getValue().getKey());
     Assert.assertEquals(-4L, out.getValue().getValue().longValue());
 
     // normal trigger
     out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(2);
     Assert.assertEquals(1, out.getWindows().size());
-    Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().get(0);
+    Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
     Assert.assertEquals(1100L, window4.getBeginTimestamp());
     Assert.assertEquals(3901, window4.getDurationMillis());
     Assert.assertEquals("a", out.getValue().getKey());
@@ -428,7 +430,7 @@ public class WindowedOperatorTest
         new Attribute.AttributeMap.DefaultAttributeMap());
     KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
     windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
-    WindowedKeyedStorage<String, MutableLong> dataStorage = new InMemoryWindowedKeyedStorage<>();
+    WindowedStorage.WindowedKeyedStorage<String, MutableLong> dataStorage = new InMemoryWindowedKeyedStorage<>();
     windowedOperator.setDataStorage(dataStorage);
     windowedOperator.setup(context);
     windowedOperator.beginWindow(1);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
index 04f42b3..f9a4ed8 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
@@ -18,10 +18,7 @@
  */
 package org.apache.apex.malhar.stream.api.util;
 
-import java.util.List;
-
 import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.Window;
 
 /**
  * The tuple util will be used to extract fields that are used as key or value<br>
@@ -38,14 +35,8 @@ public class TupleUtil
   {
 
     if (t instanceof Tuple.WindowedTuple) {
-      Tuple.WindowedTuple<O> newT = new Tuple.WindowedTuple<>();
-      List<Window> wins = ((Tuple.WindowedTuple)t).getWindows();
-      for (Window w : wins) {
-        newT.addWindow(w);
-      }
-      newT.setValue(newValue);
-      ((Tuple.WindowedTuple)t).setTimestamp(((Tuple.WindowedTuple)t).getTimestamp());
-      return newT;
+      Tuple.WindowedTuple windowedTuple = (Tuple.WindowedTuple)t;
+      return new Tuple.WindowedTuple<>(windowedTuple.getWindows(), windowedTuple.getTimestamp(), newValue);
     } else if (t instanceof Tuple.TimestampedTuple) {
       return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)t).getTimestamp(), newValue);
     } else {