You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/11 23:57:59 UTC

apex-malhar git commit: APEXMALHAR-2281 fixed race condition with put and asyncGet

Repository: apex-malhar
Updated Branches:
  refs/heads/master aeb10f33d -> 07812a903


APEXMALHAR-2281 fixed race condition with put and asyncGet


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/07812a90
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/07812a90
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/07812a90

Branch: refs/heads/master
Commit: 07812a90394c92ae7c4b0587edf910f7ed330793
Parents: aeb10f3
Author: Chandni Singh <cs...@apache.org>
Authored: Thu Oct 6 21:35:34 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Tue Oct 11 11:30:26 2016 -0700

----------------------------------------------------------------------
 .../state/managed/AbstractManagedStateImpl.java |  9 ++-
 .../apex/malhar/lib/state/managed/Bucket.java   | 77 +++++++++++---------
 .../lib/state/managed/BucketsFileSystem.java    | 13 ++--
 .../managed/ManagedTimeStateMultiValue.java     |  1 -
 .../state/managed/BucketsFileSystemTest.java    | 16 ++--
 .../lib/state/managed/DefaultBucketTest.java    | 12 +--
 .../IncrementalCheckpointManagerTest.java       |  5 +-
 .../state/managed/ManagedStateTestUtils.java    | 13 +++-
 .../state/managed/ManagedTimeStateImplTest.java |  2 +-
 .../ManagedTimeUnifiedStateImplTest.java        |  8 +-
 10 files changed, 88 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 1e378ec..dd2bbab 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -172,8 +172,6 @@ public abstract class AbstractManagedStateImpl
   protected final transient ListMultimap<Long, ValueFetchTask> tasksPerBucketId =
       Multimaps.synchronizedListMultimap(ArrayListMultimap.<Long, ValueFetchTask>create());
 
-  protected Bucket.ReadSource asyncReadSource = Bucket.ReadSource.ALL;
-
   @Override
   public void setup(OperatorContext context)
   {
@@ -274,6 +272,11 @@ public abstract class AbstractManagedStateImpl
     if (timeBucket != -1) {
       //time bucket is invalid data is not stored
       int bucketIdx = prepareBucket(bucketId);
+      //synchronization on a bucket isn't required for put because the event is added to flash which is
+      //a concurrent map. The assumption here is that the calls to put & get(sync/async) are being made synchronously by
+      //a single thread (operator thread). The get(sync/async) always checks memory first synchronously.
+      //If the key is not in the memory, then the async get will use other reader threads which will fetch it from
+      //the files.
       buckets[bucketIdx].put(key, timeBucket, value);
     }
   }
@@ -566,7 +569,7 @@ public abstract class AbstractManagedStateImpl
         synchronized (bucket) {
           //a particular bucket should only be handled by one thread at any point of time. Handling of bucket here
           //involves creating readers for the time buckets and de-serializing key/value from a reader.
-          Slice value = bucket.get(key, timeBucketId, this.managedState.asyncReadSource);
+          Slice value = bucket.get(key, timeBucketId, Bucket.ReadSource.ALL);
           managedState.tasksPerBucketId.remove(bucket.getBucketId(), this);
           return value;
         }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 74364a9..4fc2327 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -23,7 +23,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.TreeSet;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -36,6 +36,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
 
 import com.datatorrent.lib.fileaccess.FileAccess;
 import com.datatorrent.netlet.util.Slice;
@@ -151,6 +152,16 @@ public interface Bucket extends ManagedStateComponent
       this.value = value;
     }
 
