You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by cs...@apache.org on 2016/07/08 05:41:06 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2136 1) Fixed the null pointer exception issue. 2) Added getter and setter for IncrementalCheckpointManager

Repository: apex-malhar
Updated Branches:
  refs/heads/master 0a87bc0a5 -> 7b019fa1b


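APEXMALHAR-2136 1) Fixed the NullPointerException that occurred when checkpointManager.getWindowIds() returned null while loading recovered windows. 2) Made the checkpointManager field private and added a getter and setter for IncrementalCheckpointManager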


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d25b9185
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d25b9185
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d25b9185

Branch: refs/heads/master
Commit: d25b9185b793a2f3745b774b700f0684a1bbcdda
Parents: c4a1129
Author: Chaitanya <ch...@datatorrent.com>
Authored: Thu Jul 7 22:38:38 2016 +0530
Committer: Chaitanya <ch...@datatorrent.com>
Committed: Thu Jul 7 22:38:38 2016 +0530

----------------------------------------------------------------------
 .../state/managed/AbstractManagedStateImpl.java | 42 +++++++++++++-------
 .../malhar/lib/state/managed/StateTracker.java  |  2 +-
 2 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d25b9185/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 196ea69..b5b9f8c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -144,7 +144,7 @@ public abstract class AbstractManagedStateImpl
   protected transient ExecutorService readerService;
 
   @NotNull
-  protected IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager();
+  private IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager();
 
   @NotNull
   protected BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();
@@ -203,22 +203,24 @@ public abstract class AbstractManagedStateImpl
       //delete all the wal files with windows > activationWindow.
       //All the wal files with windows <= activationWindow are loaded and kept separately as recovered data.
       try {
-        for (long recoveredWindow : checkpointManager.getWindowIds(operatorContext.getId())) {
-          if (recoveredWindow <= activationWindow) {
-            @SuppressWarnings("unchecked")
-            Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
-                checkpointManager.load(operatorContext.getId(), recoveredWindow);
-            if (recoveredData != null && !recoveredData.isEmpty()) {
-              for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) {
-                int bucketIdx = prepareBucket(entry.getKey());
-                buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue());
+        long[] recoveredWindows = checkpointManager.getWindowIds(operatorContext.getId());
+        if (recoveredWindows != null) {
+          for (long recoveredWindow : recoveredWindows) {
+            if (recoveredWindow <= activationWindow) {
+              @SuppressWarnings("unchecked")
+              Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
+                  checkpointManager.load(operatorContext.getId(), recoveredWindow);
+              if (recoveredData != null && !recoveredData.isEmpty()) {
+                for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) {
+                  int bucketIdx = prepareBucket(entry.getKey());
+                  buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue());
+                }
               }
+              checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow,
+                  true /*skipWritingToWindowFile*/);
+            } else {
+              checkpointManager.delete(operatorContext.getId(), recoveredWindow);
             }
-            checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow,
-                true /*skipWritingToWindowFile*/);
-
-          } else {
-            checkpointManager.delete(operatorContext.getId(), recoveredWindow);
           }
         }
       } catch (IOException e) {
@@ -536,6 +538,16 @@ public abstract class AbstractManagedStateImpl
     this.durationPreventingFreeingSpace = durationPreventingFreeingSpace;
   }
 
+  public IncrementalCheckpointManager getCheckpointManager()
+  {
+    return checkpointManager;
+  }
+
+  public void setCheckpointManager(@NotNull IncrementalCheckpointManager checkpointManager)
+  {
+    this.checkpointManager = Preconditions.checkNotNull(checkpointManager);
+  }
+
   static class ValueFetchTask implements Callable<Slice>
   {
     private final Bucket bucket;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d25b9185/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
index 4813c25..5678107 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -122,7 +122,7 @@ class StateTracker extends TimerTask
             synchronized (bucket) {
               long sizeFreed;
               try {
-                sizeFreed = bucket.freeMemory(managedStateImpl.checkpointManager.getLastTransferredWindow());
+                sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow());
                 LOG.debug("bucket freed {} {}", bucketId, sizeFreed);
               } catch (IOException e) {
                 managedStateImpl.throwable.set(e);


[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2136-NPE-RecoverWindows'

Posted by cs...@apache.org.
Merge branch 'APEXMALHAR-2136-NPE-RecoverWindows'


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7b019fa1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7b019fa1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7b019fa1

Branch: refs/heads/master
Commit: 7b019fa1ba2cac60565c5ee0d9ebdcf396cd93b6
Parents: 0a87bc0 d25b918
Author: Chandni Singh <cs...@apache.org>
Authored: Thu Jul 7 22:31:49 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Jul 7 22:31:49 2016 -0700

----------------------------------------------------------------------
 .../state/managed/AbstractManagedStateImpl.java | 42 +++++++++++++-------
 .../malhar/lib/state/managed/StateTracker.java  |  2 +-
 2 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------