You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2016/05/17 20:49:05 UTC

[1/2] incubator-apex-malhar git commit: APEXMALHAR-2073 fixed the race condition that caused the first key in tbm to be null

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 25857e391 -> 4b126aa52


APEXMALHAR-2073 fixed the race condition that caused the first key in the time bucket meta (tbm) to be null


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ca9fed97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ca9fed97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ca9fed97

Branch: refs/heads/master
Commit: ca9fed971f7ff26643ce2417c750b951fc161752
Parents: f94c2d8
Author: Chandni Singh <cs...@apache.org>
Authored: Mon May 16 17:30:51 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon May 16 21:52:26 2016 -0700

----------------------------------------------------------------------
 .../apex/malhar/lib/state/managed/Bucket.java   | 12 ++-------
 .../lib/state/managed/BucketsFileSystem.java    | 26 ++++++++++++++------
 .../managed/IncrementalCheckpointManager.java   |  8 +++++-
 .../malhar/lib/state/managed/StateTracker.java  |  2 +-
 .../state/managed/BucketsFileSystemTest.java    | 13 +++++++---
 .../lib/state/managed/ManagedStateImplTest.java |  1 -
 6 files changed, 37 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca9fed97/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 107bfc6..352f121 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -285,7 +285,6 @@ public interface Bucket extends ManagedStateComponent
         } else {
           //search all the time buckets
           for (BucketsFileSystem.TimeBucketMeta immutableTimeBucketMeta : cachedBucketMetas) {
-
             if (managedStateContext.getKeyComparator().compare(key, immutableTimeBucketMeta.getFirstKey()) >= 0) {
               //keys in the time bucket files are sorted so if the first key in the file is greater than the key being
               //searched, the key will not be present in that file.
@@ -400,16 +399,8 @@ public interface Bucket extends ManagedStateComponent
     @Override
     public long freeMemory(long windowId) throws IOException
     {
-      LOG.debug("free space {}", bucketId);
       long memoryFreed = 0;
-
-      for (Map.Entry<Long, Map<Slice, BucketedValue>> windowEntry : committedData.entrySet()) {
-        for (Map.Entry<Slice, BucketedValue> entry: windowEntry.getValue().entrySet()) {
-          memoryFreed += entry.getKey().length + entry.getValue().getValue().length;
-        }
-      }
-
-      Long clearWindowId = null;
+      Long clearWindowId;
 
       while ((clearWindowId = committedData.floorKey(windowId)) != null) {
         Map<Slice, BucketedValue> windowData = committedData.remove(clearWindowId);
@@ -432,6 +423,7 @@ public interface Bucket extends ManagedStateComponent
 
       }
       sizeInBytes.getAndAdd(-memoryFreed);
+      LOG.debug("space freed {} {}", bucketId, memoryFreed);
       return memoryFreed;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca9fed97/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
index 8304fb6..483d9e8 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
@@ -150,7 +150,11 @@ public class BucketsFileSystem implements ManagedStateComponent
     }
 
     for (long timeBucket : timeBucketedKeys.rowKeySet()) {
-      BucketsFileSystem.MutableTimeBucketMeta tbm = getOrCreateTimeBucketMeta(bucketId, timeBucket);
+      BucketsFileSystem.MutableTimeBucketMeta tbm = getMutableTimeBucketMeta(bucketId, timeBucket);
+      if (tbm == null) {
+        tbm = new MutableTimeBucketMeta(bucketId, timeBucket);
+      }
+
       addBucketName(bucketId);
 
       long dataSize = 0;
@@ -202,6 +206,7 @@ public class BucketsFileSystem implements ManagedStateComponent
       fileWriter.close();
       rename(bucketId, tmpFileName, getFileName(timeBucket));
       tbm.updateTimeBucketMeta(windowId, dataSize, firstKey);
+      updateTimeBuckets(tbm);
     }
 
     updateBucketMetaFile(bucketId);
@@ -217,15 +222,19 @@ public class BucketsFileSystem implements ManagedStateComponent
    * @throws IOException
    */
   @NotNull
-  MutableTimeBucketMeta getOrCreateTimeBucketMeta(long bucketId, long timeBucketId) throws IOException
+  private MutableTimeBucketMeta getMutableTimeBucketMeta(long bucketId, long timeBucketId) throws IOException
   {
     synchronized (timeBucketsMeta) {
-      MutableTimeBucketMeta tbm = timeBucketMetaHelper(bucketId, timeBucketId);
-      if (tbm == null) {
-        tbm = new MutableTimeBucketMeta(bucketId, timeBucketId);
-        timeBucketsMeta.put(bucketId, timeBucketId, tbm);
-      }
-      return tbm;
+      return timeBucketMetaHelper(bucketId, timeBucketId);
+    }
+  }
+
+  void updateTimeBuckets(@NotNull MutableTimeBucketMeta mutableTimeBucketMeta)
+  {
+    Preconditions.checkNotNull(mutableTimeBucketMeta, "mutable time bucket meta");
+    synchronized (timeBucketsMeta) {
+      timeBucketsMeta.put(mutableTimeBucketMeta.getBucketId(), mutableTimeBucketMeta.getTimeBucketId(),
+          mutableTimeBucketMeta);
     }
   }
 
@@ -320,6 +329,7 @@ public class BucketsFileSystem implements ManagedStateComponent
    */
   private void loadBucketMetaFile(long bucketId, DataInputStream dis) throws IOException
   {
+    LOG.debug("Loading bucket meta-file {}", bucketId);
     int metaDataVersion = dis.readInt();
 
     if (metaDataVersion == META_FILE_VERSION) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca9fed97/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index 02fd6ec..536702d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -71,6 +71,8 @@ public class IncrementalCheckpointManager extends WindowDataManager.FSWindowData
   private transient int waitMillis;
   private volatile long lastTransferredWindow = Stateless.WINDOW_ID;
 
+  private transient long largestWindowAddedToTransferQueue = Stateless.WINDOW_ID;
+
   public IncrementalCheckpointManager()
   {
     super();
@@ -188,8 +190,12 @@ public class IncrementalCheckpointManager extends WindowDataManager.FSWindowData
   {
     LOG.debug("data manager committed {}", windowId);
     for (Long currentWindow : savedWindows.keySet()) {
+      if (currentWindow <= largestWindowAddedToTransferQueue) {
+        continue;
+      }
       if (currentWindow <= windowId) {
-        LOG.debug("to transfer {}", windowId);
+        LOG.debug("to transfer {}", currentWindow);
+        largestWindowAddedToTransferQueue = currentWindow;
         windowsToTransfer.add(currentWindow);
       } else {
         break;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca9fed97/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
index 3bb5507..4813c25 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -123,7 +123,7 @@ class StateTracker extends TimerTask
               long sizeFreed;
               try {
                 sizeFreed = bucket.freeMemory(managedStateImpl.checkpointManager.getLastTransferredWindow());
-                LOG.debug("size freed {} {}", bucketId, sizeFreed);
+                LOG.debug("bucket freed {} {}", bucketId, sizeFreed);
               } catch (IOException e) {
                 managedStateImpl.throwable.set(e);
                 throw new RuntimeException("freeing " + bucketId, e);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca9fed97/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
index ee6cf00..1696d4d 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
@@ -97,10 +97,12 @@ public class BucketsFileSystemTest
   public void testUpdateBucketMetaDataFile() throws IOException
   {
     testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
-    BucketsFileSystem.MutableTimeBucketMeta mutableTbm = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+    BucketsFileSystem.MutableTimeBucketMeta mutableTbm = new BucketsFileSystem.MutableTimeBucketMeta(1, 1);
     mutableTbm.updateTimeBucketMeta(10, 100, new Slice("1".getBytes()));
 
+    testMeta.bucketsFileSystem.updateTimeBuckets(mutableTbm);
     testMeta.bucketsFileSystem.updateBucketMetaFile(1);
+
     BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
     Assert.assertNotNull(immutableTbm);
     Assert.assertEquals("last transferred window", 10, immutableTbm.getLastTransferredWindowId());
@@ -116,7 +118,8 @@ public class BucketsFileSystemTest
     BucketsFileSystem.TimeBucketMeta bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
     Assert.assertNull("bucket meta", bucketMeta);
 
-    testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+    BucketsFileSystem.MutableTimeBucketMeta mutableTimeBucketMeta = new BucketsFileSystem.MutableTimeBucketMeta(1, 1);
+    testMeta.bucketsFileSystem.updateTimeBuckets(mutableTimeBucketMeta);
     bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
     Assert.assertNotNull("bucket meta not null", bucketMeta);
     testMeta.bucketsFileSystem.teardown();
@@ -126,11 +129,13 @@ public class BucketsFileSystemTest
   public void testGetAllTimeBucketMeta() throws IOException
   {
     testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
-    BucketsFileSystem.MutableTimeBucketMeta tbm1 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+    BucketsFileSystem.MutableTimeBucketMeta tbm1 = new BucketsFileSystem.MutableTimeBucketMeta(1, 1);
     tbm1.updateTimeBucketMeta(10, 100, new Slice("1".getBytes()));
+    testMeta.bucketsFileSystem.updateTimeBuckets(tbm1);
 
-    BucketsFileSystem.MutableTimeBucketMeta tbm2 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 2);
+    BucketsFileSystem.MutableTimeBucketMeta tbm2 = new BucketsFileSystem.MutableTimeBucketMeta(1, 2);
     tbm2.updateTimeBucketMeta(10, 100, new Slice("2".getBytes()));
+    testMeta.bucketsFileSystem.updateTimeBuckets(tbm2);
 
     testMeta.bucketsFileSystem.updateBucketMetaFile(1);
     TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas =

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca9fed97/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
index cdd9781..99e6c23 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
@@ -174,7 +174,6 @@ public class ManagedStateImplTest
   {
     testMeta.managedState.setMaxMemorySize(1);
     testMeta.managedState.setCheckStateSizeInterval(Duration.millis(1L));
-    testMeta.managedState.setCheckStateSizeInterval(Duration.millis(1L));
     testMeta.managedState.setup(testMeta.operatorContext);
 
     int numKeys = 300;


[2/2] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2073'

Posted by ti...@apache.org.
Merge branch 'APEXMALHAR-2073'


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/4b126aa5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/4b126aa5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/4b126aa5

Branch: refs/heads/master
Commit: 4b126aa527351825a0b47a8378611d3b8ef6a12d
Parents: 25857e3 ca9fed9
Author: ilooner <ti...@gmail.com>
Authored: Tue May 17 13:48:31 2016 -0700
Committer: ilooner <ti...@gmail.com>
Committed: Tue May 17 13:48:31 2016 -0700

----------------------------------------------------------------------
 .../apex/malhar/lib/state/managed/Bucket.java   | 12 ++-------
 .../lib/state/managed/BucketsFileSystem.java    | 26 ++++++++++++++------
 .../managed/IncrementalCheckpointManager.java   |  8 +++++-
 .../malhar/lib/state/managed/StateTracker.java  |  2 +-
 .../state/managed/BucketsFileSystemTest.java    | 13 +++++++---
 .../lib/state/managed/ManagedStateImplTest.java |  1 -
 6 files changed, 37 insertions(+), 25 deletions(-)
----------------------------------------------------------------------