You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/08/12 05:07:44 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2129 removed moving time
boundaries periodically and asynchronously based on system time
Repository: apex-malhar
Updated Branches:
refs/heads/master 26fa9d781 -> d1fb2b604
APEXMALHAR-2129 removed moving time boundaries periodically and asynchronously based on system time
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d797bdd1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d797bdd1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d797bdd1
Branch: refs/heads/master
Commit: d797bdd12ca710c209cb9980119e54af2adcae57
Parents: 37cb584
Author: Chandni Singh <cs...@apache.org>
Authored: Wed Aug 10 14:09:30 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Aug 11 17:28:22 2016 -0700
----------------------------------------------------------------------
.../state/managed/AbstractManagedStateImpl.java | 1 -
.../lib/state/managed/ManagedStateImpl.java | 2 +-
.../lib/state/managed/ManagedTimeStateImpl.java | 6 +-
.../managed/ManagedTimeUnifiedStateImpl.java | 6 +-
.../lib/state/managed/TimeBucketAssigner.java | 96 +++++++-------------
.../ManagedTimeUnifiedStateImplTest.java | 4 +-
.../state/managed/TimeBucketAssignerTest.java | 34 +------
7 files changed, 47 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index b5b9f8c..927a6df 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -245,7 +245,6 @@ public abstract class AbstractManagedStateImpl
if (throwable.get() != null) {
Throwables.propagate(throwable.get());
}
- timeBucketAssigner.beginWindow(windowId);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
index b4453d5..ba8cdc6 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
@@ -55,7 +55,7 @@ public class ManagedStateImpl extends AbstractManagedStateImpl implements Bucket
@Override
public void put(long bucketId, @NotNull Slice key, @NotNull Slice value)
{
- long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+ long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
putInBucket(bucketId, timeBucket, key, value);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
index b441183..eddc736 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
@@ -48,7 +48,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
@Override
public void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value)
{
- long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+ long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
putInBucket(bucketId, timeBucket, key, value);
}
@@ -61,7 +61,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
@Override
public Slice getSync(long bucketId, long time, @NotNull Slice key)
{
- long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+ long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
if (timeBucket == -1) {
//time is expired so no point in looking further.
return BucketedState.EXPIRED;
@@ -78,7 +78,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
@Override
public Future<Slice> getAsync(long bucketId, long time, Slice key)
{
- long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+ long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
if (timeBucket == -1) {
//time is expired so no point in looking further.
return Futures.immediateFuture(BucketedState.EXPIRED);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
index 487f89c..e04286e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -67,14 +67,14 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
@Override
public void put(long time, @NotNull Slice key, @NotNull Slice value)
{
- long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+ long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
putInBucket(timeBucket, timeBucket, key, value);
}
@Override
public Slice getSync(long time, @NotNull Slice key)
{
- long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+ long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
if (timeBucket == -1) {
//time is expired so return expired slice.
return BucketedState.EXPIRED;
@@ -85,7 +85,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
@Override
public Future<Slice> getAsync(long time, @NotNull Slice key)
{
- long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+ long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
if (timeBucket == -1) {
//time is expired so return expired slice.
return Futures.immediateFuture(BucketedState.EXPIRED);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
index a60bc72..435ffe2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
@@ -28,10 +28,9 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Preconditions;
import com.datatorrent.api.Context;
-import com.datatorrent.lib.appdata.query.WindowBoundedService;
/**
- * Keeps track of time buckets.<br/>
+ * Keeps track of time buckets and triggers purging of obsolete time-buckets.<br/>
*
* The data of a bucket is further divided into time-buckets. This component controls the length of time buckets,
* which time-bucket an event falls into and sliding the time boundaries.
@@ -40,23 +39,16 @@ import com.datatorrent.lib.appdata.query.WindowBoundedService;
* The configuration {@link #expireBefore}, {@link #bucketSpan} and {@link #referenceInstant} (default time: system
* time during initialization of TimeBucketAssigner) are used to calculate number of time-buckets.<br/>
* For eg. if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and
- * <code>rererenceInstant = current-time</code>, then <code>
+ * <code>referenceInstant = currentTime</code>, then <code>
* numBuckets = 60 minutes/ 30 minutes = 2 </code>.<br/>
*
* These properties once configured shouldn't be changed because that will result in different time-buckets
* for the same (key,time) pair after a failure.
* <p/>
*
- * The time boundaries- start and end, periodically move by span of a single time-bucket. Any event with time < start
- * is expired. These boundaries slide between application window by the expiry task asynchronously.<br/>
- * The boundaries move only between an application window to ensure consistency of a checkpoint. Checkpoint will happen
- * at application window boundaries so if we do not restrict moving start and end within an app window boundary, it may
- * happen that old value of 'start' is saved with the new value of 'end'.
- *
- * <p/>
- *
- * The boundaries can also be moved by {@link #getTimeBucketFor(long)}. The time which is passed as an argument to this
- * method can be ahead of <code>end</code>. This means that the corresponding event is a future event
+ * The time boundaries, start and end, move in multiples of the time-bucket span. Any event with time &lt; start
+ * is considered expired. The boundaries are slid by calls to {@link #getTimeBucketAndAdjustBoundaries(long)}. The time
+ * passed as an argument to this method can be ahead of <code>end</code>. This means that the corresponding event is a future event
* (wrt TimeBucketAssigner) and cannot be ignored. Therefore it is accounted by sliding boundaries further.
*
* @since 3.4.0
@@ -79,30 +71,12 @@ public class TimeBucketAssigner implements ManagedStateComponent
private long end;
private int numBuckets;
private transient long fixedStart;
- private transient long lowestTimeBucket;
+ private transient boolean triggerPurge;
+ private transient long lowestPurgeableTimeBucket;
private boolean initialized;
- private transient WindowBoundedService windowBoundedService;
-
- private transient PurgeListener purgeListener = null;
-
- private final transient Runnable expiryTask = new Runnable()
- {
- @Override
- public void run()
- {
- synchronized (lock) {
- start += bucketSpanMillis;
- end += bucketSpanMillis;
- if (purgeListener != null) {
- purgeListener.purgeTimeBucketsLessThanEqualTo(lowestTimeBucket++);
- }
- }
- }
- };
-
- private final transient Object lock = new Object();
+ private transient PurgeListener purgeListener;
@Override
public void setup(@NotNull ManagedStateContext managedStateContext)
@@ -122,55 +96,55 @@ public class TimeBucketAssigner implements ManagedStateComponent
initialized = true;
}
- lowestTimeBucket = (start - fixedStart) / bucketSpanMillis;
- windowBoundedService = new WindowBoundedService(bucketSpanMillis, expiryTask);
- windowBoundedService.setup(context);
}
- public void beginWindow(long windowId)
+ public void endWindow()
{
- windowBoundedService.beginWindow(windowId);
+ if (triggerPurge && purgeListener != null) {
+ triggerPurge = false;
+ purgeListener.purgeTimeBucketsLessThanEqualTo(lowestPurgeableTimeBucket);
+ }
}
- public void endWindow()
+ @Override
+ public void teardown()
{
- windowBoundedService.endWindow();
}
/**
- * Get the bucket key for the long value.
+ * Get the bucket key for the long value and adjust boundaries if necessary.
*
* @param value value from which bucket key is derived.
* @return -1 if value is already expired; bucket key otherwise.
*/
- public long getTimeBucketFor(long value)
+ public long getTimeBucketAndAdjustBoundaries(long value)
{
- synchronized (lock) {
- if (value < start) {
- return -1;
- }
- long diffFromStart = value - fixedStart;
- long key = diffFromStart / bucketSpanMillis;
- if (value > end) {
- long move = ((value - end) / bucketSpanMillis + 1) * bucketSpanMillis;
- start += move;
- end += move;
- }
- return key;
+ if (value < start) {
+ return -1;
}
+ long diffFromStart = value - fixedStart;
+ long key = diffFromStart / bucketSpanMillis;
+ if (value > end) {
+ long diffInBuckets = (value - end) / bucketSpanMillis;
+ long move = (diffInBuckets + 1) * bucketSpanMillis;
+ start += move;
+ end += move;
+ triggerPurge = true;
+ lowestPurgeableTimeBucket += diffInBuckets;
+ }
+ return key;
+
}
+ /**
+ * Sets the purge listener.
+ * @param purgeListener purge listener
+ */
public void setPurgeListener(@NotNull PurgeListener purgeListener)
{
this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener");
}
- @Override
- public void teardown()
- {
- windowBoundedService.teardown();
- }
-
/**
* @return number of buckets.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
index c7920de..6808b63 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -110,7 +110,7 @@ public class ManagedTimeUnifiedStateImplTest
testMeta.managedState.setup(testMeta.operatorContext);
- long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time);
+ long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(time);
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
//write data to disk explicitly
@@ -132,7 +132,7 @@ public class ManagedTimeUnifiedStateImplTest
testMeta.managedState.setup(testMeta.operatorContext);
- long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time);
+ long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(time);
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
//write data to disk explicitly
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
index 952b4f6..4ceef1f 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
@@ -20,8 +20,6 @@
package org.apache.apex.malhar.lib.state.managed;
import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
import org.joda.time.Duration;
import org.junit.Assert;
@@ -85,39 +83,13 @@ public class TimeBucketAssignerTest
testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
long time1 = referenceTime - Duration.standardMinutes(2).getMillis();
- Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucketFor(time1));
+ Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1));
long time0 = referenceTime - Duration.standardMinutes(40).getMillis();
- Assert.assertEquals("time bucket", 0, testMeta.timeBucketAssigner.getTimeBucketFor(time0));
+ Assert.assertEquals("time bucket", 0, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time0));
long expiredTime = referenceTime - Duration.standardMinutes(65).getMillis();
- Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketFor(expiredTime));
+ Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(expiredTime));
testMeta.timeBucketAssigner.teardown();
}
-
- @Test
- public void testSlidingOnlyBetweenWindow() throws InterruptedException
- {
- final CountDownLatch latch = new CountDownLatch(1);
- final AtomicInteger timesCalled = new AtomicInteger();
- testMeta.timeBucketAssigner.setPurgeListener(new TimeBucketAssigner.PurgeListener()
- {
- @Override
- public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
- {
- timesCalled.getAndIncrement();
- latch.countDown();
- }
- });
-
- testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
- testMeta.timeBucketAssigner.beginWindow(0);
- latch.await();
- testMeta.timeBucketAssigner.endWindow();
- int valueBeforeSleep = timesCalled.get();
- Thread.sleep(1000);
- Assert.assertEquals("value should not change", valueBeforeSleep, timesCalled.get());
- testMeta.timeBucketAssigner.teardown();
- }
-
}
[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2129' of
https://github.com/chandnisingh/apex-malhar
Posted by bh...@apache.org.
Merge branch 'APEXMALHAR-2129' of https://github.com/chandnisingh/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d1fb2b60
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d1fb2b60
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d1fb2b60
Branch: refs/heads/master
Commit: d1fb2b604bd9c8e3317e4e2a73124dd41a7e03b3
Parents: 26fa9d7 d797bdd
Author: bhupeshchawda <bh...@gmail.com>
Authored: Fri Aug 12 10:15:13 2016 +0530
Committer: bhupeshchawda <bh...@gmail.com>
Committed: Fri Aug 12 10:15:13 2016 +0530
----------------------------------------------------------------------
.../state/managed/AbstractManagedStateImpl.java | 1 -
.../lib/state/managed/ManagedStateImpl.java | 2 +-
.../lib/state/managed/ManagedTimeStateImpl.java | 6 +-
.../managed/ManagedTimeUnifiedStateImpl.java | 6 +-
.../lib/state/managed/TimeBucketAssigner.java | 96 +++++++-------------
.../ManagedTimeUnifiedStateImplTest.java | 4 +-
.../state/managed/TimeBucketAssignerTest.java | 34 +------
7 files changed, 47 insertions(+), 102 deletions(-)
----------------------------------------------------------------------