You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/01/23 06:51:51 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2345 Fix
MovingBoundaryTimeBucketAssigner initialization and purge trigger.
Repository: apex-malhar
Updated Branches:
refs/heads/master e22ea0de1 -> 1ae14c03a
APEXMALHAR-2345 Fix MovingBoundaryTimeBucketAssigner initialization and purge trigger.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1ae14c03
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1ae14c03
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1ae14c03
Branch: refs/heads/master
Commit: 1ae14c03a73e2814b57ce06680db57da80f0b42b
Parents: 9043f9d
Author: Thomas Weise <th...@apache.org>
Authored: Sun Jan 22 13:19:44 2017 -0800
Committer: David Yan <da...@apache.org>
Committed: Sun Jan 22 21:45:28 2017 -0800
----------------------------------------------------------------------
.../state/managed/AbstractManagedStateImpl.java | 3 +++
.../managed/IncrementalCheckpointManager.java | 1 +
.../MovingBoundaryTimeBucketAssigner.java | 5 ++--
.../lib/state/managed/ManagedStateImplTest.java | 4 ++--
.../MovingBoundaryTimeBucketAssignerTest.java | 25 ++++++++++++++++++++
5 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index daae2d8..f676b84 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -220,6 +220,8 @@ public abstract class AbstractManagedStateImpl
buckets.get(bucketIdx).recoveredData(stateEntry.getKey(), bucketEntry.getValue());
}
}
+ // Skip writing to the WAL during recovery, i.e. while replaying from the WAL.
+ // Data only needs to be transferred to bucket data files.
checkpointManager.save(state, stateEntry.getKey(), true /*skipWritingToWindowFile*/);
}
} catch (IOException e) {
@@ -369,6 +371,7 @@ public abstract class AbstractManagedStateImpl
}
if (!flashData.isEmpty()) {
try {
+ // write incremental state to WAL (skipWrite=false) before the checkpoint
checkpointManager.save(flashData, windowId, false);
} catch (IOException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index 65c1d1e..aa7cec7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -111,6 +111,7 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
if (latestExpiredTimeBucket.get() > -1) {
try {
latestPurgedTimeBucket = latestExpiredTimeBucket.getAndSet(-1);
+ //LOG.debug("latestPurgedTimeBucket {}", latestPurgedTimeBucket);
managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(latestPurgedTimeBucket);
} catch (IOException e) {
throwable.set(e);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
index ece7686..cc8ea0a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
@@ -67,7 +67,7 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
private int numBuckets;
private transient long fixedStart;
private transient boolean triggerPurge;
- private transient long lowestPurgeableTimeBucket;
+ private transient long lowestPurgeableTimeBucket = -1;
@Override
@@ -125,8 +125,9 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
long move = (diffInBuckets + 1) * bucketSpanMillis;
start += move;
end += move;
- triggerPurge = true;
lowestPurgeableTimeBucket += diffInBuckets;
+ // trigger purge when lower bound changes
+ triggerPurge = (diffInBuckets > 0);
}
return key;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
index 99e6c23..dab3925 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
@@ -177,7 +177,7 @@ public class ManagedStateImplTest
testMeta.managedState.setup(testMeta.operatorContext);
int numKeys = 300;
- long lastWindowId = (long)numKeys;
+ long lastWindowId = numKeys;
for (long windowId = 0L; windowId < lastWindowId; windowId++) {
testMeta.managedState.beginWindow(windowId);
@@ -197,7 +197,7 @@ public class ManagedStateImplTest
for (int key = numKeys - 1; key > 0; key--) {
Slice keyVal = ManagedStateTestUtils.getSliceFor(Integer.toString(key));
Slice val = testMeta.managedState.getSync(0L, keyVal);
- Assert.assertNotNull(val);
+ Assert.assertNotNull("null value for key " + key, val);
}
testMeta.managedState.endWindow();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
index e4e5d2e..2b132f4 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
@@ -28,6 +28,8 @@ import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.apache.commons.lang3.mutable.MutableLong;
+
import com.datatorrent.lib.util.KryoCloneUtils;
public class MovingBoundaryTimeBucketAssignerTest
@@ -96,20 +98,43 @@ public class MovingBoundaryTimeBucketAssignerTest
@Test
public void testTimeBucketKeyExpiry()
{
+ final MutableLong purgeLessThanEqualTo = new MutableLong(-2);
testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1));
testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1));
+ testMeta.timeBucketAssigner.setPurgeListener(new TimeBucketAssigner.PurgeListener()
+ {
+ @Override
+ public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
+ {
+ purgeLessThanEqualTo.setValue(timeBucket);
+ }
+ });
long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
+ Assert.assertEquals("purgeLessThanEqualTo", -2L, purgeLessThanEqualTo.longValue());
+
+ long time0 = Duration.standardSeconds(0).getMillis() + referenceTime;
+ Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucket(time0) );
+ testMeta.timeBucketAssigner.endWindow();
+ Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
long time1 = Duration.standardSeconds(9).getMillis() + referenceTime;
Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucket(time1) );
+ testMeta.timeBucketAssigner.endWindow();
+ Assert.assertEquals("purgeLessThanEqualTo", 7, purgeLessThanEqualTo.longValue());
+ purgeLessThanEqualTo.setValue(-2);
long time2 = Duration.standardSeconds(10).getMillis() + referenceTime;
Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucket(time2) );
+ testMeta.timeBucketAssigner.endWindow();
+// TODO: why is purgeLessThanEqualTo not moving to 8 here?
+ Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
//Check for expiry of time1 now
Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucket(time1) );
+ testMeta.timeBucketAssigner.endWindow();
+ Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
testMeta.timeBucketAssigner.teardown();
}
[2/2] apex-malhar git commit: APEXMALHAR-2345 purging time buckets in
WindowedStorage
Posted by th...@apache.org.
APEXMALHAR-2345 purging time buckets in WindowedStorage
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9043f9d4
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9043f9d4
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9043f9d4
Branch: refs/heads/master
Commit: 9043f9d4ea5514cbc7cdf7ad826eb19c73a9ef9e
Parents: e22ea0d
Author: David Yan <da...@datatorrent.com>
Authored: Thu Nov 17 12:58:34 2016 -0800
Committer: David Yan <da...@apache.org>
Committed: Sun Jan 22 21:45:28 2017 -0800
----------------------------------------------------------------------
.../AbstractWindowedOperatorBenchmarkApp.java | 2 ++
benchmark/src/test/resources/log4j.properties | 2 +-
.../managed/IncrementalCheckpointManager.java | 11 ++++++----
.../managed/ManagedTimeUnifiedStateImpl.java | 2 +-
.../malhar/lib/state/managed/StateTracker.java | 1 -
.../spillable/SpillableComplexComponent.java | 2 ++
.../SpillableComplexComponentImpl.java | 5 +++++
.../lib/state/spillable/SpillableSetImpl.java | 2 +-
.../apache/apex/malhar/lib/window/Window.java | 14 +++++--------
.../apex/malhar/lib/window/WindowedStorage.java | 5 +++++
.../window/impl/AbstractWindowedOperator.java | 16 ++++++++++++++-
.../window/impl/InMemoryWindowedStorage.java | 13 ++++++++++++
.../impl/SpillableWindowedKeyedStorage.java | 21 ++++++++++++++++++++
.../impl/SpillableWindowedPlainStorage.java | 18 +++++++++++++++++
14 files changed, 96 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
index 7250e74..4b9b423 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
@@ -25,6 +25,7 @@ import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.UnboundedTimeBucketAssigner;
import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
import org.apache.apex.malhar.lib.state.spillable.managed.ManagedTimeUnifiedStateSpillableStateStore;
@@ -141,6 +142,7 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
{
String basePath = getStoreBasePath(conf);
ManagedTimeUnifiedStateSpillableStateStore store = new ManagedTimeUnifiedStateSpillableStateStore();
+ store.setTimeBucketAssigner(new UnboundedTimeBucketAssigner());
store.getTimeBucketAssigner().setBucketSpan(Duration.millis(10000));
((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/benchmark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/benchmark/src/test/resources/log4j.properties b/benchmark/src/test/resources/log4j.properties
index 92e48b7..2f3bc53 100644
--- a/benchmark/src/test/resources/log4j.properties
+++ b/benchmark/src/test/resources/log4j.properties
@@ -41,5 +41,5 @@ log4j.logger.org=info
#log4j.logger.org.apache.commons.beanutils=warn
log4j.logger.com.datatorrent=debug
log4j.logger.org.apache.apex=debug
-log4j.logger.org.apache.apex.malhar.lib.state.managed=info
+log4j.logger.org.apache.apex.malhar.lib.state.managed=debug
log4j.logger.com.datatorrent.common.util.FSStorageAgent=info
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index 3b01ed2..65c1d1e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -72,6 +72,7 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
protected transient ManagedStateContext managedStateContext;
private final transient AtomicLong latestExpiredTimeBucket = new AtomicLong(-1);
+ private long latestPurgedTimeBucket = -1;
private transient int waitMillis;
private volatile long lastTransferredWindow = Stateless.WINDOW_ID;
@@ -109,8 +110,8 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
transferWindowFiles();
if (latestExpiredTimeBucket.get() > -1) {
try {
- managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(
- latestExpiredTimeBucket.getAndSet(-1));
+ latestPurgedTimeBucket = latestExpiredTimeBucket.getAndSet(-1);
+ managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(latestPurgedTimeBucket);
} catch (IOException e) {
throwable.set(e);
LOG.debug("delete files", e);
@@ -133,8 +134,10 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
Map<Long, Map<Slice, Bucket.BucketedValue>> buckets = savedWindows.remove(windowId);
for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> singleBucket : buckets.entrySet()) {
- managedStateContext.getBucketsFileSystem().writeBucketData(windowId, singleBucket.getKey(),
- singleBucket.getValue());
+ long bucketId = singleBucket.getKey();
+ if (bucketId > latestPurgedTimeBucket) {
+ managedStateContext.getBucketsFileSystem().writeBucketData(windowId, bucketId, singleBucket.getValue());
+ }
}
committed(windowId);
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
index d558eee..62ebbc5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -142,7 +142,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
@Override
public void setup(Context.OperatorContext context)
{
- // set UnBoundedTimeBucketAssigner to this managed state impl
+ // set UnboundedTimeBucketAssigner to this managed state impl
if (timeBucketAssigner == null) {
UnboundedTimeBucketAssigner unboundedTimeBucketAssigner = new UnboundedTimeBucketAssigner();
setTimeBucketAssigner(unboundedTimeBucketAssigner);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
index 7cab41c..e17f5b9 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -115,7 +115,6 @@ class StateTracker extends TimerTask
long sizeFreed;
try {
sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow());
- LOG.debug("bucket freed {} {}", bucketId, sizeFreed);
} catch (IOException e) {
managedStateImpl.throwable.set(e);
throw new RuntimeException("freeing " + bucketId, e);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index b6ec6a2..c00589a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -196,4 +196,6 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @return A {@link SpillableQueue}.
*/
<T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde);
+
+ SpillableStateStore getStore();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
index 1d9fbc6..1ecf86e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -261,4 +261,9 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
{
store.committed(l);
}
+
+ public SpillableStateStore getStore()
+ {
+ return store;
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
index 221cd38..33e48ac 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
@@ -131,7 +131,7 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
public SpillableSetImpl(@NotNull byte[] prefix,
@NotNull SpillableStateStore store,
@NotNull Serde<T> serde,
- @NotNull TimeExtractor timeExtractor)
+ @NotNull TimeExtractor<T> timeExtractor)
{
map = new SpillableMapImpl<>(Preconditions.checkNotNull(store), prefix, serde, new ListNodeSerde<>(serde), timeExtractor);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
index 1d7681d..edcfc2c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
@@ -146,16 +146,12 @@ public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WI
@Override
public int compareTo(TimeWindow o)
{
- if (this.getBeginTimestamp() < o.getBeginTimestamp()) {
- return -1;
- } else if (this.getBeginTimestamp() > o.getBeginTimestamp()) {
- return 1;
- } else if (this.getDurationMillis() < o.getDurationMillis()) {
- return -1;
- } else if (this.getDurationMillis() > o.getDurationMillis()) {
- return 1;
+ long diff = (this.getBeginTimestamp() + this.getDurationMillis()) - (o.getBeginTimestamp() + o.getDurationMillis());
+ if (diff == 0) {
+ return Long.signum(this.getBeginTimestamp() - o.getBeginTimestamp());
+ } else {
+ return Long.signum(diff);
}
- return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
index e2874ba..55f8a02 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
@@ -56,6 +56,11 @@ public interface WindowedStorage extends Component<Context.OperatorContext>
void remove(Window window);
/**
+ * Purge checkpointed data for all the windows that lie entirely beyond the given horizon.
+ */
+ void purge(long horizonMillis);
+
+ /**
* This interface handles plain value per window. If there is a key/value map for each window, use
* {@link WindowedKeyedStorage}. Also note that a single T object is assumed to be fit in memory
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index 4ba81b3..22e8525 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import javax.validation.ValidationException;
@@ -89,6 +90,8 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
private long currentDerivedTimestamp = -1;
private long timeIncrement;
protected long fixedWatermarkMillis = -1;
+ private transient long streamingWindowId;
+ private transient TreeMap<Long, Long> streamingWindowToLatenessHorizon = new TreeMap<>();
private Map<String, Component<Context.OperatorContext>> components = new HashMap<>();
@@ -407,7 +410,6 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
public void dropTuple(Tuple input)
{
// do nothing
- LOG.debug("Dropping late tuple {}", input);
}
@Override
@@ -465,6 +467,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
} else {
currentDerivedTimestamp += timeIncrement;
}
+ streamingWindowId = windowId;
}
/**
@@ -517,6 +520,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
}
}
}
+ streamingWindowToLatenessHorizon.put(streamingWindowId, horizon);
controlOutput.emit(new WatermarkImpl(nextWatermark));
this.currentWatermark = nextWatermark;
}
@@ -623,5 +627,15 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
((CheckpointNotificationListener)component).committed(windowId);
}
}
+ Long floorWindowId = streamingWindowToLatenessHorizon.floorKey(windowId);
+ if (floorWindowId != null) {
+ long horizon = streamingWindowToLatenessHorizon.get(floorWindowId);
+ windowStateMap.purge(horizon);
+ dataStorage.purge(horizon);
+ if (retractionStorage != null) {
+ retractionStorage.purge(horizon);
+ }
+ streamingWindowToLatenessHorizon.headMap(windowId, true).clear();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
index db18a40..730bcf9 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
@@ -18,6 +18,7 @@
*/
package org.apache.apex.malhar.lib.window.impl;
+import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
@@ -86,4 +87,16 @@ public class InMemoryWindowedStorage<T> implements WindowedStorage.WindowedPlain
{
}
+ @Override
+ public void purge(long horizonMillis)
+ {
+ for (Iterator<Map.Entry<Window, T>> iterator = map.entrySet().iterator(); iterator.hasNext(); ) {
+ Window window = iterator.next().getKey();
+ if (window.getBeginTimestamp() + window.getDurationMillis() < horizonMillis) {
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
index bf0c804..6138f97 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
@@ -25,8 +25,13 @@ import java.util.Set;
import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
import org.apache.apex.malhar.lib.state.spillable.Spillable;
import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
import org.apache.apex.malhar.lib.window.Window;
@@ -57,6 +62,9 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
protected Spillable.SpillableMap<Pair<Window, K>, V> windowKeyToValueMap;
protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap;
+ private static final Logger LOG = LoggerFactory.getLogger(SpillableWindowedKeyedStorage.class);
+
+
private class KVIterator implements Iterator<Map.Entry<K, V>>
{
final Window window;
@@ -221,4 +229,17 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
{
return windowKeyToValueMap.get(new ImmutablePair<>(window, key));
}
+
+ @Override
+ public void purge(long horizonMillis)
+ {
+ SpillableStateStore store = scc.getStore();
+ if (store instanceof ManagedTimeUnifiedStateImpl) {
+ ManagedTimeUnifiedStateImpl timeState = (ManagedTimeUnifiedStateImpl)store;
+ long purgeTimeBucket = horizonMillis - timeState.getTimeBucketAssigner().getBucketSpan().getMillis();
+ LOG.debug("Purging state less than equal to {}", purgeTimeBucket);
+ timeState.purgeTimeBucketsLessThanEqualTo(purgeTimeBucket);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
index f70771c..a58d89f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
@@ -22,8 +22,13 @@ import java.util.Map;
import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
import org.apache.apex.malhar.lib.state.spillable.Spillable;
import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
import org.apache.apex.malhar.lib.window.Window;
@@ -45,6 +50,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
private long bucket;
private Serde<Window> windowSerde;
private Serde<T> valueSerde;
+ private static final Logger LOG = LoggerFactory.getLogger(SpillableWindowedPlainStorage.class);
protected Spillable.SpillableMap<Window, T> windowToDataMap;
@@ -143,4 +149,16 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
public void teardown()
{
}
+
+ @Override
+ public void purge(long horizonMillis)
+ {
+ SpillableStateStore store = scc.getStore();
+ if (store instanceof ManagedTimeUnifiedStateImpl) {
+ ManagedTimeUnifiedStateImpl timeState = (ManagedTimeUnifiedStateImpl)store;
+ long purgeTimeBucket = horizonMillis - timeState.getTimeBucketAssigner().getBucketSpan().getMillis();
+ LOG.debug("Purging state less than equal to {}", purgeTimeBucket);
+ timeState.purgeTimeBucketsLessThanEqualTo(purgeTimeBucket);
+ }
+ }
}