You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/08/12 05:07:44 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2129 removed moving time boundaries periodically and asynchronously based on system time

Repository: apex-malhar
Updated Branches:
  refs/heads/master 26fa9d781 -> d1fb2b604


APEXMALHAR-2129 removed moving time boundaries periodically and asynchronously based on system time


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d797bdd1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d797bdd1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d797bdd1

Branch: refs/heads/master
Commit: d797bdd12ca710c209cb9980119e54af2adcae57
Parents: 37cb584
Author: Chandni Singh <cs...@apache.org>
Authored: Wed Aug 10 14:09:30 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Aug 11 17:28:22 2016 -0700

----------------------------------------------------------------------
 .../state/managed/AbstractManagedStateImpl.java |  1 -
 .../lib/state/managed/ManagedStateImpl.java     |  2 +-
 .../lib/state/managed/ManagedTimeStateImpl.java |  6 +-
 .../managed/ManagedTimeUnifiedStateImpl.java    |  6 +-
 .../lib/state/managed/TimeBucketAssigner.java   | 96 +++++++-------------
 .../ManagedTimeUnifiedStateImplTest.java        |  4 +-
 .../state/managed/TimeBucketAssignerTest.java   | 34 +------
 7 files changed, 47 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index b5b9f8c..927a6df 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -245,7 +245,6 @@ public abstract class AbstractManagedStateImpl
     if (throwable.get() != null) {
       Throwables.propagate(throwable.get());
     }
-    timeBucketAssigner.beginWindow(windowId);
   }
 
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
index b4453d5..ba8cdc6 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
@@ -55,7 +55,7 @@ public class ManagedStateImpl extends AbstractManagedStateImpl implements Bucket
   @Override
   public void put(long bucketId, @NotNull Slice key, @NotNull Slice value)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
     putInBucket(bucketId, timeBucket, key, value);
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
index b441183..eddc736 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
@@ -48,7 +48,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
   @Override
   public void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
     putInBucket(bucketId, timeBucket, key, value);
   }
 
