You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@apex.apache.org by th...@apache.org on 2017/01/23 06:51:51 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2345 Fix MovingBoundaryTimeBucketAssigner initialization and purge trigger.

Repository: apex-malhar
Updated Branches:
  refs/heads/master e22ea0de1 -> 1ae14c03a


APEXMALHAR-2345 Fix MovingBoundaryTimeBucketAssigner initialization and purge trigger.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1ae14c03
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1ae14c03
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1ae14c03

Branch: refs/heads/master
Commit: 1ae14c03a73e2814b57ce06680db57da80f0b42b
Parents: 9043f9d
Author: Thomas Weise <th...@apache.org>
Authored: Sun Jan 22 13:19:44 2017 -0800
Committer: David Yan <da...@apache.org>
Committed: Sun Jan 22 21:45:28 2017 -0800

----------------------------------------------------------------------
 .../state/managed/AbstractManagedStateImpl.java |  3 +++
 .../managed/IncrementalCheckpointManager.java   |  1 +
 .../MovingBoundaryTimeBucketAssigner.java       |  5 ++--
 .../lib/state/managed/ManagedStateImplTest.java |  4 ++--
 .../MovingBoundaryTimeBucketAssignerTest.java   | 25 ++++++++++++++++++++
 5 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index daae2d8..f676b84 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -220,6 +220,8 @@ public abstract class AbstractManagedStateImpl
               buckets.get(bucketIdx).recoveredData(stateEntry.getKey(), bucketEntry.getValue());
             }
           }
+          // Skip write to WAL during recovery during replay from WAL.
+          // Data only needs to be transferred to bucket data files.
           checkpointManager.save(state, stateEntry.getKey(), true /*skipWritingToWindowFile*/);
         }
       } catch (IOException e) {
@@ -369,6 +371,7 @@ public abstract class AbstractManagedStateImpl
     }
     if (!flashData.isEmpty()) {
       try {
+        // write incremental state to WAL (skipWrite=false) before the checkpoint
         checkpointManager.save(flashData, windowId, false);
       } catch (IOException e) {
         throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index 65c1d1e..aa7cec7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -111,6 +111,7 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
           if (latestExpiredTimeBucket.get() > -1) {
             try {
               latestPurgedTimeBucket = latestExpiredTimeBucket.getAndSet(-1);
+              //LOG.debug("latestPurgedTimeBucket {}", latestPurgedTimeBucket);
               managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(latestPurgedTimeBucket);
             } catch (IOException e) {
               throwable.set(e);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
index ece7686..cc8ea0a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
@@ -67,7 +67,7 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
   private int numBuckets;
   private transient long fixedStart;
   private transient boolean triggerPurge;
-  private transient long lowestPurgeableTimeBucket;
+  private transient long lowestPurgeableTimeBucket = -1;
 
 
   @Override
@@ -125,8 +125,9 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
       long move = (diffInBuckets + 1) * bucketSpanMillis;
       start += move;
       end += move;
-      triggerPurge = true;
       lowestPurgeableTimeBucket += diffInBuckets;
+      // trigger purge when lower bound changes
+      triggerPurge = (diffInBuckets > 0);
     }
     return key;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
index 99e6c23..dab3925 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
@@ -177,7 +177,7 @@ public class ManagedStateImplTest
     testMeta.managedState.setup(testMeta.operatorContext);
 
     int numKeys = 300;
-    long lastWindowId = (long)numKeys;
+    long lastWindowId = numKeys;
 
     for (long windowId = 0L; windowId < lastWindowId; windowId++) {
       testMeta.managedState.beginWindow(windowId);
@@ -197,7 +197,7 @@ public class ManagedStateImplTest
     for (int key = numKeys - 1; key > 0; key--) {
       Slice keyVal = ManagedStateTestUtils.getSliceFor(Integer.toString(key));
       Slice val = testMeta.managedState.getSync(0L, keyVal);
-      Assert.assertNotNull(val);
+      Assert.assertNotNull("null value for key " + key, val);
     }
 
     testMeta.managedState.endWindow();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
index e4e5d2e..2b132f4 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
@@ -28,6 +28,8 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.commons.lang3.mutable.MutableLong;
+
 import com.datatorrent.lib.util.KryoCloneUtils;
 
 public class MovingBoundaryTimeBucketAssignerTest
@@ -96,20 +98,43 @@ public class MovingBoundaryTimeBucketAssignerTest
   @Test
   public void testTimeBucketKeyExpiry()
   {
+    final MutableLong purgeLessThanEqualTo = new MutableLong(-2);
     testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1));
     testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1));
+    testMeta.timeBucketAssigner.setPurgeListener(new TimeBucketAssigner.PurgeListener()
+    {
+      @Override
+      public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
+      {
+        purgeLessThanEqualTo.setValue(timeBucket);
+      }
+    });
 
     long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
     testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
+    Assert.assertEquals("purgeLessThanEqualTo", -2L, purgeLessThanEqualTo.longValue());
+
+    long time0 = Duration.standardSeconds(0).getMillis() + referenceTime;
+    Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucket(time0) );
+    testMeta.timeBucketAssigner.endWindow();
+    Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
 
     long time1 = Duration.standardSeconds(9).getMillis() + referenceTime;
     Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucket(time1) );
