You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by cs...@apache.org on 2016/07/08 05:41:06 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2136 1) Fixed the null pointer exception issue. 2) Added getter and setter for IncrementalCheckpointManager
Repository: apex-malhar
Updated Branches:
refs/heads/master 0a87bc0a5 -> 7b019fa1b
APEXMALHAR-2136 1) Fixed the null pointer exception issue. 2) Added getter and setter for IncrementalCheckpointManager
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d25b9185
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d25b9185
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d25b9185
Branch: refs/heads/master
Commit: d25b9185b793a2f3745b774b700f0684a1bbcdda
Parents: c4a1129
Author: Chaitanya <ch...@datatorrent.com>
Authored: Thu Jul 7 22:38:38 2016 +0530
Committer: Chaitanya <ch...@datatorrent.com>
Committed: Thu Jul 7 22:38:38 2016 +0530
----------------------------------------------------------------------
.../state/managed/AbstractManagedStateImpl.java | 42 +++++++++++++-------
.../malhar/lib/state/managed/StateTracker.java | 2 +-
2 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d25b9185/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 196ea69..b5b9f8c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -144,7 +144,7 @@ public abstract class AbstractManagedStateImpl
protected transient ExecutorService readerService;
@NotNull
- protected IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager();
+ private IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager();
@NotNull
protected BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();
@@ -203,22 +203,24 @@ public abstract class AbstractManagedStateImpl
//delete all the wal files with windows > activationWindow.
//All the wal files with windows <= activationWindow are loaded and kept separately as recovered data.
try {
- for (long recoveredWindow : checkpointManager.getWindowIds(operatorContext.getId())) {
- if (recoveredWindow <= activationWindow) {
- @SuppressWarnings("unchecked")
- Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
- checkpointManager.load(operatorContext.getId(), recoveredWindow);
- if (recoveredData != null && !recoveredData.isEmpty()) {
- for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) {
- int bucketIdx = prepareBucket(entry.getKey());
- buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue());
+ long[] recoveredWindows = checkpointManager.getWindowIds(operatorContext.getId());
+ if (recoveredWindows != null) {
+ for (long recoveredWindow : recoveredWindows) {
+ if (recoveredWindow <= activationWindow) {
+ @SuppressWarnings("unchecked")
+ Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
+ checkpointManager.load(operatorContext.getId(), recoveredWindow);
+ if (recoveredData != null && !recoveredData.isEmpty()) {
+ for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) {
+ int bucketIdx = prepareBucket(entry.getKey());
+ buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue());
+ }
}
+ checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow,
+ true /*skipWritingToWindowFile*/);
+ } else {
+ checkpointManager.delete(operatorContext.getId(), recoveredWindow);
}
- checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow,
- true /*skipWritingToWindowFile*/);
-
- } else {
- checkpointManager.delete(operatorContext.getId(), recoveredWindow);
}
}
} catch (IOException e) {
@@ -536,6 +538,16 @@ public abstract class AbstractManagedStateImpl
this.durationPreventingFreeingSpace = durationPreventingFreeingSpace;
}
+ public IncrementalCheckpointManager getCheckpointManager()
+ {
+ return checkpointManager;
+ }
+
+ public void setCheckpointManager(@NotNull IncrementalCheckpointManager checkpointManager)
+ {
+ this.checkpointManager = Preconditions.checkNotNull(checkpointManager);
+ }
+
static class ValueFetchTask implements Callable<Slice>
{
private final Bucket bucket;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d25b9185/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
index 4813c25..5678107 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -122,7 +122,7 @@ class StateTracker extends TimerTask
synchronized (bucket) {
long sizeFreed;
try {
- sizeFreed = bucket.freeMemory(managedStateImpl.checkpointManager.getLastTransferredWindow());
+ sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow());
LOG.debug("bucket freed {} {}", bucketId, sizeFreed);
} catch (IOException e) {
managedStateImpl.throwable.set(e);
[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2136-NPE-RecoverWindows'
Posted by cs...@apache.org.
Merge branch 'APEXMALHAR-2136-NPE-RecoverWindows'
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7b019fa1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7b019fa1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7b019fa1
Branch: refs/heads/master
Commit: 7b019fa1ba2cac60565c5ee0d9ebdcf396cd93b6
Parents: 0a87bc0 d25b918
Author: Chandni Singh <cs...@apache.org>
Authored: Thu Jul 7 22:31:49 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Jul 7 22:31:49 2016 -0700
----------------------------------------------------------------------
.../state/managed/AbstractManagedStateImpl.java | 42 +++++++++++++-------
.../malhar/lib/state/managed/StateTracker.java | 2 +-
2 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------