You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/11 23:57:59 UTC
apex-malhar git commit: APEXMALHAR-2281 fixed race condition with put
and asyncGet
Repository: apex-malhar
Updated Branches:
refs/heads/master aeb10f33d -> 07812a903
APEXMALHAR-2281 fixed race condition with put and asyncGet
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/07812a90
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/07812a90
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/07812a90
Branch: refs/heads/master
Commit: 07812a90394c92ae7c4b0587edf910f7ed330793
Parents: aeb10f3
Author: Chandni Singh <cs...@apache.org>
Authored: Thu Oct 6 21:35:34 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Tue Oct 11 11:30:26 2016 -0700
----------------------------------------------------------------------
.../state/managed/AbstractManagedStateImpl.java | 9 ++-
.../apex/malhar/lib/state/managed/Bucket.java | 77 +++++++++++---------
.../lib/state/managed/BucketsFileSystem.java | 13 ++--
.../managed/ManagedTimeStateMultiValue.java | 1 -
.../state/managed/BucketsFileSystemTest.java | 16 ++--
.../lib/state/managed/DefaultBucketTest.java | 12 +--
.../IncrementalCheckpointManagerTest.java | 5 +-
.../state/managed/ManagedStateTestUtils.java | 13 +++-
.../state/managed/ManagedTimeStateImplTest.java | 2 +-
.../ManagedTimeUnifiedStateImplTest.java | 8 +-
10 files changed, 88 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 1e378ec..dd2bbab 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -172,8 +172,6 @@ public abstract class AbstractManagedStateImpl
protected final transient ListMultimap<Long, ValueFetchTask> tasksPerBucketId =
Multimaps.synchronizedListMultimap(ArrayListMultimap.<Long, ValueFetchTask>create());
- protected Bucket.ReadSource asyncReadSource = Bucket.ReadSource.ALL;
-
@Override
public void setup(OperatorContext context)
{
@@ -274,6 +272,11 @@ public abstract class AbstractManagedStateImpl
if (timeBucket != -1) {
//when the time bucket is invalid (-1), the data is not stored
int bucketIdx = prepareBucket(bucketId);
+ //synchronization on a bucket isn't required for put because the event is added to flash which is
+ //a concurrent map. The assumption here is that the calls to put & get(sync/async) are being made synchronously by
+ //a single thread (operator thread). The get(sync/async) always checks memory first synchronously.
+ //If the key is not in memory, then the async get will use other reader threads which will fetch it from
+ //the files.
buckets[bucketIdx].put(key, timeBucket, value);
}
}
@@ -566,7 +569,7 @@ public abstract class AbstractManagedStateImpl
synchronized (bucket) {
//a particular bucket should only be handled by one thread at any point of time. Handling of bucket here
//involves creating readers for the time buckets and de-serializing key/value from a reader.
- Slice value = bucket.get(key, timeBucketId, this.managedState.asyncReadSource);
+ Slice value = bucket.get(key, timeBucketId, Bucket.ReadSource.ALL);
managedState.tasksPerBucketId.remove(bucket.getBucketId(), this);
return value;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 74364a9..4fc2327 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -23,7 +23,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.TreeSet;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -36,6 +36,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
import com.datatorrent.lib.fileaccess.FileAccess;
import com.datatorrent.netlet.util.Slice;
@@ -151,6 +152,16 @@ public interface Bucket extends ManagedStateComponent
this.value = value;
}
+ public long getSize()
+ {
+ long size = 0;
+ if (value != null) {
+ size += value.length;
+ }
+ size += Longs.BYTES; //time-bucket
+ return size;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -205,7 +216,7 @@ public interface Bucket extends ManagedStateComponent
private final transient Slice dummyGetKey = new Slice(null, 0, 0);
- private transient TreeSet<BucketsFileSystem.TimeBucketMeta> cachedBucketMetas;
+ private transient TreeMap<Long, BucketsFileSystem.TimeBucketMeta> cachedBucketMetas;
private DefaultBucket()
{
@@ -275,34 +286,33 @@ public interface Bucket extends ManagedStateComponent
cachedBucketMetas = managedStateContext.getBucketsFileSystem().getAllTimeBuckets(bucketId);
}
if (timeBucket != -1) {
- Slice valSlice = getValueFromTimeBucketReader(key, timeBucket);
- if (valSlice != null) {
- if (timeBucket == cachedBucketMetas.first().getTimeBucketId()) {
+ BucketedValue bucketedValue = getValueFromTimeBucketReader(key, timeBucket);
+ if (bucketedValue != null) {
+ if (timeBucket == cachedBucketMetas.firstKey()) {
//if the requested time bucket is the latest time bucket on file, the key/value is put in the file cache.
- BucketedValue bucketedValue = new BucketedValue(timeBucket, valSlice);
+ //Since the size of the whole time-bucket is added to total size, there is no need to add the size of
+ //entries in file cache.
fileCache.put(key, bucketedValue);
}
+ return bucketedValue.value;
}
- return valSlice;
} else {
//search all the time buckets
- for (BucketsFileSystem.TimeBucketMeta immutableTimeBucketMeta : cachedBucketMetas) {
+ for (BucketsFileSystem.TimeBucketMeta immutableTimeBucketMeta : cachedBucketMetas.values()) {
if (managedStateContext.getKeyComparator().compare(key, immutableTimeBucketMeta.getFirstKey()) >= 0) {
//keys in the time bucket files are sorted so if the first key in the file is greater than the key being
//searched, the key will not be present in that file.
- Slice valSlice = getValueFromTimeBucketReader(key, immutableTimeBucketMeta.getTimeBucketId());
- if (valSlice != null) {
- BucketedValue bucketedValue = new BucketedValue(immutableTimeBucketMeta.getTimeBucketId(), valSlice);
+ BucketedValue bucketedValue = getValueFromTimeBucketReader(key, immutableTimeBucketMeta.getTimeBucketId());
+ if (bucketedValue != null) {
//Only when the key is read from the latest time bucket on the file, the key/value is put in the file
// cache.
fileCache.put(key, bucketedValue);
- return valSlice;
+ return bucketedValue.value;
}
}
}
- return null;
}
-
+ return null;
} catch (IOException e) {
throw new RuntimeException("get time-buckets " + bucketId, e);
}
@@ -332,7 +342,7 @@ public interface Bucket extends ManagedStateComponent
* @param timeBucket time bucket
* @return value if key is found in the time bucket; null otherwise
*/
- private Slice getValueFromTimeBucketReader(Slice key, long timeBucket)
+ private BucketedValue getValueFromTimeBucketReader(Slice key, long timeBucket)
{
FileAccess.FileReader fileReader = readers.get(timeBucket);
if (fileReader != null) {
@@ -349,13 +359,13 @@ public interface Bucket extends ManagedStateComponent
}
}
- private Slice readValue(FileAccess.FileReader fileReader, Slice key, long timeBucket)
+ private BucketedValue readValue(FileAccess.FileReader fileReader, Slice key, long timeBucket)
{
Slice valSlice = new Slice(null, 0, 0);
try {
if (fileReader.seek(key)) {
fileReader.next(dummyGetKey, valSlice);
- return valSlice;
+ return new BucketedValue(timeBucket, valSlice);
} else {
return null;
}
@@ -384,20 +394,18 @@ public interface Bucket extends ManagedStateComponent
{
BucketedValue bucketedValue = flash.get(key);
if (bucketedValue == null) {
- bucketedValue = new BucketedValue();
+ bucketedValue = new BucketedValue(timeBucket, value);
flash.put(key, bucketedValue);
- sizeInBytes.getAndAdd(key.length);
- sizeInBytes.getAndAdd(Long.SIZE);
- }
- if (timeBucket >= bucketedValue.getTimeBucket()) {
-
- int inc = null == bucketedValue.getValue() ? value.length : value.length - bucketedValue.getValue().length;
- sizeInBytes.getAndAdd(inc);
- bucketedValue.setTimeBucket(timeBucket);
- bucketedValue.setValue(value);
+ sizeInBytes.getAndAdd(key.length + value.length + Longs.BYTES);
} else {
- LOG.warn("ignoring {} {} {}; existing {} {}", key, value, timeBucket,
- bucketedValue.getValue(), bucketedValue.getTimeBucket());
+ if (timeBucket >= bucketedValue.getTimeBucket()) {
+ int inc = null == bucketedValue.getValue() ? value.length : value.length - bucketedValue.getValue().length;
+ sizeInBytes.getAndAdd(inc);
+ bucketedValue.setTimeBucket(timeBucket);
+ bucketedValue.setValue(value);
+ } else {
+ throw new AssertionError("newer entry exists for " + key);
+ }
}
}
@@ -411,14 +419,13 @@ public interface Bucket extends ManagedStateComponent
Map<Slice, BucketedValue> windowData = committedData.remove(clearWindowId);
for (Map.Entry<Slice, BucketedValue> entry: windowData.entrySet()) {
- memoryFreed += entry.getKey().length + entry.getValue().getValue().length;
+ memoryFreed += entry.getKey().length + entry.getValue().getSize();
}
}
-
fileCache.clear();
if (cachedBucketMetas != null) {
- for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas) {
+ for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas.values()) {
FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId());
if (reader != null) {
memoryFreed += tbm.getSizeInBytes();
@@ -461,12 +468,16 @@ public interface Bucket extends ManagedStateComponent
fileCache.remove(key);
}
+ long memoryFreed = 0;
+
for (BucketedValue bucketedValue : bucketData.values()) {
FileAccess.FileReader reader = readers.get(bucketedValue.getTimeBucket());
if (reader != null) {
//closing the file reader for the time bucket if it is in memory because the time-bucket is modified
//so it will be re-written by BucketsDataManager
try {
+ BucketsFileSystem.TimeBucketMeta tbm = cachedBucketMetas.get(bucketedValue.getTimeBucket());
+ memoryFreed += tbm.getSizeInBytes();
LOG.debug("closing reader {} {}", bucketId, bucketedValue.getTimeBucket());
reader.close();
} catch (IOException e) {
@@ -478,7 +489,7 @@ public interface Bucket extends ManagedStateComponent
break;
}
}
-
+ sizeInBytes.getAndAdd(-memoryFreed);
committedData.put(savedWindow, bucketData);
stateIterator.remove();
} else {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
index 313bfd5..f65c539 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
@@ -21,12 +21,10 @@ package org.apache.apex.malhar.lib.state.managed;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import javax.annotation.Nullable;
@@ -39,8 +37,8 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.common.collect.TreeBasedTable;
@@ -296,15 +294,14 @@ public class BucketsFileSystem implements ManagedStateComponent
* @param bucketId bucket id
* @return all the time buckets in order - latest to oldest
*/
- public TreeSet<TimeBucketMeta> getAllTimeBuckets(long bucketId) throws IOException
+ public TreeMap<Long, TimeBucketMeta> getAllTimeBuckets(long bucketId) throws IOException
{
synchronized (timeBucketsMeta) {
- TreeSet<TimeBucketMeta> immutableTimeBucketMetas = Sets.newTreeSet(
- Collections.<TimeBucketMeta>reverseOrder());
+ TreeMap<Long, TimeBucketMeta> immutableTimeBucketMetas = Maps.newTreeMap(Ordering.natural().<Long>reverse());
if (timeBucketsMeta.containsRow(bucketId)) {
for (Map.Entry<Long, MutableTimeBucketMeta> entry : timeBucketsMeta.row(bucketId).entrySet()) {
- immutableTimeBucketMetas.add(entry.getValue().getImmutableTimeBucketMeta());
+ immutableTimeBucketMetas.put(entry.getKey(), entry.getValue().getImmutableTimeBucketMeta());
}
return immutableTimeBucketMetas;
}
@@ -313,7 +310,7 @@ public class BucketsFileSystem implements ManagedStateComponent
//Load meta info of all the time buckets of the bucket identified by bucket id
loadBucketMetaFile(bucketId, dis);
for (Map.Entry<Long, MutableTimeBucketMeta> entry : timeBucketsMeta.row(bucketId).entrySet()) {
- immutableTimeBucketMetas.add(entry.getValue().getImmutableTimeBucketMeta());
+ immutableTimeBucketMetas.put(entry.getKey(), entry.getValue().getImmutableTimeBucketMeta());
}
return immutableTimeBucketMetas;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
index beeeb4e..bd3319f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
@@ -71,7 +71,6 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM
{
this();
this.store = Preconditions.checkNotNull(store);
- this.store.asyncReadSource = Bucket.ReadSource.READERS;
this.isKeyContainsMultiValue = isKeyContainsMultiValue;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
index 1696d4d..ede2c85 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
@@ -22,7 +22,7 @@ package org.apache.apex.malhar.lib.state.managed;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
-import java.util.TreeSet;
+import java.util.TreeMap;
import org.junit.Assert;
import org.junit.Rule;
@@ -74,7 +74,7 @@ public class BucketsFileSystemTest
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0);
- ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 1);
+ ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 1);
testMeta.bucketsFileSystem.teardown();
}
@@ -89,7 +89,7 @@ public class BucketsFileSystemTest
testMeta.bucketsFileSystem.writeBucketData(10, 0, more);
unsavedBucket0.putAll(more);
- ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 2);
+ ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 2);
testMeta.bucketsFileSystem.teardown();
}
@@ -138,13 +138,13 @@ public class BucketsFileSystemTest
testMeta.bucketsFileSystem.updateTimeBuckets(tbm2);
testMeta.bucketsFileSystem.updateBucketMetaFile(1);
- TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
+ TreeMap<Long, BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
testMeta.bucketsFileSystem.getAllTimeBuckets(1);
- Iterator<BucketsFileSystem.TimeBucketMeta> iterator = timeBucketMetas.iterator();
+ Iterator<Map.Entry<Long, BucketsFileSystem.TimeBucketMeta>> iterator = timeBucketMetas.entrySet().iterator();
int i = 2;
while (iterator.hasNext()) {
- BucketsFileSystem.TimeBucketMeta tbm = iterator.next();
+ BucketsFileSystem.TimeBucketMeta tbm = iterator.next().getValue();
Assert.assertEquals("time bucket " + i, i, tbm.getTimeBucketId());
i--;
}
@@ -160,11 +160,11 @@ public class BucketsFileSystemTest
BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1,1);
Assert.assertNull("deleted tbm", immutableTbm);
- TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
+ TreeMap<Long, BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
testMeta.bucketsFileSystem.getAllTimeBuckets(1);
Assert.assertEquals("only 1 tbm", 1, timeBucketMetas.size());
- immutableTbm = timeBucketMetas.iterator().next();
+ immutableTbm = timeBucketMetas.entrySet().iterator().next().getValue();
Assert.assertEquals("tbm 2", 2, immutableTbm.getTimeBucketId());
testMeta.bucketsFileSystem.teardown();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
index 7539440..2058b69 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -28,6 +28,8 @@ import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import com.google.common.primitives.Longs;
+
import com.datatorrent.lib.fileaccess.FileAccess;
import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
import com.datatorrent.lib.util.TestUtils;
@@ -79,7 +81,7 @@ public class DefaultBucketTest
value = testMeta.defaultBucket.get(one, 1, Bucket.ReadSource.READERS);
Assert.assertNull("value not present", value);
- Assert.assertEquals("size of bucket", one.length * 2 + 64, testMeta.defaultBucket.getSizeInBytes());
+ Assert.assertEquals("size of bucket", one.length * 2 + Longs.BYTES, testMeta.defaultBucket.getSizeInBytes());
testMeta.defaultBucket.teardown();
}
@@ -92,7 +94,7 @@ public class DefaultBucketTest
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
- ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
+ ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
Slice value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.READERS);
Assert.assertEquals("value one", one, value);
@@ -109,7 +111,7 @@ public class DefaultBucketTest
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
- ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
+ ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
Slice value = testMeta.defaultBucket.get(one, 101, Bucket.ReadSource.READERS);
Assert.assertEquals("value one", one, value);
@@ -193,11 +195,11 @@ public class DefaultBucketTest
Slice two = ManagedStateTestUtils.getSliceFor("2");
testMeta.defaultBucket.put(two, 101, two);
- Assert.assertEquals("size", initSize + (two.length * 2 + 64), testMeta.defaultBucket.getSizeInBytes());
+ Assert.assertEquals("size", initSize + (two.length * 2 + Longs.BYTES ), testMeta.defaultBucket.getSizeInBytes());
long sizeFreed = testMeta.defaultBucket.freeMemory(Long.MAX_VALUE);
Assert.assertEquals("size freed", initSize, sizeFreed);
- Assert.assertEquals("existing size", (two.length * 2 + 64), testMeta.defaultBucket.getSizeInBytes());
+ Assert.assertEquals("existing size", (two.length * 2 + Longs.BYTES), testMeta.defaultBucket.getSizeInBytes());
testMeta.defaultBucket.teardown();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
index ce8052a..a7e0827 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
@@ -127,7 +127,7 @@ public class IncrementalCheckpointManagerTest
testMeta.checkpointManager.transferWindowFiles();
for (int i = 0; i < 5; i++) {
- ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), i,
+ ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), i,
buckets5.get((long)i), 1);
}
}
@@ -151,7 +151,8 @@ public class IncrementalCheckpointManagerTest
Thread.sleep(500);
for (int i = 0; i < 5; i++) {
- ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), i, data.get((long)i), 1);
+ ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), i,
+ data.get((long)i), 1);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
index c57e0ca..0d3f87a 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
@@ -44,9 +44,16 @@ import com.datatorrent.netlet.util.Slice;
public class ManagedStateTestUtils
{
- public static void transferBucketHelper(FileAccess fileAccess, long bucketId, Map<Slice, Bucket.BucketedValue>
- unsavedBucket,
- int keysPerTimeBucket) throws IOException
+ /**
+ * Validates the bucket data on the File System.
+ * @param fileAccess file access
+ * @param bucketId bucket id
+ * @param unsavedBucket bucket data to compare with.
+ * @param keysPerTimeBucket num keys per time bucket
+ * @throws IOException
+ */
+ public static void validateBucketOnFileSystem(FileAccess fileAccess, long bucketId,
+ Map<Slice, Bucket.BucketedValue> unsavedBucket, int keysPerTimeBucket) throws IOException
{
RemoteIterator<LocatedFileStatus> iterator = fileAccess.listFiles(bucketId);
TreeMap<Slice, Slice> fromDisk = Maps.newTreeMap(new SliceComparator());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
index bde437c..d713100 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
@@ -85,7 +85,7 @@ public class ManagedTimeStateImplTest
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, time);
testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
- ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), 0, unsavedBucket0, 1);
+ ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(), 0, unsavedBucket0, 1);
Future<Slice> valFuture = testMeta.managedState.getAsync(0, zero);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/07812a90/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
index 25596fa..82428fb 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -134,8 +134,8 @@ public class ManagedTimeUnifiedStateImplTest
//write data to disk explicitly
testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
- ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), testMeta.operatorContext.getId(),
- unsavedBucket0, 1);
+ ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(),
+ testMeta.operatorContext.getId(), unsavedBucket0, 1);
Slice value = testMeta.managedState.getSync(time, zero);
@@ -156,8 +156,8 @@ public class ManagedTimeUnifiedStateImplTest
//write data to disk explicitly
testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
- ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), testMeta.operatorContext.getId(),
- unsavedBucket0, 1);
+ ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(),
+ testMeta.operatorContext.getId(), unsavedBucket0, 1);
Future<Slice> valFuture = testMeta.managedState.getAsync(time, zero);