+    testMeta.timeBucketAssigner.endWindow();
+    Assert.assertEquals("purgeLessThanEqualTo", 7, purgeLessThanEqualTo.longValue());
+    purgeLessThanEqualTo.setValue(-2);
 
     long time2 = Duration.standardSeconds(10).getMillis()  + referenceTime;
     Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucket(time2) );
+    testMeta.timeBucketAssigner.endWindow();
+// TODO: why is purgeLessThanEqualTo not moving to 8 here?
+    Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
 
     //Check for expiry of time1 now
     Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucket(time1) );
+    testMeta.timeBucketAssigner.endWindow();
+    Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
 
     testMeta.timeBucketAssigner.teardown();
   }


[2/2] apex-malhar git commit: APEXMALHAR-2345 purging time buckets in WindowedStorage

Posted by th...@apache.org.
APEXMALHAR-2345 purging time buckets in WindowedStorage


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9043f9d4
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9043f9d4
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9043f9d4

Branch: refs/heads/master
Commit: 9043f9d4ea5514cbc7cdf7ad826eb19c73a9ef9e
Parents: e22ea0d
Author: David Yan <da...@datatorrent.com>
Authored: Thu Nov 17 12:58:34 2016 -0800
Committer: David Yan <da...@apache.org>
Committed: Sun Jan 22 21:45:28 2017 -0800

----------------------------------------------------------------------
 .../AbstractWindowedOperatorBenchmarkApp.java   |  2 ++
 benchmark/src/test/resources/log4j.properties   |  2 +-
 .../managed/IncrementalCheckpointManager.java   | 11 ++++++----
 .../managed/ManagedTimeUnifiedStateImpl.java    |  2 +-
 .../malhar/lib/state/managed/StateTracker.java  |  1 -
 .../spillable/SpillableComplexComponent.java    |  2 ++
 .../SpillableComplexComponentImpl.java          |  5 +++++
 .../lib/state/spillable/SpillableSetImpl.java   |  2 +-
 .../apache/apex/malhar/lib/window/Window.java   | 14 +++++--------
 .../apex/malhar/lib/window/WindowedStorage.java |  5 +++++
 .../window/impl/AbstractWindowedOperator.java   | 16 ++++++++++++++-
 .../window/impl/InMemoryWindowedStorage.java    | 13 ++++++++++++
 .../impl/SpillableWindowedKeyedStorage.java     | 21 ++++++++++++++++++++
 .../impl/SpillableWindowedPlainStorage.java     | 18 +++++++++++++++++
 14 files changed, 96 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
index 7250e74..4b9b423 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
@@ -25,6 +25,7 @@ import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.state.managed.UnboundedTimeBucketAssigner;
 import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
 import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
 import org.apache.apex.malhar.lib.state.spillable.managed.ManagedTimeUnifiedStateSpillableStateStore;