@@ -61,7 +61,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
   @Override
   public Slice getSync(long bucketId, long time, @NotNull Slice key)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
     if (timeBucket == -1) {
       //time is expired so no point in looking further.
       return BucketedState.EXPIRED;
@@ -78,7 +78,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
   @Override
   public Future<Slice> getAsync(long bucketId, long time, Slice key)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
     if (timeBucket == -1) {
       //time is expired so no point in looking further.
       return Futures.immediateFuture(BucketedState.EXPIRED);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
index 487f89c..e04286e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -67,14 +67,14 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
   @Override
   public void put(long time, @NotNull Slice key, @NotNull Slice value)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
     putInBucket(timeBucket, timeBucket, key, value);
   }
 
   @Override
   public Slice getSync(long time, @NotNull Slice key)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
     if (timeBucket == -1) {
       //time is expired so return expired slice.
       return BucketedState.EXPIRED;
@@ -85,7 +85,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
   @Override
   public Future<Slice> getAsync(long time, @NotNull Slice key)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
     if (timeBucket == -1) {
       //time is expired so return expired slice.
       return Futures.immediateFuture(BucketedState.EXPIRED);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
index a60bc72..435ffe2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
@@ -28,10 +28,9 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.base.Preconditions;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.lib.appdata.query.WindowBoundedService;
 
 /**
- * Keeps track of time buckets.<br/>
+ * Keeps track of time buckets and triggers purging of obsolete time-buckets.<br/>
  *
  * The data of a bucket is further divided into time-buckets. This component controls the length of time buckets,
  * which time-bucket an event falls into and sliding the time boundaries.
@@ -40,23 +39,16 @@ import com.datatorrent.lib.appdata.query.WindowBoundedService;
  * The configuration {@link #expireBefore}, {@link #bucketSpan} and {@link #referenceInstant} (default time: system
  * time during initialization of TimeBucketAssigner) are used to calculate number of time-buckets.<br/>
  * For eg. if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and
- * <code>rererenceInstant = current-time</code>, then <code>
+ * <code>referenceInstant = currentTime</code>, then <code>
  *   numBuckets = 60 minutes/ 30 minutes = 2 </code>.<br/>
  *
  * These properties once configured shouldn't be changed because that will result in different time-buckets
  * for the same (key,time) pair after a failure.
  * <p/>
  *
- * The time boundaries- start and end, periodically move by span of a single time-bucket. Any event with time < start
- * is expired. These boundaries slide between application window by the expiry task asynchronously.<br/>
- * The boundaries move only between an application window to ensure consistency of a checkpoint. Checkpoint will happen
- * at application window boundaries so if we do not restrict moving start and end within an app window boundary, it may
- * happen that old value of 'start' is saved with the new value of 'end'.
- *
- * <p/>
- *
- * The boundaries can also be moved by {@link #getTimeBucketFor(long)}. The time which is passed as an argument to this
- * method can be ahead of <code>end</code>. This means that the corresponding event is a future event
+ * The time boundaries- start and end, move by multiples of time-bucket span. Any event with time < start
+ * is considered expired. The boundaries slide by {@link #getTimeBucketAndAdjustBoundaries(long)}. The time which is passed as an
+ * argument to this method can be ahead of <code>end</code>. This means that the corresponding event is a future event
  * (wrt TimeBucketAssigner) and cannot be ignored. Therefore it is accounted by sliding boundaries further.
  *
  * @since 3.4.0
@@ -79,30 +71,12 @@ public class TimeBucketAssigner implements ManagedStateComponent
   private long end;
   private int numBuckets;
   private transient long fixedStart;
-  private transient long lowestTimeBucket;
+  private transient boolean triggerPurge;
+  private transient long lowestPurgeableTimeBucket;
 
   private boolean initialized;
 
-  private transient WindowBoundedService windowBoundedService;
-
-  private transient PurgeListener purgeListener = null;
-
-  private final transient Runnable expiryTask = new Runnable()
-  {
-    @Override
-    public void run()
-    {
-      synchronized (lock) {
-        start += bucketSpanMillis;
-        end += bucketSpanMillis;
-        if (purgeListener != null) {
-          purgeListener.purgeTimeBucketsLessThanEqualTo(lowestTimeBucket++);
-        }
-      }
-    }
-  };
-
-  private final transient Object lock = new Object();
+  private transient PurgeListener purgeListener;
 
   @Override
   public void setup(@NotNull ManagedStateContext managedStateContext)
@@ -122,55 +96,55 @@ public class TimeBucketAssigner implements ManagedStateComponent
 
       initialized = true;
     }
-    lowestTimeBucket = (start - fixedStart) / bucketSpanMillis;
-    windowBoundedService = new WindowBoundedService(bucketSpanMillis, expiryTask);
-    windowBoundedService.setup(context);
   }
 
-  public void beginWindow(long windowId)
+  public void endWindow()
   {
-    windowBoundedService.beginWindow(windowId);
+    if (triggerPurge && purgeListener != null) {
+      triggerPurge = false;
+      purgeListener.purgeTimeBucketsLessThanEqualTo(lowestPurgeableTimeBucket);
+    }
   }
 
-  public void endWindow()
+  @Override
+  public void teardown()
   {
-    windowBoundedService.endWindow();
   }
 
   /**
-   * Get the bucket key for the long value.
+   * Get the bucket key for the long value and adjust boundaries if necessary.
    *
    * @param value value from which bucket key is derived.
    * @return -1 if value is already expired; bucket key otherwise.
    */
-  public long getTimeBucketFor(long value)
+  public long getTimeBucketAndAdjustBoundaries(long value)
   {
-    synchronized (lock) {
-      if (value < start) {
-        return -1;
-      }
-      long diffFromStart = value - fixedStart;
-      long key = diffFromStart / bucketSpanMillis;
-      if (value > end) {
-        long move = ((value - end) / bucketSpanMillis + 1) * bucketSpanMillis;
-        start += move;
-        end += move;
-      }
-      return key;
+    if (value < start) {
+      return -1;
     }
+    long diffFromStart = value - fixedStart;
+    long key = diffFromStart / bucketSpanMillis;
+    if (value > end) {
+      long diffInBuckets = (value - end) / bucketSpanMillis;
+      long move = (diffInBuckets + 1) * bucketSpanMillis;
+      start += move;
+      end += move;
+      triggerPurge = true;
+      lowestPurgeableTimeBucket += diffInBuckets;
+    }
+    return key;
+
   }
 
+  /**
+   * Sets the purge listener.
+   * @param purgeListener purge listener
+   */
   public void setPurgeListener(@NotNull PurgeListener purgeListener)
   {
     this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener");
   }
 
-  @Override
-  public void teardown()
-  {
-    windowBoundedService.teardown();
-  }
-
   /**
    * @return number of buckets.
    */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
index c7920de..6808b63 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -110,7 +110,7 @@ public class ManagedTimeUnifiedStateImplTest
 
     testMeta.managedState.setup(testMeta.operatorContext);
 
-    long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time);
+    long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(time);
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
 
     //write data to disk explicitly
@@ -132,7 +132,7 @@ public class ManagedTimeUnifiedStateImplTest
 
     testMeta.managedState.setup(testMeta.operatorContext);
 
-    long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time);
+    long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(time);
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
 
     //write data to disk explicitly

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
index 952b4f6..4ceef1f 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
@@ -20,8 +20,6 @@
 package org.apache.apex.malhar.lib.state.managed;
 
 import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.joda.time.Duration;
 import org.junit.Assert;
@@ -85,39 +83,13 @@ public class TimeBucketAssignerTest
     testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
 
     long time1 = referenceTime - Duration.standardMinutes(2).getMillis();
-    Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucketFor(time1));
+    Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1));
 
     long time0 = referenceTime - Duration.standardMinutes(40).getMillis();