+    public long getSize()
+    {
+      long size = 0;
+      if (value != null) {
+        size += value.length;
+      }
+      size += Longs.BYTES; //time-bucket
+      return size;
+    }
+
     @Override
     public boolean equals(Object o)
     {
@@ -205,7 +216,7 @@ public interface Bucket extends ManagedStateComponent
 
     private final transient Slice dummyGetKey = new Slice(null, 0, 0);
 
-    private transient TreeSet<BucketsFileSystem.TimeBucketMeta> cachedBucketMetas;
+    private transient TreeMap<Long, BucketsFileSystem.TimeBucketMeta> cachedBucketMetas;
 
     private DefaultBucket()
     {
@@ -275,34 +286,33 @@ public interface Bucket extends ManagedStateComponent
           cachedBucketMetas = managedStateContext.getBucketsFileSystem().getAllTimeBuckets(bucketId);
         }
         if (timeBucket != -1) {
-          Slice valSlice = getValueFromTimeBucketReader(key, timeBucket);
-          if (valSlice != null) {
-            if (timeBucket == cachedBucketMetas.first().getTimeBucketId()) {
+          BucketedValue bucketedValue = getValueFromTimeBucketReader(key, timeBucket);
+          if (bucketedValue != null) {
+            if (timeBucket == cachedBucketMetas.firstKey()) {
               //if the requested time bucket is the latest time bucket on file, the key/value is put in the file cache.
-              BucketedValue bucketedValue = new BucketedValue(timeBucket, valSlice);
+              //Since the size of the whole time-bucket is added to total size, there is no need to add the size of
+              //entries in file cache.
               fileCache.put(key, bucketedValue);
             }
+            return bucketedValue.value;
           }
-          return valSlice;
         } else {
           //search all the time buckets
-          for (BucketsFileSystem.TimeBucketMeta immutableTimeBucketMeta : cachedBucketMetas) {
+          for (BucketsFileSystem.TimeBucketMeta immutableTimeBucketMeta : cachedBucketMetas.values()) {
             if (managedStateContext.getKeyComparator().compare(key, immutableTimeBucketMeta.getFirstKey()) >= 0) {
               //keys in the time bucket files are sorted so if the first key in the file is greater than the key being
               //searched, the key will not be present in that file.
-              Slice valSlice = getValueFromTimeBucketReader(key, immutableTimeBucketMeta.getTimeBucketId());
-              if (valSlice != null) {
-                BucketedValue bucketedValue = new BucketedValue(immutableTimeBucketMeta.getTimeBucketId(), valSlice);
+              BucketedValue bucketedValue = getValueFromTimeBucketReader(key, immutableTimeBucketMeta.getTimeBucketId());
+              if (bucketedValue != null) {
                 //Only when the key is read from the latest time bucket on the file, the key/value is put in the file
                 // cache.
                 fileCache.put(key, bucketedValue);
-                return valSlice;
+                return bucketedValue.value;
               }
             }
           }
-          return null;
         }
-
+        return null;
       } catch (IOException e) {
         throw new RuntimeException("get time-buckets " + bucketId, e);
       }
@@ -332,7 +342,7 @@ public interface Bucket extends ManagedStateComponent
      * @param timeBucket time bucket
      * @return value if key is found in the time bucket; false otherwise
      */
-    private Slice getValueFromTimeBucketReader(Slice key, long timeBucket)
+    private BucketedValue getValueFromTimeBucketReader(Slice key, long timeBucket)
     {
       FileAccess.FileReader fileReader = readers.get(timeBucket);
       if (fileReader != null) {
@@ -349,13 +359,13 @@ public interface Bucket extends ManagedStateComponent
       }
     }
 
-    private Slice readValue(FileAccess.FileReader fileReader, Slice key, long timeBucket)
+    private BucketedValue readValue(FileAccess.FileReader fileReader, Slice key, long timeBucket)
     {
       Slice valSlice = new Slice(null, 0, 0);
       try {
         if (fileReader.seek(key)) {
           fileReader.next(dummyGetKey, valSlice);
-          return valSlice;
+          return new BucketedValue(timeBucket, valSlice);
         } else {
           return null;
         }
@@ -384,20 +394,18 @@ public interface Bucket extends ManagedStateComponent
     {
       BucketedValue bucketedValue = flash.get(key);
       if (bucketedValue == null) {
-        bucketedValue = new BucketedValue();
+        bucketedValue = new BucketedValue(timeBucket, value);
         flash.put(key, bucketedValue);
-        sizeInBytes.getAndAdd(key.length);
-        sizeInBytes.getAndAdd(Long.SIZE);
-      }
-      if (timeBucket >= bucketedValue.getTimeBucket()) {
-
-        int inc = null == bucketedValue.getValue() ? value.length : value.length - bucketedValue.getValue().length;
-        sizeInBytes.getAndAdd(inc);
-        bucketedValue.setTimeBucket(timeBucket);
-        bucketedValue.setValue(value);
+        sizeInBytes.getAndAdd(key.length + value.length + Longs.BYTES);
       } else {
-        LOG.warn("ignoring {} {} {}; existing {} {}", key, value, timeBucket,
-            bucketedValue.getValue(), bucketedValue.getTimeBucket());
+        if (timeBucket >= bucketedValue.getTimeBucket()) {
+          int inc = null == bucketedValue.getValue() ? value.length : value.length - bucketedValue.getValue().length;
+          sizeInBytes.getAndAdd(inc);
+          bucketedValue.setTimeBucket(timeBucket);
+          bucketedValue.setValue(value);
+        } else {
+          throw new AssertionError("newer entry exists for " + key);
+        }
       }
     }
 
@@ -411,14 +419,13 @@ public interface Bucket extends ManagedStateComponent
         Map<Slice, BucketedValue> windowData = committedData.remove(clearWindowId);
 
         for (Map.Entry<Slice, BucketedValue> entry: windowData.entrySet()) {
-          memoryFreed += entry.getKey().length + entry.getValue().getValue().length;
+          memoryFreed += entry.getKey().length + entry.getValue().getSize();
         }
       }
-
       fileCache.clear();
       if (cachedBucketMetas != null) {
 
-        for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas) {
+        for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas.values()) {
           FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId());
           if (reader != null) {
             memoryFreed += tbm.getSizeInBytes();
@@ -461,12 +468,16 @@ public interface Bucket extends ManagedStateComponent
             fileCache.remove(key);
           }
 
+          long memoryFreed = 0;
+
           for (BucketedValue bucketedValue : bucketData.values()) {
             FileAccess.FileReader reader = readers.get(bucketedValue.getTimeBucket());
             if (reader != null) {
               //closing the file reader for the time bucket if it is in memory because the time-bucket is modified
               //so it will be re-written by BucketsDataManager
               try {
+                BucketsFileSystem.TimeBucketMeta tbm = cachedBucketMetas.get(bucketedValue.getTimeBucket());
+                memoryFreed += tbm.getSizeInBytes();
                 LOG.debug("closing reader {} {}", bucketId, bucketedValue.getTimeBucket());
                 reader.close();
               } catch (IOException e) {
@@ -478,7 +489,7 @@ public interface Bucket extends ManagedStateComponent
               break;
             }
           }
-
+          sizeInBytes.getAndAdd(-memoryFreed);
           committedData.put(savedWindow, bucketData);
           stateIterator.remove();
         } else {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
index 313bfd5..f65c539 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
@@ -21,12 +21,10 @@ package org.apache.apex.malhar.lib.state.managed;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 import javax.annotation.Nullable;
@@ -39,8 +37,8 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.RemoteIterator;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
 import com.google.common.collect.Table;
 import com.google.common.collect.TreeBasedTable;
 
@@ -296,15 +294,14 @@ public class BucketsFileSystem implements ManagedStateComponent
    * @param bucketId bucket id
    * @return all the time buckets in order - latest to oldest
    */
-  public TreeSet<TimeBucketMeta> getAllTimeBuckets(long bucketId) throws IOException
+  public TreeMap<Long, TimeBucketMeta> getAllTimeBuckets(long bucketId) throws IOException
   {
     synchronized (timeBucketsMeta) {
-      TreeSet<TimeBucketMeta> immutableTimeBucketMetas = Sets.newTreeSet(
-          Collections.<TimeBucketMeta>reverseOrder());
+      TreeMap<Long, TimeBucketMeta> immutableTimeBucketMetas = Maps.newTreeMap(Ordering.natural().<Long>reverse());
 
       if (timeBucketsMeta.containsRow(bucketId)) {
         for (Map.Entry<Long, MutableTimeBucketMeta> entry : timeBucketsMeta.row(bucketId).entrySet()) {
-          immutableTimeBucketMetas.add(entry.getValue().getImmutableTimeBucketMeta());
+          immutableTimeBucketMetas.put(entry.getKey(), entry.getValue().getImmutableTimeBucketMeta());
         }
         return immutableTimeBucketMetas;
       }
@@ -313,7 +310,7 @@ public class BucketsFileSystem implements ManagedStateComponent
           //Load meta info of all the time buckets of the bucket identified by bucket id
           loadBucketMetaFile(bucketId, dis);
           for (Map.Entry<Long, MutableTimeBucketMeta> entry : timeBucketsMeta.row(bucketId).entrySet()) {
-            immutableTimeBucketMetas.add(entry.getValue().getImmutableTimeBucketMeta());
+            immutableTimeBucketMetas.put(entry.getKey(), entry.getValue().getImmutableTimeBucketMeta());
           }
           return immutableTimeBucketMetas;
         }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
index beeeb4e..bd3319f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
@@ -71,7 +71,6 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM
   {
     this();
     this.store = Preconditions.checkNotNull(store);
-    this.store.asyncReadSource = Bucket.ReadSource.READERS;
     this.isKeyContainsMultiValue = isKeyContainsMultiValue;
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
index 1696d4d..ede2c85 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
@@ -22,7 +22,7 @@ package org.apache.apex.malhar.lib.state.managed;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.TreeSet;
+import java.util.TreeMap;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -74,7 +74,7 @@ public class BucketsFileSystemTest
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
     testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0);
 
-    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 1);
+    ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 1);
     testMeta.bucketsFileSystem.teardown();
   }
 
@@ -89,7 +89,7 @@ public class BucketsFileSystemTest
     testMeta.bucketsFileSystem.writeBucketData(10, 0, more);
 
     unsavedBucket0.putAll(more);
-    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 2);
+    ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 2);
     testMeta.bucketsFileSystem.teardown();
   }
 
@@ -138,13 +138,13 @@ public class BucketsFileSystemTest
     testMeta.bucketsFileSystem.updateTimeBuckets(tbm2);
 
     testMeta.bucketsFileSystem.updateBucketMetaFile(1);
-    TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
+    TreeMap<Long, BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
         testMeta.bucketsFileSystem.getAllTimeBuckets(1);
 
-    Iterator<BucketsFileSystem.TimeBucketMeta> iterator = timeBucketMetas.iterator();
+    Iterator<Map.Entry<Long, BucketsFileSystem.TimeBucketMeta>> iterator = timeBucketMetas.entrySet().iterator();
     int i = 2;
     while (iterator.hasNext()) {
-      BucketsFileSystem.TimeBucketMeta tbm = iterator.next();
+      BucketsFileSystem.TimeBucketMeta tbm = iterator.next().getValue();
       Assert.assertEquals("time bucket " + i, i, tbm.getTimeBucketId());
       i--;
     }
@@ -160,11 +160,11 @@ public class BucketsFileSystemTest
     BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1,1);
     Assert.assertNull("deleted tbm", immutableTbm);
 
