You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2017/01/16 04:46:05 UTC
apex-malhar git commit: APEXMALHAR-2301 Refactor TimeBucketAssigner
to add a single time-bucket assigner and change the time-bucket metadata from
an array to a map to handle unbounded time buckets
Repository: apex-malhar
Updated Branches:
refs/heads/master 52510b0f8 -> 4ab457f18
APEXMALHAR-2301 Refactor TimeBucketAssigner to add a single time-bucket assigner and change the time-bucket metadata from an array to a map to handle unbounded time buckets
This closes #503
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4ab457f1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4ab457f1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4ab457f1
Branch: refs/heads/master
Commit: 4ab457f18a8af8892d8f3f4a5f6e6c2eb50995ac
Parents: 52510b0
Author: Siyuan Hua <hs...@apache.org>
Authored: Sun Nov 20 22:01:59 2016 -0800
Committer: David Yan <da...@apache.org>
Committed: Sun Jan 15 20:44:06 2017 -0800
----------------------------------------------------------------------
.../apex/malhar/lib/dedup/AbstractDeduper.java | 4 +-
.../malhar/lib/dedup/BoundedDedupOperator.java | 4 +-
.../lib/dedup/TimeBasedDedupOperator.java | 6 +-
.../AbstractManagedStateInnerJoinOperator.java | 8 +-
.../state/managed/AbstractManagedStateImpl.java | 64 ++++---
.../lib/state/managed/ManagedStateImpl.java | 4 +-
.../lib/state/managed/ManagedTimeStateImpl.java | 15 +-
.../managed/ManagedTimeStateMultiValue.java | 8 +-
.../managed/ManagedTimeUnifiedStateImpl.java | 38 ++--
.../MovingBoundaryTimeBucketAssigner.java | 180 +++++++++++++++++++
.../malhar/lib/state/managed/StateTracker.java | 2 +-
.../lib/state/managed/TimeBucketAssigner.java | 167 ++++-------------
.../malhar/lib/state/managed/TimeExtractor.java | 23 +++
.../managed/UnboundedTimeBucketAssigner.java | 70 ++++++++
.../spillable/SpillableSetMultimapImpl.java | 27 +--
.../impl/SpillableSessionWindowedStorage.java | 6 +-
.../ManagedTimeUnifiedStateImplTest.java | 12 +-
.../state/managed/MockManagedStateContext.java | 6 +-
.../MovingBoundaryTimeBucketAssignerTest.java | 116 ++++++++++++
.../state/managed/TimeBucketAssignerTest.java | 116 ------------
.../malhar/lib/window/WindowedOperatorTest.java | 3 +-
21 files changed, 526 insertions(+), 353 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
index c73cea7..bf81fde 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.state.BucketedState;
import org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
-import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.fs.Path;
@@ -54,7 +54,7 @@ import com.datatorrent.netlet.util.Slice;
/**
* Abstract class which allows de-duplicating incoming tuples based on a configured key.
- * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link MovingBoundaryTimeBucketAssigner}
* in {@link ManagedTimeUnifiedStateImpl}
* Following steps are used in identifying the state of a particular tuple:
* 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
index 7763103..e2a1297 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
@@ -24,7 +24,7 @@ import java.util.concurrent.Future;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
-import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@@ -110,7 +110,7 @@ public class BoundedDedupOperator extends AbstractDeduper<Object>
numBuckets = DEFAULT_NUM_BUCKETS;
}
((ManagedTimeStateImpl)managedState).setNumBuckets(numBuckets);
- TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+ MovingBoundaryTimeBucketAssigner timeBucketAssigner = new MovingBoundaryTimeBucketAssigner();
managedState.setTimeBucketAssigner(timeBucketAssigner);
super.setup(context);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
index 3f888cc..3b5f5e2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
@@ -25,7 +25,7 @@ import javax.validation.constraints.NotNull;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
-import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import com.datatorrent.api.Context;
@@ -64,7 +64,7 @@ import com.datatorrent.netlet.util.Slice;
* 3. {@link #referenceInstant} - The reference point from which to start the time which is used for expiry.
* Setting the {@link #referenceInstant} to say, r seconds from the epoch, would initialize the start of expiry
* to be from that instant = r. The start and end of the expiry window periodically move by the span of a single
- * bucket. Refer {@link TimeBucketAssigner} for details.
+ * bucket. Refer {@link MovingBoundaryTimeBucketAssigner} for details.
*
* Additionally, it also needs the following parameters:
* 1. {@link #keyExpression} - The java expression to extract the key fields in the incoming tuple (POJO)
@@ -151,7 +151,7 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
@Override
public void setup(OperatorContext context)
{
- TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+ MovingBoundaryTimeBucketAssigner timeBucketAssigner = new MovingBoundaryTimeBucketAssigner();
timeBucketAssigner.setBucketSpan(Duration.standardSeconds(bucketSpan));
timeBucketAssigner.setExpireBefore(Duration.standardSeconds(expireBefore));
timeBucketAssigner.setReferenceInstant(new Instant(referenceInstant * 1000));
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
index c82c3e3..453da80 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Future;
import org.joda.time.Duration;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner;
import org.apache.apex.malhar.lib.state.spillable.Spillable;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.Maps;
@@ -68,12 +69,13 @@ public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends Abstrac
stream2Store = new ManagedTimeStateImpl();
stream1Store.setNumBuckets(noOfBuckets);
stream2Store.setNumBuckets(noOfBuckets);
+ assert stream1Store.getTimeBucketAssigner() == stream2Store.getTimeBucketAssigner();
if (bucketSpanTime != null) {
stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
- stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
}
- stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
- stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+ if (stream1Store.getTimeBucketAssigner() instanceof MovingBoundaryTimeBucketAssigner) {
+ ((MovingBoundaryTimeBucketAssigner)stream1Store.getTimeBucketAssigner()).setExpireBefore(Duration.millis(getExpiryTime()));
+ }
stream1Data = new ManagedTimeStateMultiValue(stream1Store, !isLeftKeyPrimary());
stream2Data = new ManagedTimeStateMultiValue(stream2Store, !isRightKeyPrimary());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 364bc19..daae2d8 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -20,6 +20,7 @@ package org.apache.apex.malhar.lib.state.managed;
import java.io.IOException;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -128,14 +129,14 @@ public abstract class AbstractManagedStateImpl
{
private long maxMemorySize;
- protected int numBuckets;
+ protected long numBuckets;
@NotNull
private FileAccess fileAccess = new TFileImpl.DTFileImpl();
@NotNull
- protected TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+ protected TimeBucketAssigner timeBucketAssigner;
- protected Bucket[] buckets;
+ protected Map<Long, Bucket> buckets;
@Min(1)
private int numReaders = 1;
@@ -176,6 +177,11 @@ public abstract class AbstractManagedStateImpl
operatorContext = context;
fileAccess.init();
+ if (timeBucketAssigner == null) {
+ // set default time bucket assigner
+ MovingBoundaryTimeBucketAssigner movingBoundaryTimeBucketAssigner = new MovingBoundaryTimeBucketAssigner();
+ setTimeBucketAssigner(movingBoundaryTimeBucketAssigner);
+ }
timeBucketAssigner.setPurgeListener(this);
//setup all the managed state components
@@ -184,11 +190,11 @@ public abstract class AbstractManagedStateImpl
bucketsFileSystem.setup(this);
if (buckets == null) {
- //create buckets array only once at start when it is not created.
+ //create buckets map only once at start if it is not created.
numBuckets = getNumBuckets();
- buckets = new Bucket[numBuckets];
+ buckets = new HashMap<>();
}
- for (Bucket bucket : buckets) {
+ for (Bucket bucket : buckets.values()) {
if (bucket != null) {
bucket.setup(this);
}
@@ -210,8 +216,8 @@ public abstract class AbstractManagedStateImpl
stateEntry.getValue();
if (state != null && !state.isEmpty()) {
for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> bucketEntry : state.entrySet()) {
- int bucketIdx = prepareBucket(bucketEntry.getKey());
- buckets[bucketIdx].recoveredData(stateEntry.getKey(), bucketEntry.getValue());
+ long bucketIdx = prepareBucket(bucketEntry.getKey());
+ buckets.get(bucketIdx).recoveredData(stateEntry.getKey(), bucketEntry.getValue());
}
}
checkpointManager.save(state, stateEntry.getKey(), true /*skipWritingToWindowFile*/);
@@ -231,7 +237,7 @@ public abstract class AbstractManagedStateImpl
*
* @return number of buckets.
*/
- public abstract int getNumBuckets();
+ public abstract long getNumBuckets();
public void beginWindow(long windowId)
{
@@ -246,17 +252,17 @@ public abstract class AbstractManagedStateImpl
* @param bucketId bucket key
* @return bucket index
*/
- protected int prepareBucket(long bucketId)
+ protected long prepareBucket(long bucketId)
{
stateTracker.bucketAccessed(bucketId);
- int bucketIdx = getBucketIdx(bucketId);
+ long bucketIdx = getBucketIdx(bucketId);
- Bucket bucket = buckets[bucketIdx];
+ Bucket bucket = buckets.get(bucketIdx);
if (bucket == null) {
//bucket is not in memory
bucket = newBucket(bucketId);
bucket.setup(this);
- buckets[bucketIdx] = bucket;
+ buckets.put(bucketIdx, bucket);
} else if (bucket.getBucketId() != bucketId) {
handleBucketConflict(bucketIdx, bucketId);
}
@@ -269,13 +275,13 @@ public abstract class AbstractManagedStateImpl
Preconditions.checkNotNull(value, "value");
if (timeBucket != -1) {
//time bucket is invalid data is not stored
- int bucketIdx = prepareBucket(bucketId);
+ long bucketIdx = prepareBucket(bucketId);
//synchronization on a bucket isn't required for put because the event is added to flash which is
//a concurrent map. The assumption here is that the calls to put & get(sync/async) are being made synchronously by
//a single thread (operator thread). The get(sync/async) always checks memory first synchronously.
//If the key is not in the memory, then the async get will uses other reader threads which will fetch it from
//the files.
- buckets[bucketIdx].put(key, timeBucket, value);
+ buckets.get(bucketIdx).put(key, timeBucket, value);
}
}
@@ -283,8 +289,8 @@ public abstract class AbstractManagedStateImpl
protected Slice getValueFromBucketSync(long bucketId, long timeBucket, @NotNull Slice key)
{
Preconditions.checkNotNull(key, "key");
- int bucketIdx = prepareBucket(bucketId);
- Bucket bucket = buckets[bucketIdx];
+ long bucketIdx = prepareBucket(bucketId);
+ Bucket bucket = buckets.get(bucketIdx);
synchronized (bucket) {
return bucket.get(key, timeBucket, Bucket.ReadSource.ALL);
}
@@ -294,8 +300,8 @@ public abstract class AbstractManagedStateImpl
protected Future<Slice> getValueFromBucketAsync(long bucketId, long timeBucket, @NotNull Slice key)
{
Preconditions.checkNotNull(key, "key");
- int bucketIdx = prepareBucket(bucketId);
- Bucket bucket = buckets[bucketIdx];
+ long bucketIdx = prepareBucket(bucketId);
+ Bucket bucket = buckets.get(bucketIdx);
synchronized (bucket) {
Slice cachedVal = bucket.get(key, timeBucket, Bucket.ReadSource.MEMORY);
if (cachedVal != null) {
@@ -307,20 +313,20 @@ public abstract class AbstractManagedStateImpl
}
}
- protected void handleBucketConflict(int bucketIdx, long newBucketId)
+ protected void handleBucketConflict(long bucketIdx, long newBucketId)
{
- throw new IllegalArgumentException("bucket conflict " + buckets[bucketIdx].getBucketId() + " " + newBucketId);
+ throw new IllegalArgumentException("bucket conflict " + buckets.get(bucketIdx).getBucketId() + " " + newBucketId);
}
- protected int getBucketIdx(long bucketId)
+ protected long getBucketIdx(long bucketId)
{
- return (int)Math.abs(bucketId % numBuckets);
+ return Math.abs(bucketId % numBuckets);
}
@Override
public Bucket getBucket(long bucketId)
{
- return buckets[getBucketIdx(bucketId)];
+ return buckets.get(getBucketIdx(bucketId));
}
@Override
@@ -330,7 +336,7 @@ public abstract class AbstractManagedStateImpl
if (b == null) {
b = newBucket(bucketId);
b.setup(this);
- buckets[getBucketIdx(bucketId)] = b;
+ buckets.put(getBucketIdx(bucketId), b);
}
return b;
}
@@ -351,7 +357,7 @@ public abstract class AbstractManagedStateImpl
{
Map<Long, Map<Slice, Bucket.BucketedValue>> flashData = Maps.newHashMap();
- for (Bucket bucket : buckets) {
+ for (Bucket bucket : buckets.values()) {
if (bucket != null) {
synchronized (bucket) {
Map<Slice, Bucket.BucketedValue> flashDataForBucket = bucket.checkpoint(windowId);
@@ -381,7 +387,7 @@ public abstract class AbstractManagedStateImpl
{
synchronized (commitLock) {
try {
- for (Bucket bucket : buckets) {
+ for (Bucket bucket : buckets.values()) {
if (bucket != null) {
synchronized (bucket) {
bucket.committed(windowId);
@@ -402,7 +408,7 @@ public abstract class AbstractManagedStateImpl
public Map<Long, Long> getBucketMemoryUsage()
{
Map<Long, Long> bucketToSize = Maps.newHashMap();
- for (Bucket bucket : buckets) {
+ for (Bucket bucket : buckets.values()) {
if (bucket == null) {
continue;
}
@@ -419,7 +425,7 @@ public abstract class AbstractManagedStateImpl
bucketsFileSystem.teardown();
timeBucketAssigner.teardown();
readerService.shutdownNow();
- for (Bucket bucket : buckets) {
+ for (Bucket bucket : buckets.values()) {
if (bucket != null) {
synchronized (bucket) {
bucket.teardown();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
index ba8cdc6..bf2209f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
@@ -55,7 +55,7 @@ public class ManagedStateImpl extends AbstractManagedStateImpl implements Bucket
@Override
public void put(long bucketId, @NotNull Slice key, @NotNull Slice value)
{
- long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+ long timeBucket = timeBucketAssigner.getTimeBucket(time);
putInBucket(bucketId, timeBucket, key, value);
}
@@ -88,7 +88,7 @@ public class ManagedStateImpl extends AbstractManagedStateImpl implements Bucket
@Min(1)
@Override
- public int getNumBuckets()
+ public long getNumBuckets()
{
return numBuckets;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
index eddc736..caa5e2b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
@@ -28,6 +28,7 @@ import org.apache.apex.malhar.lib.state.TimeSlicedBucketedState;
import com.google.common.util.concurrent.Futures;
+import com.datatorrent.api.Context;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.netlet.util.Slice;
@@ -48,7 +49,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
@Override
public void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value)
{
- long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+ long timeBucket = timeBucketAssigner.getTimeBucket(time);
putInBucket(bucketId, timeBucket, key, value);
}
@@ -61,7 +62,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
@Override
public Slice getSync(long bucketId, long time, @NotNull Slice key)
{
- long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+ long timeBucket = timeBucketAssigner.getTimeBucket(time);
if (timeBucket == -1) {
//time is expired so no point in looking further.
return BucketedState.EXPIRED;
@@ -78,7 +79,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
@Override
public Future<Slice> getAsync(long bucketId, long time, Slice key)
{
- long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+ long timeBucket = timeBucketAssigner.getTimeBucket(time);
if (timeBucket == -1) {
//time is expired so no point in looking further.
return Futures.immediateFuture(BucketedState.EXPIRED);
@@ -88,11 +89,17 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
@Min(1)
@Override
- public int getNumBuckets()
+ public long getNumBuckets()
{
return numBuckets;
}
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ }
+
/**
* Sets the number of buckets.
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
index bd3319f..19a6abc 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
@@ -182,7 +182,7 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM
{
if (isKeyContainsMultiValue) {
Slice keySlice = streamCodec.toByteArray(k);
- int bucketId = getBucketId(k);
+ long bucketId = getBucketId(k);
Slice valueSlice = store.getSync(bucketId, keySlice);
List<V> listOb;
if (valueSlice == null || valueSlice.length == 0) {
@@ -207,7 +207,7 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM
{
if (isKeyContainsMultiValue) {
Slice keySlice = streamCodec.toByteArray(k);
- int bucketId = getBucketId(k);
+ long bucketId = getBucketId(k);
Slice valueSlice = store.getSync(bucketId, keySlice);
List<V> listOb;
if (valueSlice == null || valueSlice.length == 0) {
@@ -232,7 +232,7 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM
*/
private boolean insertInStore(long bucketId, long timeBucket, Slice keySlice, Slice valueSlice)
{
- long timeBucketId = store.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(timeBucket);
+ long timeBucketId = store.getTimeBucketAssigner().getTimeBucket(timeBucket);
if (timeBucketId != -1) {
store.putInBucket(bucketId, timeBucketId, keySlice, valueSlice);
return true;
@@ -270,7 +270,7 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM
throw new UnsupportedOperationException();
}
- public int getBucketId(K k)
+ public long getBucketId(K k)
{
return k.hashCode() % store.getNumBuckets();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
index e04286e..d558eee 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -40,6 +40,7 @@ import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
+import com.datatorrent.api.Context;
import com.datatorrent.lib.fileaccess.FileAccess;
import com.datatorrent.netlet.util.Slice;
@@ -59,7 +60,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
}
@Override
- public int getNumBuckets()
+ public long getNumBuckets()
{
return timeBucketAssigner.getNumBuckets();
}
@@ -67,14 +68,14 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
@Override
public void put(long time, @NotNull Slice key, @NotNull Slice value)
{
- long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+ long timeBucket = timeBucketAssigner.getTimeBucket(time);
putInBucket(timeBucket, timeBucket, key, value);
}
@Override
public Slice getSync(long time, @NotNull Slice key)
{
- long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+ long timeBucket = timeBucketAssigner.getTimeBucket(time);
if (timeBucket == -1) {
//time is expired so return expired slice.
return BucketedState.EXPIRED;
@@ -85,7 +86,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
@Override
public Future<Slice> getAsync(long time, @NotNull Slice key)
{
- long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+ long timeBucket = timeBucketAssigner.getTimeBucket(time);
if (timeBucket == -1) {
//time is expired so return expired slice.
return Futures.immediateFuture(BucketedState.EXPIRED);
@@ -101,10 +102,10 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
//collect all the purged time buckets
while (null != (purgedTimeBucket = purgedTimeBuckets.poll())) {
- int purgedTimeBucketIdx = getBucketIdx(purgedTimeBucket);
- if (buckets[purgedTimeBucketIdx] != null && buckets[purgedTimeBucketIdx].getBucketId() == purgedTimeBucket) {
- bucketsForTeardown.add(buckets[purgedTimeBucketIdx]);
- buckets[purgedTimeBucketIdx] = null;
+ long purgedTimeBucketIdx = getBucketIdx(purgedTimeBucket);
+ if (buckets.containsKey(purgedTimeBucketIdx) && buckets.get(purgedTimeBucketIdx).getBucketId() == purgedTimeBucket) {
+ bucketsForTeardown.add(buckets.get(purgedTimeBucketIdx));
+ buckets.remove(purgedTimeBucketIdx);
}
}
@@ -121,14 +122,14 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
}
@Override
- protected void handleBucketConflict(int bucketIdx, long newBucketId)
+ protected void handleBucketConflict(long bucketId, long newBucketId)
{
- Preconditions.checkArgument(buckets[bucketIdx].getBucketId() < newBucketId, "new time bucket should have a value"
+ Preconditions.checkArgument(buckets.get(bucketId).getBucketId() < newBucketId, "new time bucket should have a value"
+ " greater than the old time bucket");
//Time buckets are purged periodically so here a bucket conflict is expected and so we just ignore conflicts.
- bucketsForTeardown.add(buckets[bucketIdx]);
- buckets[bucketIdx] = newBucket(newBucketId);
- buckets[bucketIdx].setup(this);
+ bucketsForTeardown.add(buckets.get(bucketId));
+ buckets.put(bucketId, newBucket(newBucketId));
+ buckets.get(bucketId).setup(this);
}
@Override
@@ -138,6 +139,17 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
super.purgeTimeBucketsLessThanEqualTo(timeBucket);
}
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ // use UnboundedTimeBucketAssigner as the default time bucket assigner for this managed state impl
+ if (timeBucketAssigner == null) {
+ UnboundedTimeBucketAssigner unboundedTimeBucketAssigner = new UnboundedTimeBucketAssigner();
+ setTimeBucketAssigner(unboundedTimeBucketAssigner);
+ }
+ super.setup(context);
+ }
+
/**
* This uses operator id instead of bucket id as the name of parent folder of time-buckets. This is because
* multiple partitions may work on same time-buckets.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
new file mode 100644
index 0000000..ece7686
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * Keeps track of time buckets and triggers purging of obsolete time-buckets that have moved out of the boundary.<br/>
+ *
+ * The data of a bucket is further divided into time-buckets. This component controls the length of time buckets,
+ * which time-bucket an event falls into and sliding the time boundaries.
+ * <p/>
+ *
+ * The configuration {@link #expireBefore}, {@link #getBucketSpan() bucketSpan} and {@link #referenceInstant}
+ * (defaults to the system time when this assigner is constructed) are used to calculate the number of
+ * time-buckets.<br/>
+ * For example, if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and
+ * <code>referenceInstant = currentTime</code>, then <code>
+ * numBuckets = 60 minutes / 30 minutes = 2</code>.<br/>
+ *
+ * These properties once configured shouldn't be changed because that will result in different time-buckets
+ * for the same (key,time) pair after a failure.
+ * <p/>
+ *
+ * The time boundaries- start and end, move by multiples of time-bucket span. Any event with time < start
+ * is considered expired. The boundaries slide by {@link #getTimeBucket(long)}. The time which is passed as an
+ * argument to this method can be ahead of <code>end</code>. This means that the corresponding event is a future event
+ * (wrt TimeBucketAssigner) and cannot be ignored. Therefore it is accounted by sliding boundaries further.
+ *
+ */
+public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
+{
+  /** Lower boundary (inclusive, millis). Times before it are expired. Moves in multiples of the bucket span. */
+  private long start;
+
+  /** Upper boundary (exclusive, millis). A time at or after it slides both boundaries forward. */
+  private long end;
+
+  @NotNull
+  private Instant referenceInstant = new Instant();
+
+  @NotNull
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Duration expireBefore = Duration.standardDays(2);
+
+  private long bucketSpanMillis;
+
+  private int numBuckets;
+
+  /** referenceInstant - expireBefore: the origin from which time-bucket keys are counted. Recomputed in setup. */
+  private transient long fixedStart;
+
+  /** Set when the boundaries slid during the current window; cleared once the purge listener is notified. */
+  private transient boolean triggerPurge;
+
+  /** Highest time-bucket key lying entirely before {@link #start}. Only meaningful while triggerPurge is set. */
+  private transient long lowestPurgeableTimeBucket;
+
+  /**
+   * Sets up the base assigner, then recomputes {@link #fixedStart}. The boundaries, span and bucket count are
+   * computed only on the first setup, i.e. while the assigner is not yet initialized.
+   *
+   * @param managedStateContext managed state context
+   */
+  @Override
+  public void setup(@NotNull ManagedStateContext managedStateContext)
+  {
+    super.setup(managedStateContext);
+    fixedStart = referenceInstant.getMillis() - expireBefore.getMillis();
+
+    if (!isInitialized()) {
+      start = fixedStart;
+      bucketSpanMillis = getBucketSpan().getMillis();
+      // ceiling division so that the whole expiry duration is covered by buckets
+      numBuckets = (int)((expireBefore.getMillis() + bucketSpanMillis - 1) / bucketSpanMillis);
+      end = start + (numBuckets * bucketSpanMillis);
+
+      setInitialized(true);
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  /**
+   * If the boundaries moved during this window and a purge listener is set, asks it to purge every time bucket
+   * up to and including {@link #lowestPurgeableTimeBucket}.
+   */
+  @Override
+  public void endWindow()
+  {
+    if (triggerPurge && getPurgeListener() != null) {
+      triggerPurge = false;
+      getPurgeListener().purgeTimeBucketsLessThanEqualTo(lowestPurgeableTimeBucket);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  /**
+   * Get the bucket key for the time and adjust boundaries if necessary.
+   *
+   * @param time value from which bucket key is derived.
+   * @return -1 if {@code time} is already expired; bucket key otherwise.
+   */
+  @Override
+  public long getTimeBucket(long time)
+  {
+    if (time < start) {
+      return -1;
+    }
+    long diffFromFixedStart = time - fixedStart;
+    long key = diffFromFixedStart / bucketSpanMillis;
+    if (time >= end) {
+      long diffInBuckets = (time - end) / bucketSpanMillis;
+      long move = (diffInBuckets + 1) * bucketSpanMillis;
+      start += move;
+      end += move;
+      triggerPurge = true;
+      // Bucket k covers [fixedStart + k * span, fixedStart + (k + 1) * span). It is expired once that range
+      // ends at or before start. The bound is derived from the persisted boundary instead of being
+      // accumulated, because an accumulator would lag one bucket per slide (the boundary moves by
+      // diffInBuckets + 1 buckets) and would also reset on recovery, since the field is transient.
+      lowestPurgeableTimeBucket = (start - fixedStart) / bucketSpanMillis - 1;
+    }
+    return key;
+  }
+
+  /**
+   * @return number of buckets.
+   */
+  @Override
+  public long getNumBuckets()
+  {
+    return numBuckets;
+  }
+
+  /**
+   * @return reference instant
+   */
+  public Instant getReferenceInstant()
+  {
+    return referenceInstant;
+  }
+
+  /**
+   * Sets the reference instant (by default the system time when this assigner is constructed).
+   * This instant with {@link #expireBefore} is used to calculate the {@link #start} and {@link #end}.
+   *
+   * @param referenceInstant reference instant
+   */
+  public void setReferenceInstant(Instant referenceInstant)
+  {
+    this.referenceInstant = referenceInstant;
+  }
+
+  /**
+   * @return duration before which the data is expired.
+   */
+  public Duration getExpireBefore()
+  {
+    return expireBefore;
+  }
+
+  /**
+   * Sets the duration which denotes expiry. Any event with time before this duration is considered to be expired.
+   * @param expireBefore duration
+   */
+  public void setExpireBefore(Duration expireBefore)
+  {
+    this.expireBefore = expireBefore;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
index 225439f..7cab41c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -87,7 +87,7 @@ class StateTracker extends TimerTask
//freeing of state needs to be stopped during commit as commit results in transferring data to a state which
// can be freed up as well.
long bytesSum = 0;
- for (Bucket bucket : managedStateImpl.buckets) {
+ for (Bucket bucket : managedStateImpl.buckets.values()) {
if (bucket != null) {
bytesSum += bucket.getSizeInBytes();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
index d218b37..2cae914 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
@@ -21,7 +21,8 @@ package org.apache.apex.malhar.lib.state.managed;
import javax.validation.constraints.NotNull;
import org.joda.time.Duration;
-import org.joda.time.Instant;
+
+import org.apache.apex.malhar.lib.state.spillable.WindowListener;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
@@ -30,180 +31,79 @@ import com.google.common.base.Preconditions;
import com.datatorrent.api.Context;
/**
- * Keeps track of time buckets and triggers purging of obsolete time-buckets.<br/>
- *
- * The data of a bucket is further divided into time-buckets. This component controls the length of time buckets,
- * which time-bucket an event falls into and sliding the time boundaries.
- * <p/>
- *
- * The configuration {@link #expireBefore}, {@link #bucketSpan} and {@link #referenceInstant} (default time: system
- * time during initialization of TimeBucketAssigner) are used to calculate number of time-buckets.<br/>
- * For eg. if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and
- * <code>rererenceInstant = currentTime</code>, then <code>
- * numBuckets = 60 minutes/ 30 minutes = 2 </code>.<br/>
- *
- * These properties once configured shouldn't be changed because that will result in different time-buckets
- * for the same (key,time) pair after a failure.
- * <p/>
- *
- * The time boundaries- start and end, move by multiples of time-bucket span. Any event with time < start
- * is considered expired. The boundaries slide by {@link #getTimeBucketAndAdjustBoundaries(long)}. The time which is passed as an
- * argument to this method can be ahead of <code>end</code>. This means that the corresponding event is a future event
- * (wrt TimeBucketAssigner) and cannot be ignored. Therefore it is accounted by sliding boundaries further.
- *
- * @since 3.4.0
+ * Abstract base class for components that assign a time bucket to a given time.
*/
-public class TimeBucketAssigner implements ManagedStateComponent
+public abstract class TimeBucketAssigner implements ManagedStateComponent, WindowListener
{
- @NotNull
- private Instant referenceInstant = new Instant();
+ private transient PurgeListener purgeListener;
- @NotNull
- @FieldSerializer.Bind(JavaSerializer.class)
- private Duration expireBefore = Duration.standardDays(2);
+ private boolean initialized;
@FieldSerializer.Bind(JavaSerializer.class)
private Duration bucketSpan;
- private long bucketSpanMillis;
-
- private long start;
- private long end;
- private int numBuckets;
- private transient long fixedStart;
- private transient boolean triggerPurge;
- private transient long lowestPurgeableTimeBucket;
-
- private boolean initialized;
-
- private transient PurgeListener purgeListener;
-
+ /**
+ * Defaults the bucket span when none has been configured and the assigner is not yet initialized.
+ * Both subclasses in this change call this first from their own setup.
+ *
+ * @param managedStateContext context supplying the operator context
+ */
@Override
public void setup(@NotNull ManagedStateContext managedStateContext)
{
Context.OperatorContext context = managedStateContext.getOperatorContext();
- fixedStart = referenceInstant.getMillis() - expireBefore.getMillis();
-
- if (!initialized) {
- if (bucketSpan == null) {
- bucketSpan = Duration.millis(context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
- context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS));
- }
- start = fixedStart;
- bucketSpanMillis = bucketSpan.getMillis();
- numBuckets = (int)((expireBefore.getMillis() + bucketSpanMillis - 1) / bucketSpanMillis);
- end = start + (numBuckets * bucketSpanMillis);
-
- initialized = true;
- }
- }
-
- public void endWindow()
- {
- if (triggerPurge && purgeListener != null) {
- triggerPurge = false;
- purgeListener.purgeTimeBucketsLessThanEqualTo(lowestPurgeableTimeBucket);
- }
- }
-
- @Override
- public void teardown()
- {
- }
-
- /**
- * Get the bucket key for the long value and adjust boundaries if necessary.
- *
- * @param value value from which bucket key is derived.
- * @return -1 if value is already expired; bucket key otherwise.
- */
- public long getTimeBucketAndAdjustBoundaries(long value)
- {
- if (value < start) {
- return -1;
- }
- long diffFromStart = value - fixedStart;
- long key = diffFromStart / bucketSpanMillis;
- if (value >= end) {
- long diffInBuckets = (value - end) / bucketSpanMillis;
- long move = (diffInBuckets + 1) * bucketSpanMillis;
- start += move;
- end += move;
- triggerPurge = true;
- lowestPurgeableTimeBucket += diffInBuckets;
+ // Default span = APPLICATION_WINDOW_COUNT * STREAMING_WINDOW_SIZE_MILLIS, i.e. one application window.
+ // Only applied before the assigner has been marked initialized.
+ if (!initialized && bucketSpan == null) {
+ setBucketSpan(Duration.millis(context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+ context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)));
}
- return key;
-
}
/**
- * Sets the purge listener.
- * @param purgeListener purge listener
+ * Returns the time bucket for the given time. Implementations may update internal state (such as sliding
+ * time boundaries) as a side effect.
+ *
+ * @param time the time to assign to a time bucket
+ * @return the time bucket of {@code time}; its encoding is implementation specific (for example a bucket
+ * index, or the start time of the bucket)
*/
- public void setPurgeListener(@NotNull PurgeListener purgeListener)
- {
- this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener");
- }
+ public abstract long getTimeBucket(long time);
/**
- * @return number of buckets.
+ * Returns the number of possible time buckets.
+ *
+ * @return number of time buckets; unbounded implementations may return {@link Long#MAX_VALUE}
*/
- public int getNumBuckets()
- {
- return numBuckets;
- }
+ public abstract long getNumBuckets();
/**
- * @return reference instant
+ * @return time-bucket span; may be null until {@code setup} assigns the default
*/
- public Instant getReferenceInstant()
+ public Duration getBucketSpan()
{
- return referenceInstant;
+ return bucketSpan;
}
/**
- * Sets the reference instant (by default the system time when the streaming app is created).
- * This instant with {@link #expireBefore} is used to calculate the {@link #start} and {@link #end}.
- *
- * @param referenceInstant
+ * Sets the length of a time bucket.
+ * @param bucketSpan length of time bucket; if left null, {@code setup} defaults it to one application window
*/
- public void setReferenceInstant(Instant referenceInstant)
+ public void setBucketSpan(Duration bucketSpan)
{
- this.referenceInstant = referenceInstant;
+ this.bucketSpan = bucketSpan;
}
- /**
- * @return duration before which the data is expired.
- */
- public Duration getExpireBefore()
+ /**
+ * @return whether the assigner has completed its one-time initialization
+ */
+ public boolean isInitialized()
{
- return expireBefore;
+ return initialized;
}
- /**
- * Sets the duration which denotes expiry. Any event with time before this duration is considered to be expired.
- * @param expireBefore duration
- */
- public void setExpireBefore(Duration expireBefore)
+ /**
+ * Marks the assigner as initialized or not. The subclasses in this change set it to {@code true} at the end
+ * of their setup. While it is {@code false}, {@code setup} may assign a default bucket span.
+ *
+ * @param initialized initialization flag
+ */
+ public void setInitialized(boolean initialized)
{
- this.expireBefore = expireBefore;
+ this.initialized = initialized;
}
/**
- * @return time-bucket span
+ * Sets the listener that is asked to purge time buckets once they become obsolete.
+ * @param purgeListener purge listener; must not be null
*/
- public Duration getBucketSpan()
+ public void setPurgeListener(@NotNull PurgeListener purgeListener)
{
- return bucketSpan;
+ this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener");
}
- /**
- * Sets the length of a time bucket.
- * @param bucketSpan length of time bucket
- */
- public void setBucketSpan(Duration bucketSpan)
+ /**
+ * @return the purge listener, or {@code null} if none has been set
+ */
+ public PurgeListener getPurgeListener()
{
- this.bucketSpan = bucketSpan;
+ return purgeListener;
}
/**
@@ -214,5 +114,4 @@ public class TimeBucketAssigner implements ManagedStateComponent
{
void purgeTimeBucketsLessThanEqualTo(long timeBucket);
}
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
index 5d706db..5a2fa36 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
@@ -26,4 +26,27 @@ package org.apache.apex.malhar.lib.state.managed;
public interface TimeExtractor<T>
{
long getTime(T t);
+
+ class FixedTimeExtractor<V> implements TimeExtractor<V>
+ {
+
+ private long fixedTime;
+
+ public FixedTimeExtractor(long fixedTime)
+ {
+ this.fixedTime = fixedTime;
+ }
+
+ private FixedTimeExtractor()
+ {
+ // For kryo
+ }
+
+ @Override
+ public long getTime(V v)
+ {
+ return fixedTime;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/UnboundedTimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/UnboundedTimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/UnboundedTimeBucketAssigner.java
new file mode 100644
index 0000000..2027249
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/UnboundedTimeBucketAssigner.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import javax.validation.constraints.NotNull;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Simple time bucket assigner that assigns a time bucket to any given time. <br>
+ * The algorithm simply rounds the time down to a multiple of the time bucket span. <br>
+ * For example, given a time bucket span of 1000 milliseconds, <br>
+ * all times 1000, 1001 ... 1999 are assigned to time bucket 1000. <br>
+ *
+ *
+ */
+public class UnboundedTimeBucketAssigner extends TimeBucketAssigner
+{
+  /**
+   * Rounds {@code time} down to the nearest multiple of the bucket span.
+   *
+   * @param time a non-negative time in milliseconds
+   * @return the start time of the bucket containing {@code time}
+   * @throws IllegalArgumentException if {@code time} is negative
+   */
+  @Override
+  public long getTimeBucket(long time)
+  {
+    Preconditions.checkArgument(time >= 0, "Time: %s is illegal", time);
+    long spanMillis = getBucketSpan().getMillis();
+    return (time / spanMillis) * spanMillis;
+  }
+
+  /**
+   * This assigner places no bound on the number of buckets.
+   *
+   * @return {@link Long#MAX_VALUE}
+   */
+  @Override
+  public long getNumBuckets()
+  {
+    return Long.MAX_VALUE;
+  }
+
+  /**
+   * Runs the base-class setup (which defaults the bucket span when unset) and marks this assigner initialized.
+   */
+  @Override
+  public void setup(@NotNull ManagedStateContext managedStateContext)
+  {
+    super.setup(managedStateContext);
+    setInitialized(true);
+  }
+
+  // This assigner has no fields of its own and never triggers purges, so the lifecycle callbacks are empty.
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
index 24cc8b2..cc976f1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -57,29 +57,6 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
Spillable.SpillableComponent
{
- private static class FixedTimeExtractor<V> implements TimeExtractor<V>
- {
-
- private long fixedTime;
-
- private FixedTimeExtractor(long fixedTime)
- {
- this.fixedTime = fixedTime;
- }
-
- private FixedTimeExtractor()
- {
- // For kryo
- }
-
- @Override
- public long getTime(V v)
- {
- return fixedTime;
- }
-
- }
-
public static final int DEFAULT_BATCH_SIZE = 1000;
public static final byte[] META_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
@@ -176,7 +153,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, false);
if (timeExtractor != null) {
- spillableSet = new SpillableSetImpl<>(keyPrefix.toByteArray(), store, valueSerde, new FixedTimeExtractor(keyTime));
+ spillableSet = new SpillableSetImpl<>(keyPrefix.toByteArray(), store, valueSerde, new TimeExtractor.FixedTimeExtractor(keyTime));
} else {
spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde);
}
@@ -288,7 +265,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
if (timeExtractor == null) {
spillableSet = new SpillableSetImpl<>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde);
} else {
- spillableSet = new SpillableSetImpl<>(keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde, new FixedTimeExtractor(timeExtractor.getTime(key)));
+ spillableSet = new SpillableSetImpl<>(keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde, new TimeExtractor.FixedTimeExtractor(timeExtractor.getTime(key)));
}
spillableSet.setup(context);
cache.put(key, spillableSet);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
index ef06eea..6a16e62 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
import org.apache.apex.malhar.lib.state.spillable.Spillable;
import org.apache.apex.malhar.lib.utils.serde.Serde;
import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
@@ -54,10 +55,13 @@ public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeye
if (keyToWindowsMap == null) {
// NOTE: this will pose difficulties when we try to assign the entries to a time bucket later on.
// This is logged in APEXMALHAR-2271
- keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>>)(Serde)windowSerde);
+ // A work around to make session window data never expire and all kept in one time bucket
+ keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>>)(Serde)windowSerde, new TimeExtractor.FixedTimeExtractor(Long.MAX_VALUE));
}
}
+
+
@Override
@SuppressWarnings("unchecked")
public void remove(Window window)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
index 82428fb..1d2334d 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -32,7 +32,6 @@ import org.junit.runner.Description;
import com.datatorrent.api.Context;
import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
-import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.lib.util.TestUtils;
import com.datatorrent.netlet.util.Slice;
@@ -66,13 +65,6 @@ public class ManagedTimeUnifiedStateImplTest
public TestMeta testMeta = new TestMeta();
@Test
- public void testSerde() throws IOException
- {
- ManagedTimeUnifiedStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState);
- Assert.assertEquals("num buckets", deserialized.getNumBuckets(), testMeta.managedState.getNumBuckets());
- }
-
- @Test
public void testSimplePutGet()
{
Slice one = ManagedStateTestUtils.getSliceFor("1");
@@ -129,7 +121,7 @@ public class ManagedTimeUnifiedStateImplTest
testMeta.managedState.setup(testMeta.operatorContext);
- long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(time);
+ long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucket(time);
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
//write data to disk explicitly
@@ -151,7 +143,7 @@ public class ManagedTimeUnifiedStateImplTest
testMeta.managedState.setup(testMeta.operatorContext);
- long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(time);
+ long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucket(time);
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
//write data to disk explicitly
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
index 8ae4db7..ef67b94 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
@@ -31,7 +31,7 @@ class MockManagedStateContext implements ManagedStateContext
private TFileImpl.DTFileImpl fileAccess = new TFileImpl.DTFileImpl();
private Comparator<Slice> keyComparator = new SliceComparator();
private BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();
- private TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+ private MovingBoundaryTimeBucketAssigner timeBucketAssigner = new MovingBoundaryTimeBucketAssigner();
private final Context.OperatorContext operatorContext;
@@ -58,7 +58,7 @@ class MockManagedStateContext implements ManagedStateContext
}
+ /** Covariant override: returns the concrete moving-boundary assigner held by this mock. */
@Override
- public TimeBucketAssigner getTimeBucketAssigner()
+ public MovingBoundaryTimeBucketAssigner getTimeBucketAssigner()
{
return timeBucketAssigner;
}
@@ -84,7 +84,7 @@ class MockManagedStateContext implements ManagedStateContext
this.bucketsFileSystem = bucketsFileSystem;
}
- void setTimeBucketAssigner(TimeBucketAssigner timeBucketAssigner)
+ /** Replaces the assigner returned by {@link #getTimeBucketAssigner()}. */
+ void setTimeBucketAssigner(MovingBoundaryTimeBucketAssigner timeBucketAssigner)
{
this.timeBucketAssigner = timeBucketAssigner;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
new file mode 100644
index 0000000..e4e5d2e
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+public class MovingBoundaryTimeBucketAssignerTest
+{
+
+  class TestMeta extends TestWatcher
+  {
+    MovingBoundaryTimeBucketAssigner timeBucketAssigner;
+    MockManagedStateContext mockManagedStateContext;
+
+    @Override
+    protected void starting(Description description)
+    {
+      timeBucketAssigner = new MovingBoundaryTimeBucketAssigner();
+      mockManagedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9));
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  /**
+   * Applies the expiry and span to the assigner under test. The caller is responsible for calling setup.
+   */
+  private MovingBoundaryTimeBucketAssigner configuredAssigner(Duration expireBefore, Duration bucketSpan)
+  {
+    MovingBoundaryTimeBucketAssigner assigner = testMeta.timeBucketAssigner;
+    assigner.setExpireBefore(expireBefore);
+    assigner.setBucketSpan(bucketSpan);
+    return assigner;
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    Assert.assertNotNull("time bucket assigner", KryoCloneUtils.cloneObject(testMeta.timeBucketAssigner));
+  }
+
+  @Test
+  public void testNumBuckets()
+  {
+    MovingBoundaryTimeBucketAssigner assigner =
+        configuredAssigner(Duration.standardHours(1), Duration.standardMinutes(30));
+    assigner.setup(testMeta.mockManagedStateContext);
+
+    // one hour of retention split into 30 minute buckets
+    Assert.assertEquals("num buckets", 2, assigner.getNumBuckets());
+    assigner.teardown();
+  }
+
+  @Test
+  public void testTimeBucketKey()
+  {
+    MovingBoundaryTimeBucketAssigner assigner =
+        configuredAssigner(Duration.standardHours(1), Duration.standardMinutes(30));
+    long referenceTime = assigner.getReferenceInstant().getMillis();
+    assigner.setup(testMeta.mockManagedStateContext);
+
+    long inNewestBucket = referenceTime - Duration.standardMinutes(2).getMillis();
+    Assert.assertEquals("time bucket", 1, assigner.getTimeBucket(inNewestBucket));
+
+    long inOldestBucket = referenceTime - Duration.standardMinutes(40).getMillis();
+    Assert.assertEquals("time bucket", 0, assigner.getTimeBucket(inOldestBucket));
+
+    long beforeWindow = referenceTime - Duration.standardMinutes(65).getMillis();
+    Assert.assertEquals("time bucket", -1, assigner.getTimeBucket(beforeWindow));
+    assigner.teardown();
+  }
+
+  @Test
+  public void testTimeBucketKeyExpiry()
+  {
+    MovingBoundaryTimeBucketAssigner assigner =
+        configuredAssigner(Duration.standardSeconds(1), Duration.standardSeconds(1));
+    long referenceTime = assigner.getReferenceInstant().getMillis();
+    assigner.setup(testMeta.mockManagedStateContext);
+
+    long nineSecondsLater = referenceTime + Duration.standardSeconds(9).getMillis();
+    Assert.assertEquals("time bucket", 10, assigner.getTimeBucket(nineSecondsLater));
+
+    long tenSecondsLater = referenceTime + Duration.standardSeconds(10).getMillis();
+    Assert.assertEquals("time bucket", 11, assigner.getTimeBucket(tenSecondsLater));
+
+    // the boundary has now slid past nineSecondsLater, so that time is expired
+    Assert.assertEquals("time bucket", -1, assigner.getTimeBucket(nineSecondsLater));
+
+    assigner.teardown();
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
deleted file mode 100644
index 8ca0960..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.apex.malhar.lib.state.managed;
-
-import java.io.IOException;
-
-import org.joda.time.Duration;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-
-public class TimeBucketAssignerTest
-{
-
- class TestMeta extends TestWatcher
- {
- TimeBucketAssigner timeBucketAssigner;
- MockManagedStateContext mockManagedStateContext;
-
- @Override
- protected void starting(Description description)
- {
- timeBucketAssigner = new TimeBucketAssigner();
- mockManagedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9));
- }
-
- @Override
- protected void finished(Description description)
- {
- }
- }
-
- @Rule
- public TestMeta testMeta = new TestMeta();
-
- @Test
- public void testSerde() throws IOException
- {
- TimeBucketAssigner deserialized = KryoCloneUtils.cloneObject(testMeta.timeBucketAssigner);
- Assert.assertNotNull("time bucket assigner", deserialized);
- }
-
- @Test
- public void testNumBuckets()
- {
- testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1));
- testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30));
-
- testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
-
- Assert.assertEquals("num buckets", 2, testMeta.timeBucketAssigner.getNumBuckets());
- testMeta.timeBucketAssigner.teardown();
- }
-
- @Test
- public void testTimeBucketKey()
- {
- testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1));
- testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30));
-
- long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
- testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
-
- long time1 = referenceTime - Duration.standardMinutes(2).getMillis();
- Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1));
-
- long time0 = referenceTime - Duration.standardMinutes(40).getMillis();
- Assert.assertEquals("time bucket", 0, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time0));
-
- long expiredTime = referenceTime - Duration.standardMinutes(65).getMillis();
- Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(expiredTime));
- testMeta.timeBucketAssigner.teardown();
- }
-
- @Test
- public void testTimeBucketKeyExpiry()
- {
- testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1));
- testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1));
-
- long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
- testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
-
- long time1 = Duration.standardSeconds(9).getMillis() + referenceTime;
- Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1) );
-
- long time2 = Duration.standardSeconds(10).getMillis() + referenceTime;
- Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time2) );
-
- //Check for expiry of time1 now
- Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1) );
-
- testMeta.timeBucketAssigner.teardown();
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index f898e2d..512626e 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -58,7 +58,8 @@ import com.datatorrent.lib.util.KeyValPair;
public class WindowedOperatorTest
{
- public static final long BASE = (System.currentTimeMillis() / 1000) * 1000;
+ // Start one year in the past to exercise the extreme case of counting time from a distant base
+ public static final long BASE = ((System.currentTimeMillis() - 3600000L * 24 * 365) / 1000) * 1000;
@Parameterized.Parameters
public static Collection<Object[]> testParameters()