@@ -141,6 +142,7 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
   {
     String basePath = getStoreBasePath(conf);
     ManagedTimeUnifiedStateSpillableStateStore store = new ManagedTimeUnifiedStateSpillableStateStore();
+    store.setTimeBucketAssigner(new UnboundedTimeBucketAssigner());
     store.getTimeBucketAssigner().setBucketSpan(Duration.millis(10000));
     ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/benchmark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/benchmark/src/test/resources/log4j.properties b/benchmark/src/test/resources/log4j.properties
index 92e48b7..2f3bc53 100644
--- a/benchmark/src/test/resources/log4j.properties
+++ b/benchmark/src/test/resources/log4j.properties
@@ -41,5 +41,5 @@ log4j.logger.org=info
 #log4j.logger.org.apache.commons.beanutils=warn
 log4j.logger.com.datatorrent=debug
 log4j.logger.org.apache.apex=debug
-log4j.logger.org.apache.apex.malhar.lib.state.managed=info
+log4j.logger.org.apache.apex.malhar.lib.state.managed=debug
 log4j.logger.com.datatorrent.common.util.FSStorageAgent=info

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index 3b01ed2..65c1d1e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -72,6 +72,7 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
   protected transient ManagedStateContext managedStateContext;
 
   private final transient AtomicLong latestExpiredTimeBucket = new AtomicLong(-1);
+  private long latestPurgedTimeBucket = -1;
 
   private transient int waitMillis;
   private volatile long lastTransferredWindow = Stateless.WINDOW_ID;
@@ -109,8 +110,8 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
           transferWindowFiles();
           if (latestExpiredTimeBucket.get() > -1) {
             try {
-              managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(
-                  latestExpiredTimeBucket.getAndSet(-1));
+              latestPurgedTimeBucket = latestExpiredTimeBucket.getAndSet(-1);
+              managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(latestPurgedTimeBucket);
             } catch (IOException e) {
               throwable.set(e);
               LOG.debug("delete files", e);
@@ -133,8 +134,10 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
           Map<Long, Map<Slice, Bucket.BucketedValue>> buckets = savedWindows.remove(windowId);
 
           for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> singleBucket : buckets.entrySet()) {
-            managedStateContext.getBucketsFileSystem().writeBucketData(windowId, singleBucket.getKey(),
-                singleBucket.getValue());
+            long bucketId = singleBucket.getKey();
+            if (bucketId > latestPurgedTimeBucket) {
+              managedStateContext.getBucketsFileSystem().writeBucketData(windowId, bucketId, singleBucket.getValue());
+            }
           }
           committed(windowId);
         } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
index d558eee..62ebbc5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -142,7 +142,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
   @Override
   public void setup(Context.OperatorContext context)
   {
-    // set UnBoundedTimeBucketAssigner to this managed state impl
+    // set UnboundedTimeBucketAssigner to this managed state impl
     if (timeBucketAssigner == null) {
       UnboundedTimeBucketAssigner unboundedTimeBucketAssigner = new UnboundedTimeBucketAssigner();
       setTimeBucketAssigner(unboundedTimeBucketAssigner);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
index 7cab41c..e17f5b9 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -115,7 +115,6 @@ class StateTracker extends TimerTask
                 long sizeFreed;
                 try {
                   sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow());
-                  LOG.debug("bucket freed {} {}", bucketId, sizeFreed);
                 } catch (IOException e) {
                   managedStateImpl.throwable.set(e);
                   throw new RuntimeException("freeing " + bucketId, e);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index b6ec6a2..c00589a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -196,4 +196,6 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @return A {@link SpillableQueue}.
    */
   <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde);
+
+  SpillableStateStore getStore();
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
index 1d9fbc6..1ecf86e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -261,4 +261,9 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
   {
     store.committed(l);
   }
+
+  public SpillableStateStore getStore()
+  {
+    return store;
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
index 221cd38..33e48ac 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
@@ -131,7 +131,7 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
   public SpillableSetImpl(@NotNull byte[] prefix,
       @NotNull SpillableStateStore store,
       @NotNull Serde<T> serde,
-      @NotNull TimeExtractor timeExtractor)
+      @NotNull TimeExtractor<T> timeExtractor)
   {
     map = new SpillableMapImpl<>(Preconditions.checkNotNull(store), prefix, serde, new ListNodeSerde<>(serde), timeExtractor);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
index 1d7681d..edcfc2c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
@@ -146,16 +146,12 @@ public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WI
     @Override
     public int compareTo(TimeWindow o)
     {
-      if (this.getBeginTimestamp() < o.getBeginTimestamp()) {
-        return -1;
-      } else if (this.getBeginTimestamp() > o.getBeginTimestamp()) {
-        return 1;
-      } else if (this.getDurationMillis() < o.getDurationMillis()) {
-        return -1;
-      } else if (this.getDurationMillis() > o.getDurationMillis()) {
-        return 1;
+      long diff = (this.getBeginTimestamp() + this.getDurationMillis()) - (o.getBeginTimestamp() + o.getDurationMillis());
+      if (diff == 0) {
+        return Long.signum(this.getBeginTimestamp() - o.getBeginTimestamp());
+      } else {
+        return Long.signum(diff);
       }
-      return 0;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
index e2874ba..55f8a02 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
@@ -56,6 +56,11 @@ public interface WindowedStorage extends Component<Context.OperatorContext>
   void remove(Window window);
 
   /**
+   * Purge checkpointed data for all the windows that lie totally beyond the given horizon
+   */
+  void purge(long horizonMillis);
+
+  /**
    * This interface handles plain value per window. If there is a key/value map for each window, use
    * {@link WindowedKeyedStorage}. Also note that a single T object is assumed to be fit in memory
    *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index 4ba81b3..22e8525 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import javax.validation.ValidationException;
 
@@ -89,6 +90,8 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   private long currentDerivedTimestamp = -1;
   private long timeIncrement;
   protected long fixedWatermarkMillis = -1;
+  private transient long streamingWindowId;
+  private transient TreeMap<Long, Long> streamingWindowToLatenessHorizon = new TreeMap<>();
 
   private Map<String, Component<Context.OperatorContext>> components = new HashMap<>();
 
@@ -407,7 +410,6 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   public void dropTuple(Tuple input)
   {
     // do nothing
-    LOG.debug("Dropping late tuple {}", input);
   }
 
   @Override
@@ -465,6 +467,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     } else {
       currentDerivedTimestamp += timeIncrement;
     }
+    streamingWindowId = windowId;
   }
 
   /**
@@ -517,6 +520,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
           }
         }
       }
+      streamingWindowToLatenessHorizon.put(streamingWindowId, horizon);
       controlOutput.emit(new WatermarkImpl(nextWatermark));
       this.currentWatermark = nextWatermark;
     }
@@ -623,5 +627,15 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
         ((CheckpointNotificationListener)component).committed(windowId);
       }
     }
+    Long floorWindowId = streamingWindowToLatenessHorizon.floorKey(windowId);
+    if (floorWindowId != null) {
+      long horizon = streamingWindowToLatenessHorizon.get(floorWindowId);
+      windowStateMap.purge(horizon);
+      dataStorage.purge(horizon);
+      if (retractionStorage != null) {
+        retractionStorage.purge(horizon);
+      }
+      streamingWindowToLatenessHorizon.headMap(windowId, true).clear();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
index db18a40..730bcf9 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
@@ -18,6 +18,7 @@
  */
 package org.apache.apex.malhar.lib.window.impl;
 
+import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -86,4 +87,16 @@ public class InMemoryWindowedStorage<T> implements WindowedStorage.WindowedPlain
   {
   }
 
+  @Override
+  public void purge(long horizonMillis)
+  {
+    for (Iterator<Map.Entry<Window, T>> iterator = map.entrySet().iterator(); iterator.hasNext(); ) {
+      Window window = iterator.next().getKey();
+      if (window.getBeginTimestamp() + window.getDurationMillis() < horizonMillis) {
+        iterator.remove();
+      } else {
+        break;
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
index bf0c804..6138f97 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
@@ -25,8 +25,13 @@ import java.util.Set;
 
 import javax.validation.constraints.NotNull;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
 import org.apache.apex.malhar.lib.state.spillable.Spillable;
 import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
 import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
 import org.apache.apex.malhar.lib.window.Window;
@@ -57,6 +62,9 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
   protected Spillable.SpillableMap<Pair<Window, K>, V> windowKeyToValueMap;
   protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap;
 
+  private static final Logger LOG = LoggerFactory.getLogger(SpillableWindowedKeyedStorage.class);
+
+
   private class KVIterator implements Iterator<Map.Entry<K, V>>
   {
     final Window window;
@@ -221,4 +229,17 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
   {
     return windowKeyToValueMap.get(new ImmutablePair<>(window, key));
   }
+
+  @Override
+  public void purge(long horizonMillis)
+  {
+    SpillableStateStore store = scc.getStore();
+    if (store instanceof ManagedTimeUnifiedStateImpl) {
+      ManagedTimeUnifiedStateImpl timeState = (ManagedTimeUnifiedStateImpl)store;
+      long purgeTimeBucket = horizonMillis - timeState.getTimeBucketAssigner().getBucketSpan().getMillis();
+      LOG.debug("Purging state less than equal to {}", purgeTimeBucket);
+      timeState.purgeTimeBucketsLessThanEqualTo(purgeTimeBucket);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
index f70771c..a58d89f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
@@ -22,8 +22,13 @@ import java.util.Map;
 
 import javax.validation.constraints.NotNull;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
 import org.apache.apex.malhar.lib.state.spillable.Spillable;
 import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
 import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
 import org.apache.apex.malhar.lib.window.Window;
@@ -45,6 +50,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
   private long bucket;
   private Serde<Window> windowSerde;
   private Serde<T> valueSerde;
+  private static final Logger LOG = LoggerFactory.getLogger(SpillableWindowedPlainStorage.class);
 
   protected Spillable.SpillableMap<Window, T> windowToDataMap;
 
@@ -143,4 +149,16 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
   public void teardown()
   {
   }
+
+  @Override
+  public void purge(long horizonMillis)
+  {
+    SpillableStateStore store = scc.getStore();
+    if (store instanceof ManagedTimeUnifiedStateImpl) {
+      ManagedTimeUnifiedStateImpl timeState = (ManagedTimeUnifiedStateImpl)store;
+      long purgeTimeBucket = horizonMillis - timeState.getTimeBucketAssigner().getBucketSpan().getMillis();
+      LOG.debug("Purging state less than equal to {}", purgeTimeBucket);
+      timeState.purgeTimeBucketsLessThanEqualTo(purgeTimeBucket);
+    }
+  }
 }