-    TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
+    TreeMap<Long, BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
         testMeta.bucketsFileSystem.getAllTimeBuckets(1);
 
     Assert.assertEquals("only 1 tbm", 1, timeBucketMetas.size());
-    immutableTbm = timeBucketMetas.iterator().next();
+    immutableTbm = timeBucketMetas.entrySet().iterator().next().getValue();
 
     Assert.assertEquals("tbm 2", 2, immutableTbm.getTimeBucketId());
     testMeta.bucketsFileSystem.teardown();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
index 7539440..2058b69 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -28,6 +28,8 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import com.google.common.primitives.Longs;
+
 import com.datatorrent.lib.fileaccess.FileAccess;
 import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
 import com.datatorrent.lib.util.TestUtils;
@@ -79,7 +81,7 @@ public class DefaultBucketTest
     value = testMeta.defaultBucket.get(one, 1, Bucket.ReadSource.READERS);
     Assert.assertNull("value not present", value);
 
-    Assert.assertEquals("size of bucket", one.length * 2 + 64, testMeta.defaultBucket.getSizeInBytes());
+    Assert.assertEquals("size of bucket", one.length * 2 + Longs.BYTES, testMeta.defaultBucket.getSizeInBytes());
     testMeta.defaultBucket.teardown();
   }
 
