You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:32:25 UTC
[37/50] [abbrv] incubator-apex-malhar git commit: Removed checking
all the window ids in idempotency storage before replay
Removed checking all the window ids in idempotency storage before replay
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ada42ab9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ada42ab9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ada42ab9
Branch: refs/heads/master
Commit: ada42ab9ba0fe17162a38162b6d889afc91e741c
Parents: a57a3d7
Author: ishark <is...@datatorrent.com>
Authored: Fri Aug 14 14:37:24 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Aug 14 14:37:24 2015 -0700
----------------------------------------------------------------------
.../redis/AbstractRedisInputOperator.java | 23 ++++++++++----------
1 file changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ada42ab9/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index 7f79bd0..260fbf6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -35,7 +35,8 @@ import com.datatorrent.lib.io.IdempotentStorageManager;
* @category Input
* @tags redis, key value
*
- * @param <T> The tuple type.
+ * @param <T>
+ * The tuple type.
* @since 0.9.3
*/
public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener
@@ -47,6 +48,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp
private transient Integer backupOffset;
private int scanCount;
private transient boolean replay;
+ private transient boolean skipOffsetRecovery = true;
@NotNull
private IdempotentStorageManager idempotentStorageManager;
@@ -92,33 +94,27 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp
private void replay(long windowId)
{
try {
- if (checkIfWindowExistsInIdempotencyManager(windowId - 1)) {
+ // For first recovered window, offset is already part of recovery state.
+ // So skip reading from idempotency manager
+ if (!skipOffsetRecovery) {
// Begin offset for this window is recovery offset stored for the last
// window
RecoveryState recoveryStateForLastWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId - 1);
recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow;
}
-
+ skipOffsetRecovery = false;
RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId);
recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow;
if (recoveryState.scanOffsetAtBeginWindow != null) {
scanOffset = recoveryState.scanOffsetAtBeginWindow;
}
replay = true;
+
} catch (IOException e) {
DTThrowable.rethrow(e);
}
}
- private boolean checkIfWindowExistsInIdempotencyManager(long windowId) throws IOException
- {
- long[] windowsIds = getIdempotentStorageManager().getWindowIds(context.getId());
- if(windowsIds.length == 0 || windowId < windowsIds[0] || windowId > windowsIds[windowsIds.length - 1]) {
- return false;
- }
- return true ;
- }
-
private void scanKeysFromOffset()
{
if (!scanComplete) {
@@ -157,11 +153,14 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp
scanComplete = false;
scanParameters = new ScanParams();
scanParameters.count(scanCount);
+
// For the 1st window after checkpoint, windowID - 1 would not have recovery
// offset stored in idempotentStorageManager
// But recoveryOffset is non-transient, so will be recovered with
// checkPointing
+ // Offset recovery from idempotency storage can be skipped in this case
scanOffset = recoveryState.scanOffsetAtBeginWindow;
+ skipOffsetRecovery = true;
}
@Override