You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/19 14:14:20 UTC

apex-malhar git commit: APEXMALHAR-2406 APEXMALHAR-2407 APEXMALHAR-2408 ManagedState Issues

Repository: apex-malhar
Updated Branches:
  refs/heads/master d5c24dc8e -> a26a9f8b2


APEXMALHAR-2406 APEXMALHAR-2407 APEXMALHAR-2408 ManagedState Issues


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a26a9f8b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a26a9f8b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a26a9f8b

Branch: refs/heads/master
Commit: a26a9f8b2940986db313957fd3c9969489ab47ee
Parents: d5c24dc
Author: chaitanya <ch...@apache.org>
Authored: Sun Mar 19 17:40:33 2017 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Sun Mar 19 17:40:55 2017 +0530

----------------------------------------------------------------------
 .../apex/malhar/lib/state/managed/Bucket.java   | 11 +++--
 .../lib/state/managed/BucketsFileSystem.java    | 13 +++---
 .../managed/IncrementalCheckpointManager.java   |  5 +--
 .../MovingBoundaryTimeBucketAssigner.java       | 25 ++++++++++--
 .../state/managed/BucketsFileSystemTest.java    | 23 +++++++++--
 .../lib/state/managed/DefaultBucketTest.java    |  4 +-
 .../IncrementalCheckpointManagerTest.java       | 43 +++++++++++++++++++-
 .../state/managed/ManagedTimeStateImplTest.java |  2 +-
 .../ManagedTimeUnifiedStateImplTest.java        |  4 +-
 .../MovingBoundaryTimeBucketAssignerTest.java   | 11 ++---
 10 files changed, 106 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 4f2cefd..6292fe2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -354,13 +354,19 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
     }
 
     /**
-     * Returns the value for the key from a time-bucket reader
+     * Returns the value for the key from a valid time-bucket reader. Here, valid means the time bucket which is not purgeable.
+     * If the timebucketAssigner is of type MovingBoundaryTimeBucketAssigner and the time bucket is purgeable, then return null.
      * @param key        key
      * @param timeBucket time bucket
-     * @return value if key is found in the time bucket; false otherwise
+     * @return value if key is found in the time bucket; null otherwise
      */
     private BucketedValue getValueFromTimeBucketReader(Slice key, long timeBucket)
     {
+
+      if (managedStateContext.getTimeBucketAssigner() instanceof MovingBoundaryTimeBucketAssigner &&
+          timeBucket <= ((MovingBoundaryTimeBucketAssigner)managedStateContext.getTimeBucketAssigner()).getLowestPurgeableTimeBucket()) {
+        return null;
+      }
       FileAccess.FileReader fileReader = readers.get(timeBucket);
       if (fileReader != null) {
         return readValue(fileReader, key, timeBucket);
@@ -469,7 +475,6 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
           }
         }
       }
-
       sizeInBytes.getAndAdd(-memoryFreed);
 
       //add the windowId to the queue to let operator thread release memory from keyStream and valueStream

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
index 2bd6ef7..510f3f5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
@@ -139,13 +139,16 @@ public class BucketsFileSystem implements ManagedStateComponent
    * @param data            data of all time-buckets
    * @throws IOException
    */
