You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2016/05/17 20:49:05 UTC
[1/2] incubator-apex-malhar git commit: APEXMALHAR-2073 fixed the
race condition that caused the first key in tbm to be null
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/master 25857e391 -> 4b126aa52
APEXMALHAR-2073: fixed the race condition that caused the first key in the time bucket meta (tbm) to be null
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ca9fed97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ca9fed97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ca9fed97
Branch: refs/heads/master
Commit: ca9fed971f7ff26643ce2417c750b951fc161752
Parents: f94c2d8
Author: Chandni Singh <cs...@apache.org>
Authored: Mon May 16 17:30:51 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon May 16 21:52:26 2016 -0700
----------------------------------------------------------------------
.../apex/malhar/lib/state/managed/Bucket.java | 12 ++-------
.../lib/state/managed/BucketsFileSystem.java | 26 ++++++++++++++------
.../managed/IncrementalCheckpointManager.java | 8 +++++-
.../malhar/lib/state/managed/StateTracker.java | 2 +-
.../state/managed/BucketsFileSystemTest.java | 13 +++++++---
.../lib/state/managed/ManagedStateImplTest.java | 1 -
6 files changed, 37 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca9fed97/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 107bfc6..352f121 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -285,7 +285,6 @@ public interface Bucket extends ManagedStateComponent
} else {
//search all the time buckets
for (BucketsFileSystem.TimeBucketMeta immutableTimeBucketMeta : cachedBucketMetas) {
-
if (managedStateContext.getKeyComparator().compare(key, immutableTimeBucketMeta.getFirstKey()) >= 0) {
//keys in the time bucket files are sorted so if the first key in the file is greater than the key being
//searched, the key will not be present in that file.
@@ -400,16 +399,8 @@ public interface Bucket extends ManagedStateComponent
@Override
public long freeMemory(long windowId) throws IOException
{
- LOG.debug("free space {}", bucketId);
long memoryFreed = 0;
-
- for (Map.Entry<Long, Map<Slice, BucketedValue>> windowEntry : committedData.entrySet()) {
- for (Map.Entry<Slice, BucketedValue> entry: windowEntry.getValue().entrySet()) {
- memoryFreed += entry.getKey().length + entry.getValue().getValue().length;
- }
- }
-
- Long clearWindowId = null;
+ Long clearWindowId;
while ((clearWindowId = committedData.floorKey(windowId)) != null) {
Map<Slice, BucketedValue> windowData = committedData.remove(clearWindowId);
@@ -432,6 +423,7 @@ public interface Bucket extends ManagedStateComponent
}
sizeInBytes.getAndAdd(-memoryFreed);
+ LOG.debug("space freed {} {}", bucketId, memoryFreed);
return memoryFreed;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca9fed97/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
index 8304fb6..483d9e8 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
@@ -150,7 +150,11 @@ public class BucketsFileSystem implements ManagedStateComponent
}
for (long timeBucket : timeBucketedKeys.rowKeySet()) {
- BucketsFileSystem.MutableTimeBucketMeta tbm = getOrCreateTimeBucketMeta(bucketId, timeBucket);
+ BucketsFileSystem.MutableTimeBucketMeta tbm = getMutableTimeBucketMeta(bucketId, timeBucket);
+ if (tbm == null) {
+ tbm = new MutableTimeBucketMeta(bucketId, timeBucket);
+ }
+
addBucketName(bucketId);
long dataSize = 0;
@@ -202,6 +206,7 @@ public class BucketsFileSystem implements ManagedStateComponent
fileWriter.close();
rename(bucketId, tmpFileName, getFileName(timeBucket));
tbm.updateTimeBucketMeta(windowId, dataSize, firstKey);
+ updateTimeBuckets(tbm);
}
updateBucketMetaFile(bucketId);
@@ -217,15 +222,19 @@ public class BucketsFileSystem implements ManagedStateComponent
* @throws IOException
*/
@NotNull
- MutableTimeBucketMeta getOrCreateTimeBucketMeta(long bucketId, long timeBucketId) throws IOException
+ private MutableTimeBucketMeta getMutableTimeBucketMeta(long bucketId, long timeBucketId) throws IOException
{
synchronized (timeBucketsMeta) {
- MutableTimeBucketMeta tbm = timeBucketMetaHelper(bucketId, timeBucketId);
- if (tbm == null) {
- tbm = new MutableTimeBucketMeta(bucketId, timeBucketId);
- timeBucketsMeta.put(bucketId, timeBucketId, tbm);
- }
- return tbm;
+ return timeBucketMetaHelper(bucketId, timeBucketId);
+ }
+ }
+
+ void updateTimeBuckets(@NotNull MutableTimeBucketMeta mutableTimeBucketMeta)
+ {
+ Preconditions.checkNotNull(mutableTimeBucketMeta, "mutable time bucket meta");
+ synchronized (timeBucketsMeta) {
+ timeBucketsMeta.put(mutableTimeBucketMeta.getBucketId(), mutableTimeBucketMeta.getTimeBucketId(),
+ mutableTimeBucketMeta);
}
}
@@ -320,6 +329,7 @@ public class BucketsFileSystem implements ManagedStateComponent
*/
private void loadBucketMetaFile(long bucketId, DataInputStream dis) throws IOException
{
+ LOG.debug("Loading bucket meta-file {}", bucketId);
int metaDataVersion = dis.readInt();
if (metaDataVersion == META_FILE_VERSION) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca9fed97/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index 02fd6ec..536702d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -71,6 +71,8 @@ public class IncrementalCheckpointManager extends WindowDataManager.FSWindowData
private transient int waitMillis;
private volatile long lastTransferredWindow = Stateless.WINDOW_ID;
+ private transient long largestWindowAddedToTransferQueue = Stateless.WINDOW_ID;
+
public IncrementalCheckpointManager()
{
super();
@@ -188,8 +190,12 @@ public class IncrementalCheckpointManager extends WindowDataManager.FSWindowData
{
LOG.debug("data manager committed {}", windowId);
for (Long currentWindow : savedWindows.keySet()) {
+ if (currentWindow <= largestWindowAddedToTransferQueue) {
+ continue;
+ }
if (currentWindow <= windowId) {
- LOG.debug("to transfer {}", windowId);
+ LOG.debug("to transfer {}", currentWindow);
+ largestWindowAddedToTransferQueue = currentWindow;
windowsToTransfer.add(currentWindow);
} else {
break;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca9fed97/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
index 3bb5507..4813c25 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -123,7 +123,7 @@ class StateTracker extends TimerTask
long sizeFreed;
try {
sizeFreed = bucket.freeMemory(managedStateImpl.checkpointManager.getLastTransferredWindow());
- LOG.debug("size freed {} {}", bucketId, sizeFreed);
+ LOG.debug("bucket freed {} {}", bucketId, sizeFreed);
} catch (IOException e) {
managedStateImpl.throwable.set(e);
throw new RuntimeException("freeing " + bucketId, e);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca9fed97/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
index ee6cf00..1696d4d 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
@@ -97,10 +97,12 @@ public class BucketsFileSystemTest
public void testUpdateBucketMetaDataFile() throws IOException
{
testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
- BucketsFileSystem.MutableTimeBucketMeta mutableTbm = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+ BucketsFileSystem.MutableTimeBucketMeta mutableTbm = new BucketsFileSystem.MutableTimeBucketMeta(1, 1);
mutableTbm.updateTimeBucketMeta(10, 100, new Slice("1".getBytes()));
+ testMeta.bucketsFileSystem.updateTimeBuckets(mutableTbm);
testMeta.bucketsFileSystem.updateBucketMetaFile(1);
+
BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
Assert.assertNotNull(immutableTbm);
Assert.assertEquals("last transferred window", 10, immutableTbm.getLastTransferredWindowId());
@@ -116,7 +118,8 @@ public class BucketsFileSystemTest
BucketsFileSystem.TimeBucketMeta bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
Assert.assertNull("bucket meta", bucketMeta);
- testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+ BucketsFileSystem.MutableTimeBucketMeta mutableTimeBucketMeta = new BucketsFileSystem.MutableTimeBucketMeta(1, 1);
+ testMeta.bucketsFileSystem.updateTimeBuckets(mutableTimeBucketMeta);
bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
Assert.assertNotNull("bucket meta not null", bucketMeta);
testMeta.bucketsFileSystem.teardown();
@@ -126,11 +129,13 @@ public class BucketsFileSystemTest
public void testGetAllTimeBucketMeta() throws IOException
{
testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
- BucketsFileSystem.MutableTimeBucketMeta tbm1 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+ BucketsFileSystem.MutableTimeBucketMeta tbm1 = new BucketsFileSystem.MutableTimeBucketMeta(1, 1);
tbm1.updateTimeBucketMeta(10, 100, new Slice("1".getBytes()));
+ testMeta.bucketsFileSystem.updateTimeBuckets(tbm1);
- BucketsFileSystem.MutableTimeBucketMeta tbm2 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 2);
+ BucketsFileSystem.MutableTimeBucketMeta tbm2 = new BucketsFileSystem.MutableTimeBucketMeta(1, 2);
tbm2.updateTimeBucketMeta(10, 100, new Slice("2".getBytes()));
+ testMeta.bucketsFileSystem.updateTimeBuckets(tbm2);
testMeta.bucketsFileSystem.updateBucketMetaFile(1);
TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca9fed97/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
index cdd9781..99e6c23 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
@@ -174,7 +174,6 @@ public class ManagedStateImplTest
{
testMeta.managedState.setMaxMemorySize(1);
testMeta.managedState.setCheckStateSizeInterval(Duration.millis(1L));
- testMeta.managedState.setCheckStateSizeInterval(Duration.millis(1L));
testMeta.managedState.setup(testMeta.operatorContext);
int numKeys = 300;
[2/2] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2073'
Posted by ti...@apache.org.
Merge branch 'APEXMALHAR-2073'
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/4b126aa5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/4b126aa5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/4b126aa5
Branch: refs/heads/master
Commit: 4b126aa527351825a0b47a8378611d3b8ef6a12d
Parents: 25857e3 ca9fed9
Author: ilooner <ti...@gmail.com>
Authored: Tue May 17 13:48:31 2016 -0700
Committer: ilooner <ti...@gmail.com>
Committed: Tue May 17 13:48:31 2016 -0700
----------------------------------------------------------------------
.../apex/malhar/lib/state/managed/Bucket.java | 12 ++-------
.../lib/state/managed/BucketsFileSystem.java | 26 ++++++++++++++------
.../managed/IncrementalCheckpointManager.java | 8 +++++-
.../malhar/lib/state/managed/StateTracker.java | 2 +-
.../state/managed/BucketsFileSystemTest.java | 13 +++++++---
.../lib/state/managed/ManagedStateImplTest.java | 1 -
6 files changed, 37 insertions(+), 25 deletions(-)
----------------------------------------------------------------------