@@ -92,7 +94,7 @@ public class DefaultBucketTest
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
     testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
 
-    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
+    ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
 
     Slice value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.READERS);
     Assert.assertEquals("value one", one, value);
@@ -109,7 +111,7 @@ public class DefaultBucketTest
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
     testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
 
-    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
+    ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
 
     Slice value = testMeta.defaultBucket.get(one, 101, Bucket.ReadSource.READERS);
     Assert.assertEquals("value one", one, value);
@@ -193,11 +195,11 @@ public class DefaultBucketTest
     Slice two = ManagedStateTestUtils.getSliceFor("2");
     testMeta.defaultBucket.put(two, 101, two);
 
-    Assert.assertEquals("size", initSize + (two.length * 2 + 64), testMeta.defaultBucket.getSizeInBytes());
+    Assert.assertEquals("size", initSize + (two.length * 2 + Longs.BYTES ), testMeta.defaultBucket.getSizeInBytes());
 
     long sizeFreed = testMeta.defaultBucket.freeMemory(Long.MAX_VALUE);
     Assert.assertEquals("size freed", initSize, sizeFreed);
-    Assert.assertEquals("existing size", (two.length * 2 + 64), testMeta.defaultBucket.getSizeInBytes());
+    Assert.assertEquals("existing size", (two.length * 2 + Longs.BYTES), testMeta.defaultBucket.getSizeInBytes());
     testMeta.defaultBucket.teardown();
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
index ce8052a..a7e0827 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
@@ -127,7 +127,7 @@ public class IncrementalCheckpointManagerTest
     testMeta.checkpointManager.transferWindowFiles();
 
     for (int i = 0; i < 5; i++) {
-      ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), i,
+      ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), i,
           buckets5.get((long)i), 1);
     }
   }
