You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/19 14:14:20 UTC
apex-malhar git commit: APEXMALHAR-2406 APEXMALHAR-2407
APEXMALHAR-2408 ManagedState Issues
Repository: apex-malhar
Updated Branches:
refs/heads/master d5c24dc8e -> a26a9f8b2
APEXMALHAR-2406 APEXMALHAR-2407 APEXMALHAR-2408 ManagedState Issues
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a26a9f8b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a26a9f8b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a26a9f8b
Branch: refs/heads/master
Commit: a26a9f8b2940986db313957fd3c9969489ab47ee
Parents: d5c24dc
Author: chaitanya <ch...@apache.org>
Authored: Sun Mar 19 17:40:33 2017 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Sun Mar 19 17:40:55 2017 +0530
----------------------------------------------------------------------
.../apex/malhar/lib/state/managed/Bucket.java | 11 +++--
.../lib/state/managed/BucketsFileSystem.java | 13 +++---
.../managed/IncrementalCheckpointManager.java | 5 +--
.../MovingBoundaryTimeBucketAssigner.java | 25 ++++++++++--
.../state/managed/BucketsFileSystemTest.java | 23 +++++++++--
.../lib/state/managed/DefaultBucketTest.java | 4 +-
.../IncrementalCheckpointManagerTest.java | 43 +++++++++++++++++++-
.../state/managed/ManagedTimeStateImplTest.java | 2 +-
.../ManagedTimeUnifiedStateImplTest.java | 4 +-
.../MovingBoundaryTimeBucketAssignerTest.java | 11 ++---
10 files changed, 106 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 4f2cefd..6292fe2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -354,13 +354,19 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
}
/**
- * Returns the value for the key from a time-bucket reader
+ * Returns the value for the key from a valid time-bucket reader. Here, valid means the time bucket which is not purgeable.
+ * If the timebucketAssigner is of type MovingBoundaryTimeBucketAssigner and the time bucket is purgeable, then return null.
* @param key key
* @param timeBucket time bucket
- * @return value if key is found in the time bucket; false otherwise
+ * @return value if key is found in the time bucket; null otherwise
*/
private BucketedValue getValueFromTimeBucketReader(Slice key, long timeBucket)
{
+
+ if (managedStateContext.getTimeBucketAssigner() instanceof MovingBoundaryTimeBucketAssigner &&
+ timeBucket <= ((MovingBoundaryTimeBucketAssigner)managedStateContext.getTimeBucketAssigner()).getLowestPurgeableTimeBucket()) {
+ return null;
+ }
FileAccess.FileReader fileReader = readers.get(timeBucket);
if (fileReader != null) {
return readValue(fileReader, key, timeBucket);
@@ -469,7 +475,6 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
}
}
}
-
sizeInBytes.getAndAdd(-memoryFreed);
//add the windowId to the queue to let operator thread release memory from keyStream and valueStream
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
index 2bd6ef7..510f3f5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
@@ -139,13 +139,16 @@ public class BucketsFileSystem implements ManagedStateComponent
* @param data data of all time-buckets
* @throws IOException
*/
- protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data) throws IOException
+ protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data, long latestPurgedTimeBucket) throws IOException
{
Table<Long, Slice, Bucket.BucketedValue> timeBucketedKeys = TreeBasedTable.create(Ordering.<Long>natural(),
managedStateContext.getKeyComparator());
for (Map.Entry<Slice, Bucket.BucketedValue> entry : data.entrySet()) {
long timeBucketId = entry.getValue().getTimeBucket();
+ if (timeBucketId <= latestPurgedTimeBucket) {
+ continue;
+ }
timeBucketedKeys.put(timeBucketId, entry.getKey(), entry.getValue());
}
@@ -273,17 +276,11 @@ public class BucketsFileSystem implements ManagedStateComponent
*/
private MutableTimeBucketMeta timeBucketMetaHelper(long bucketId, long timeBucketId) throws IOException
{
- MutableTimeBucketMeta tbm = timeBucketsMeta.get(bucketId, timeBucketId);
- if (tbm != null) {
- return tbm;
- }
- if (exists(bucketId, META_FILE_NAME)) {
+ if (!timeBucketsMeta.containsRow(bucketId) && exists(bucketId, META_FILE_NAME)) {
try (DataInputStream dis = getInputStream(bucketId, META_FILE_NAME)) {
//Load meta info of all the time buckets of the bucket identified by bucketId.
loadBucketMetaFile(bucketId, dis);
}
- } else {
- return null;
}
return timeBucketsMeta.get(bucketId, timeBucketId);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index aa7cec7..ed40aa6 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -136,11 +136,8 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> singleBucket : buckets.entrySet()) {
long bucketId = singleBucket.getKey();
- if (bucketId > latestPurgedTimeBucket) {
- managedStateContext.getBucketsFileSystem().writeBucketData(windowId, bucketId, singleBucket.getValue());
- }
+ managedStateContext.getBucketsFileSystem().writeBucketData(windowId, bucketId, singleBucket.getValue(), latestPurgedTimeBucket);
}
- committed(windowId);
} catch (Throwable t) {
throwable.set(t);
LOG.debug("transfer window {}", windowId, t);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
index cc8ea0a..f3b40e1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
@@ -107,7 +107,20 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
}
/**
- * Get the bucket key for the long value and adjust boundaries if necessary.
+ * Get the bucket key for the long value and adjust boundaries if necessary. If boundaries adjusted then verify
+ * the triggerPurge is enabled or not. triggerPurge is enabled only when the lower bound changes.
+ *
+ * For example,
+ * ExpiryDuration = 1000 milliseconds, BucketSpan = 2000 milliseconds
+ * Times with 0,...999 belongs to time bucket id 0, times with 1000,...1999 belongs to bucket id 1,...so on.
+ * Initially start = 0, end = 2000, fixedStart = 0
+ * (1) If the input with time 50 milliseconds then this belongs to bucket id 0.
+ *
+ * (2) If the input with time 2100 milliseconds then boundary has to be adjusted.
+ * Values after tuple is processed, diffInBuckets = 0, move = 1000, start = 1000, end = 3000,triggerPurge = true, lowestPurgeableTimeBucket = -1
+ *
+ * (3) If the input with time 3200 milliseconds then boundary has to be adjusted.
+ * Values after tuple is processed, diffInBuckets = 0, move = 1000, start = 2000, end = 4000,triggerPurge = true, lowestPurgeableTimeBucket = 0
*
* @param time value from which bucket key is derived.
* @return -1 if value is already expired; bucket key otherwise.
@@ -125,9 +138,11 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
long move = (diffInBuckets + 1) * bucketSpanMillis;
start += move;
end += move;
- lowestPurgeableTimeBucket += diffInBuckets;
// trigger purge when lower bound changes
- triggerPurge = (diffInBuckets > 0);
+ triggerPurge = (move > 0);
+ if (triggerPurge) {
+ lowestPurgeableTimeBucket = ((start - fixedStart) / bucketSpanMillis) - 2;
+ }
}
return key;
@@ -178,4 +193,8 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
this.expireBefore = expireBefore;
}
+ public long getLowestPurgeableTimeBucket()
+ {
+ return lowestPurgeableTimeBucket;
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
index ede2c85..28b3824 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
@@ -72,7 +72,7 @@ public class BucketsFileSystemTest
{
testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
- testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0);
+ testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0, -1);
ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 1);
testMeta.bucketsFileSystem.teardown();
@@ -83,10 +83,10 @@ public class BucketsFileSystemTest
{
testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
- testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0);
+ testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0, -1);
Map<Slice, Bucket.BucketedValue> more = ManagedStateTestUtils.getTestBucketData(50, 100);
- testMeta.bucketsFileSystem.writeBucketData(10, 0, more);
+ testMeta.bucketsFileSystem.writeBucketData(10, 0, more, -1);
unsavedBucket0.putAll(more);
ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 2);
@@ -169,4 +169,21 @@ public class BucketsFileSystemTest
Assert.assertEquals("tbm 2", 2, immutableTbm.getTimeBucketId());
testMeta.bucketsFileSystem.teardown();
}
+
+ @Test
+ public void testFirstKeyAfterTransferBuckets() throws IOException
+ {
+ testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+ Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(50, 100);
+ testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0, -1);
+
+ Map<Slice, Bucket.BucketedValue> unsavedBucket1 = ManagedStateTestUtils.getTestBucketData(24, 104);
+ testMeta.bucketsFileSystem.writeBucketData(20, 0, unsavedBucket1, -1);
+
+ BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(0, 104);
+ Assert.assertNotNull(immutableTbm);
+ Assert.assertEquals("last transferred window", 20, immutableTbm.getLastTransferredWindowId());
+ Assert.assertEquals("first key", "24", immutableTbm.getFirstKey().stringValue());
+ testMeta.bucketsFileSystem.teardown();
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
index f7e24de..8a63f5a 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -98,7 +98,7 @@ public class DefaultBucketTest
Slice one = ManagedStateTestUtils.getSliceFor("1");
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
- testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
+ testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0, -1);
ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
@@ -115,7 +115,7 @@ public class DefaultBucketTest
Slice one = ManagedStateTestUtils.getSliceFor("1");
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
- testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
+ testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0, -1);
ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
index a7e0827..fe4c20d 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
@@ -20,6 +20,7 @@
package org.apache.apex.malhar.lib.state.managed;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.datatorrent.api.Context;
import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
@@ -133,6 +135,43 @@ public class IncrementalCheckpointManagerTest
}
@Test
+ public void testTransferWindowFilesExcludeExpiredBuckets() throws IOException, InterruptedException
+ {
+ testMeta.checkpointManager.setup(testMeta.managedStateContext);
+
+ int startKeyBucket = 200;
+ Map<Long, Map<Slice, Bucket.BucketedValue>> buckets = ManagedStateTestUtils.getTestData(startKeyBucket, startKeyBucket + 10, 0);
+ long latestExpiredTimeBucket = 102;
+ testMeta.checkpointManager.setLatestExpiredTimeBucket(latestExpiredTimeBucket);
+ testMeta.checkpointManager.save(buckets, 10, false);
+ //Need to synchronously call transfer window files so shutting down the other thread.
+ testMeta.checkpointManager.teardown();
+ Thread.sleep(500);
+
+ testMeta.checkpointManager.committed(10);
+ testMeta.checkpointManager.transferWindowFiles();
+
+ // Retrieve the data which is not expired
+ Map<Long, Map<Slice, Bucket.BucketedValue>> bucketsValidData = new HashMap<>();
+ for (int i = 0; i < 5; i++) {
+ Map<Slice, Bucket.BucketedValue> data = buckets.get((long)startKeyBucket + i);
+ Map<Slice, Bucket.BucketedValue> bucketData = Maps.newHashMap();
+ for (Map.Entry<Slice,Bucket.BucketedValue> e: data.entrySet()) {
+ if (e.getValue().getTimeBucket() <= latestExpiredTimeBucket) {
+ continue;
+ }
+ bucketData.put(e.getKey(), e.getValue());
+ }
+ bucketsValidData.put((long)startKeyBucket + i, bucketData);
+ }
+
+ for (int i = 0; i < 5; i++) {
+ ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), startKeyBucket + i,
+ bucketsValidData.get((long)startKeyBucket + i), 1);
+ }
+ }
+
+ @Test
public void testCommitted() throws IOException, InterruptedException
{
CountDownLatch latch = new CountDownLatch(5);
@@ -186,9 +225,9 @@ public class IncrementalCheckpointManagerTest
@Override
protected void writeBucketData(long windowId, long bucketId, Map<Slice,
- Bucket.BucketedValue> data) throws IOException
+ Bucket.BucketedValue> data, long latestPurgedTimeBucket) throws IOException
{
- super.writeBucketData(windowId, bucketId, data);
+ super.writeBucketData(windowId, bucketId, data, latestPurgedTimeBucket);
if (windowId == 10) {
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
index ed53f08..2882828 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
@@ -86,7 +86,7 @@ public class ManagedTimeStateImplTest
testMeta.managedState.setup(testMeta.operatorContext);
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, time);
- testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+ testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0, -1);
ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(), 0, unsavedBucket0, 1);
Future<Slice> valFuture = testMeta.managedState.getAsync(0, zero);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
index 1d2334d..42ab187 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -125,7 +125,7 @@ public class ManagedTimeUnifiedStateImplTest
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
//write data to disk explicitly
- testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+ testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0, -1);
ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(),
testMeta.operatorContext.getId(), unsavedBucket0, 1);
@@ -147,7 +147,7 @@ public class ManagedTimeUnifiedStateImplTest
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
//write data to disk explicitly
- testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+ testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0, -1);
ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(),
testMeta.operatorContext.getId(), unsavedBucket0, 1);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
index 2b132f4..97f6c20 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
@@ -113,28 +113,25 @@ public class MovingBoundaryTimeBucketAssignerTest
long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
Assert.assertEquals("purgeLessThanEqualTo", -2L, purgeLessThanEqualTo.longValue());
-
long time0 = Duration.standardSeconds(0).getMillis() + referenceTime;
Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucket(time0) );
testMeta.timeBucketAssigner.endWindow();
- Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
+ Assert.assertEquals("purgeLessThanEqualTo", -1, purgeLessThanEqualTo.longValue());
long time1 = Duration.standardSeconds(9).getMillis() + referenceTime;
Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucket(time1) );
testMeta.timeBucketAssigner.endWindow();
- Assert.assertEquals("purgeLessThanEqualTo", 7, purgeLessThanEqualTo.longValue());
- purgeLessThanEqualTo.setValue(-2);
+ Assert.assertEquals("purgeLessThanEqualTo", 8, purgeLessThanEqualTo.longValue());
long time2 = Duration.standardSeconds(10).getMillis() + referenceTime;
Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucket(time2) );
testMeta.timeBucketAssigner.endWindow();
-// TODO: why is purgeLessThanEqualTo not moving to 8 here?
- Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
+ Assert.assertEquals("purgeLessThanEqualTo", 9, purgeLessThanEqualTo.longValue());
//Check for expiry of time1 now
Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucket(time1) );
testMeta.timeBucketAssigner.endWindow();
- Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
+ Assert.assertEquals("purgeLessThanEqualTo", 9, purgeLessThanEqualTo.longValue());
testMeta.timeBucketAssigner.teardown();
}