-  protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data) throws IOException
+  protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data,  long latestPurgedTimeBucket) throws IOException
   {
     Table<Long, Slice, Bucket.BucketedValue> timeBucketedKeys = TreeBasedTable.create(Ordering.<Long>natural(),
         managedStateContext.getKeyComparator());
 
     for (Map.Entry<Slice, Bucket.BucketedValue> entry : data.entrySet()) {
       long timeBucketId = entry.getValue().getTimeBucket();
+      if (timeBucketId <= latestPurgedTimeBucket) {
+        continue;
+      }
       timeBucketedKeys.put(timeBucketId, entry.getKey(), entry.getValue());
     }
 
@@ -273,17 +276,11 @@ public class BucketsFileSystem implements ManagedStateComponent
    */
   private MutableTimeBucketMeta timeBucketMetaHelper(long bucketId, long timeBucketId) throws IOException
   {
-    MutableTimeBucketMeta tbm = timeBucketsMeta.get(bucketId, timeBucketId);
-    if (tbm != null) {
-      return tbm;
-    }
-    if (exists(bucketId, META_FILE_NAME)) {
+    if (!timeBucketsMeta.containsRow(bucketId) && exists(bucketId, META_FILE_NAME)) {
       try (DataInputStream dis = getInputStream(bucketId, META_FILE_NAME)) {
         //Load meta info of all the time buckets of the bucket identified by bucketId.
         loadBucketMetaFile(bucketId, dis);
       }
-    } else {
-      return null;
     }
     return timeBucketsMeta.get(bucketId, timeBucketId);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index aa7cec7..ed40aa6 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -136,11 +136,8 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
 
           for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> singleBucket : buckets.entrySet()) {
             long bucketId = singleBucket.getKey();
-            if (bucketId > latestPurgedTimeBucket) {
-              managedStateContext.getBucketsFileSystem().writeBucketData(windowId, bucketId, singleBucket.getValue());
-            }
+            managedStateContext.getBucketsFileSystem().writeBucketData(windowId, bucketId, singleBucket.getValue(), latestPurgedTimeBucket);
           }
-          committed(windowId);
         } catch (Throwable t) {
           throwable.set(t);
           LOG.debug("transfer window {}", windowId, t);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
index cc8ea0a..f3b40e1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
@@ -107,7 +107,20 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
   }
 
   /**
-   * Get the bucket key for the long value and adjust boundaries if necessary.
+   * Get the bucket key for the long value and adjust boundaries if necessary. If boundaries adjusted then verify
+   * the triggerPurge is enabled or not. triggerPurge is enabled only when the lower bound changes.
+   *
+   * For example,
+   * ExpiryDuration = 1000 milliseconds, BucketSpan = 2000 milliseconds
+   * Times with 0,...999 belongs to time bucket id 0, times with 1000,...1999 belongs to bucket id 1,...so on.
+   * Initially start = 0, end = 2000, fixedStart = 0
+   * (1) If the input with time 50 milliseconds then this belongs to bucket id 0.
+   *
+   * (2) If the input with time 2100 milliseconds then boundary has to be adjusted.
+   *    Values after tuple is processed, diffInBuckets = 0, move = 1000, start = 1000, end = 3000,triggerPurge = true, lowestPurgeableTimeBucket = -1
+   *
+   * (3) If the input with time 3200 milliseconds then boundary has to be adjusted.
+   *    Values after tuple is processed, diffInBuckets = 0, move = 1000, start = 2000, end = 4000,triggerPurge = true, lowestPurgeableTimeBucket = 0
    *
    * @param time value from which bucket key is derived.
    * @return -1 if value is already expired; bucket key otherwise.
@@ -125,9 +138,11 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
       long move = (diffInBuckets + 1) * bucketSpanMillis;
       start += move;
       end += move;
-      lowestPurgeableTimeBucket += diffInBuckets;
       // trigger purge when lower bound changes
-      triggerPurge = (diffInBuckets > 0);
+      triggerPurge = (move > 0);
+      if (triggerPurge) {
+        lowestPurgeableTimeBucket = ((start - fixedStart) / bucketSpanMillis) - 2;
+      }
     }
     return key;
 
@@ -178,4 +193,8 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
     this.expireBefore = expireBefore;
   }
 
+  public long getLowestPurgeableTimeBucket()
+  {
+    return lowestPurgeableTimeBucket;
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
index ede2c85..28b3824 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
@@ -72,7 +72,7 @@ public class BucketsFileSystemTest
   {
     testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
-    testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0, -1);
 
     ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 1);
     testMeta.bucketsFileSystem.teardown();
@@ -83,10 +83,10 @@ public class BucketsFileSystemTest
   {
     testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
-    testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0, -1);
 
     Map<Slice, Bucket.BucketedValue> more = ManagedStateTestUtils.getTestBucketData(50, 100);
-    testMeta.bucketsFileSystem.writeBucketData(10, 0, more);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, more, -1);
 
     unsavedBucket0.putAll(more);
     ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 2);
@@ -169,4 +169,21 @@ public class BucketsFileSystemTest
     Assert.assertEquals("tbm 2", 2, immutableTbm.getTimeBucketId());
     testMeta.bucketsFileSystem.teardown();
   }
+
+  @Test
+  public void testFirstKeyAfterTransferBuckets() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(50, 100);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0, -1);
+
+    Map<Slice, Bucket.BucketedValue> unsavedBucket1 = ManagedStateTestUtils.getTestBucketData(24, 104);
+    testMeta.bucketsFileSystem.writeBucketData(20, 0, unsavedBucket1, -1);
+
+    BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(0, 104);
+    Assert.assertNotNull(immutableTbm);
+    Assert.assertEquals("last transferred window", 20, immutableTbm.getLastTransferredWindowId());
+    Assert.assertEquals("first key", "24", immutableTbm.getFirstKey().stringValue());
+    testMeta.bucketsFileSystem.teardown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
index f7e24de..8a63f5a 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -98,7 +98,7 @@ public class DefaultBucketTest
     Slice one = ManagedStateTestUtils.getSliceFor("1");
 
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
-    testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
+    testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0, -1);
 
     ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
 
@@ -115,7 +115,7 @@ public class DefaultBucketTest
     Slice one = ManagedStateTestUtils.getSliceFor("1");
 
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
-    testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
+    testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0, -1);
 
     ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
index a7e0827..fe4c20d 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
@@ -20,6 +20,7 @@
 package org.apache.apex.malhar.lib.state.managed;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
@@ -133,6 +135,43 @@ public class IncrementalCheckpointManagerTest
   }
 
   @Test
+  public void testTransferWindowFilesExcludeExpiredBuckets() throws IOException, InterruptedException
+  {
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+
+    int startKeyBucket = 200;
+    Map<Long, Map<Slice, Bucket.BucketedValue>> buckets = ManagedStateTestUtils.getTestData(startKeyBucket, startKeyBucket + 10, 0);
+    long latestExpiredTimeBucket = 102;
+    testMeta.checkpointManager.setLatestExpiredTimeBucket(latestExpiredTimeBucket);
+    testMeta.checkpointManager.save(buckets, 10, false);
+    //Need to synchronously call transfer window files so shutting down the other thread.
+    testMeta.checkpointManager.teardown();
+    Thread.sleep(500);
+
+    testMeta.checkpointManager.committed(10);
+    testMeta.checkpointManager.transferWindowFiles();
+
+    // Retrieve the data which is not expired
+    Map<Long, Map<Slice, Bucket.BucketedValue>> bucketsValidData = new HashMap<>();
+    for (int i = 0; i < 5; i++) {
+      Map<Slice, Bucket.BucketedValue> data = buckets.get((long)startKeyBucket + i);
+      Map<Slice, Bucket.BucketedValue> bucketData = Maps.newHashMap();
+      for (Map.Entry<Slice,Bucket.BucketedValue> e: data.entrySet()) {
+        if (e.getValue().getTimeBucket() <= latestExpiredTimeBucket) {
+          continue;
+        }
+        bucketData.put(e.getKey(), e.getValue());
+      }
+      bucketsValidData.put((long)startKeyBucket + i, bucketData);
+    }
+
+    for (int i = 0; i < 5; i++) {
+      ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), startKeyBucket + i,
+          bucketsValidData.get((long)startKeyBucket + i), 1);
+    }
+  }
+
+  @Test
   public void testCommitted() throws IOException, InterruptedException
   {
     CountDownLatch latch = new CountDownLatch(5);
@@ -186,9 +225,9 @@ public class IncrementalCheckpointManagerTest
 
     @Override
     protected void writeBucketData(long windowId, long bucketId, Map<Slice,
-        Bucket.BucketedValue> data) throws IOException
+        Bucket.BucketedValue> data, long latestPurgedTimeBucket) throws IOException
     {
-      super.writeBucketData(windowId, bucketId, data);
+      super.writeBucketData(windowId, bucketId, data, latestPurgedTimeBucket);
       if (windowId == 10) {
         latch.countDown();
       }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
index ed53f08..2882828 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
@@ -86,7 +86,7 @@ public class ManagedTimeStateImplTest
     testMeta.managedState.setup(testMeta.operatorContext);
 
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, time);
-    testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+    testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0, -1);
     ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(), 0, unsavedBucket0, 1);
 
     Future<Slice> valFuture = testMeta.managedState.getAsync(0, zero);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
index 1d2334d..42ab187 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -125,7 +125,7 @@ public class ManagedTimeUnifiedStateImplTest
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
 
     //write data to disk explicitly
-    testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+    testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0, -1);
     ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(),
         testMeta.operatorContext.getId(), unsavedBucket0, 1);
 
@@ -147,7 +147,7 @@ public class ManagedTimeUnifiedStateImplTest
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
 
     //write data to disk explicitly
-    testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+    testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0, -1);
     ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(),
         testMeta.operatorContext.getId(), unsavedBucket0, 1);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
index 2b132f4..97f6c20 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
@@ -113,28 +113,25 @@ public class MovingBoundaryTimeBucketAssignerTest
     long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
     testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
     Assert.assertEquals("purgeLessThanEqualTo", -2L, purgeLessThanEqualTo.longValue());
-
     long time0 = Duration.standardSeconds(0).getMillis() + referenceTime;
     Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucket(time0) );
     testMeta.timeBucketAssigner.endWindow();
-    Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
+    Assert.assertEquals("purgeLessThanEqualTo", -1, purgeLessThanEqualTo.longValue());
 
     long time1 = Duration.standardSeconds(9).getMillis() + referenceTime;
     Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucket(time1) );
     testMeta.timeBucketAssigner.endWindow();
-    Assert.assertEquals("purgeLessThanEqualTo", 7, purgeLessThanEqualTo.longValue());
-    purgeLessThanEqualTo.setValue(-2);
+    Assert.assertEquals("purgeLessThanEqualTo", 8, purgeLessThanEqualTo.longValue());
 
     long time2 = Duration.standardSeconds(10).getMillis()  + referenceTime;
     Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucket(time2) );
     testMeta.timeBucketAssigner.endWindow();
-// TODO: why is purgeLessThanEqualTo not moving to 8 here?
-    Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
+    Assert.assertEquals("purgeLessThanEqualTo", 9, purgeLessThanEqualTo.longValue());
 
     //Check for expiry of time1 now
     Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucket(time1) );
     testMeta.timeBucketAssigner.endWindow();
-    Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
+    Assert.assertEquals("purgeLessThanEqualTo", 9, purgeLessThanEqualTo.longValue());
 
     testMeta.timeBucketAssigner.teardown();
   }