@@ -151,7 +151,8 @@ public class IncrementalCheckpointManagerTest
     Thread.sleep(500);
 
     for (int i = 0; i < 5; i++) {
-      ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), i, data.get((long)i), 1);
+      ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), i,
+          data.get((long)i), 1);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
index c57e0ca..0d3f87a 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
@@ -44,9 +44,16 @@ import com.datatorrent.netlet.util.Slice;
 
 public class ManagedStateTestUtils
 {
-  public static void transferBucketHelper(FileAccess fileAccess, long bucketId, Map<Slice, Bucket.BucketedValue>
-      unsavedBucket,
-      int keysPerTimeBucket) throws IOException
+  /**
+   * Validates the bucket data on the File System.
+   * @param fileAccess        file access
+   * @param bucketId          bucket id
+   * @param unsavedBucket     bucket data to compare with.
+   * @param keysPerTimeBucket num keys per time bucket
+   * @throws IOException
+   */
+  public static void validateBucketOnFileSystem(FileAccess fileAccess, long bucketId,
+      Map<Slice, Bucket.BucketedValue> unsavedBucket, int keysPerTimeBucket) throws IOException
   {
     RemoteIterator<LocatedFileStatus> iterator = fileAccess.listFiles(bucketId);
     TreeMap<Slice, Slice> fromDisk = Maps.newTreeMap(new SliceComparator());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
index bde437c..d713100 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
@@ -85,7 +85,7 @@ public class ManagedTimeStateImplTest
 
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, time);
     testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
-    ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), 0, unsavedBucket0, 1);
+    ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(), 0, unsavedBucket0, 1);
 
     Future<Slice> valFuture = testMeta.managedState.getAsync(0, zero);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
index 25596fa..82428fb 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -134,8 +134,8 @@ public class ManagedTimeUnifiedStateImplTest
 
     //write data to disk explicitly
     testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
-    ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), testMeta.operatorContext.getId(),
-        unsavedBucket0, 1);
+    ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(),
+        testMeta.operatorContext.getId(), unsavedBucket0, 1);
 
     Slice value = testMeta.managedState.getSync(time, zero);
 
@@ -156,8 +156,8 @@ public class ManagedTimeUnifiedStateImplTest
 
     //write data to disk explicitly
     testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
-    ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), testMeta.operatorContext.getId(),
-        unsavedBucket0, 1);
+    ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(),
+        testMeta.operatorContext.getId(), unsavedBucket0, 1);
 
     Future<Slice> valFuture = testMeta.managedState.getAsync(time, zero);