-    Assert.assertEquals("time bucket", 0, testMeta.timeBucketAssigner.getTimeBucketFor(time0));
+    Assert.assertEquals("time bucket", 0, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time0));
 
     long expiredTime = referenceTime - Duration.standardMinutes(65).getMillis();
-    Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketFor(expiredTime));
+    Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(expiredTime));
     testMeta.timeBucketAssigner.teardown();
   }
-
-  @Test
-  public void testSlidingOnlyBetweenWindow() throws InterruptedException
-  {
-    final CountDownLatch latch = new CountDownLatch(1);
-    final AtomicInteger timesCalled = new AtomicInteger();
-    testMeta.timeBucketAssigner.setPurgeListener(new TimeBucketAssigner.PurgeListener()
-    {
-      @Override
-      public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
-      {
-        timesCalled.getAndIncrement();
-        latch.countDown();
-      }
-    });
-
-    testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
-    testMeta.timeBucketAssigner.beginWindow(0);
-    latch.await();
-    testMeta.timeBucketAssigner.endWindow();
-    int valueBeforeSleep = timesCalled.get();
-    Thread.sleep(1000);
-    Assert.assertEquals("value should not change", valueBeforeSleep, timesCalled.get());
-    testMeta.timeBucketAssigner.teardown();
-  }
-
 }


[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2129' of https://github.com/chandnisingh/apex-malhar

Posted by bh...@apache.org.
Merge branch 'APEXMALHAR-2129' of https://github.com/chandnisingh/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d1fb2b60
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d1fb2b60
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d1fb2b60

Branch: refs/heads/master
Commit: d1fb2b604bd9c8e3317e4e2a73124dd41a7e03b3
Parents: 26fa9d7 d797bdd
Author: bhupeshchawda <bh...@gmail.com>
Authored: Fri Aug 12 10:15:13 2016 +0530
Committer: bhupeshchawda <bh...@gmail.com>
Committed: Fri Aug 12 10:15:13 2016 +0530

----------------------------------------------------------------------
 .../state/managed/AbstractManagedStateImpl.java |  1 -
 .../lib/state/managed/ManagedStateImpl.java     |  2 +-
 .../lib/state/managed/ManagedTimeStateImpl.java |  6 +-
 .../managed/ManagedTimeUnifiedStateImpl.java    |  6 +-
 .../lib/state/managed/TimeBucketAssigner.java   | 96 +++++++-------------
 .../ManagedTimeUnifiedStateImplTest.java        |  4 +-
 .../state/managed/TimeBucketAssignerTest.java   | 34 +------
 7 files changed, 47 insertions(+), 102 deletions(-)
----------------------------------------------------------------------