You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:32:25 UTC

[37/50] [abbrv] incubator-apex-malhar git commit: Removed checking all the window ids in idempotency storage before replay

Removed checking all the window ids in idempotency storage before replay


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ada42ab9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ada42ab9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ada42ab9

Branch: refs/heads/master
Commit: ada42ab9ba0fe17162a38162b6d889afc91e741c
Parents: a57a3d7
Author: ishark <is...@datatorrent.com>
Authored: Fri Aug 14 14:37:24 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Aug 14 14:37:24 2015 -0700

----------------------------------------------------------------------
 .../redis/AbstractRedisInputOperator.java       | 23 ++++++++++----------
 1 file changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ada42ab9/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index 7f79bd0..260fbf6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -35,7 +35,8 @@ import com.datatorrent.lib.io.IdempotentStorageManager;
  * @category Input
  * @tags redis, key value
  *
- * @param <T> The tuple type.
+ * @param <T>
+ *          The tuple type.
  * @since 0.9.3
  */
 public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener
@@ -47,6 +48,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp
   private transient Integer backupOffset;
   private int scanCount;
   private transient boolean replay;
+  private transient boolean skipOffsetRecovery = true;
 
   @NotNull
   private IdempotentStorageManager idempotentStorageManager;
@@ -92,33 +94,27 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp
   private void replay(long windowId)
   {
     try {
-      if (checkIfWindowExistsInIdempotencyManager(windowId - 1)) {
+      // For first recovered window, offset is already part of recovery state.
+      // So skip reading from idempotency manager
+      if (!skipOffsetRecovery) {
         // Begin offset for this window is recovery offset stored for the last
         // window
         RecoveryState recoveryStateForLastWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId - 1);
         recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow;
       }
-
+      skipOffsetRecovery = false;
       RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId);
       recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow;
       if (recoveryState.scanOffsetAtBeginWindow != null) {
         scanOffset = recoveryState.scanOffsetAtBeginWindow;
       }
       replay = true;
+
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }
   }
 
-  private boolean checkIfWindowExistsInIdempotencyManager(long windowId) throws IOException
-  {
-    long[] windowsIds = getIdempotentStorageManager().getWindowIds(context.getId());
-    if(windowsIds.length == 0 || windowId < windowsIds[0] || windowId > windowsIds[windowsIds.length - 1]) {
-      return false;
-    }
-    return true ;
-  }
-
   private void scanKeysFromOffset()
   {
     if (!scanComplete) {
@@ -157,11 +153,14 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp
     scanComplete = false;
     scanParameters = new ScanParams();
     scanParameters.count(scanCount);
+    
     // For the 1st window after checkpoint, windowID - 1 would not have recovery
     // offset stored in idempotentStorageManager
     // But recoveryOffset is non-transient, so will be recovered with
     // checkPointing
+    // Offset recovery from idempotency storage can be skipped in this case
     scanOffset = recoveryState.scanOffsetAtBeginWindow;
+    skipOffsetRecovery = true;
   }
 
   @Override