You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/02/20 09:36:14 UTC
apex-malhar git commit: APEXMALHAR-2411 Avoid isReplayState variable,
incorporate logic in activate() and replay() for Kinesis Input
Operator
Repository: apex-malhar
Updated Branches:
refs/heads/master 22c65c4c0 -> 6c42103f8
APEXMALHAR-2411 Avoid isReplayState variable, incorporate logic in activate() and replay() for Kinesis Input Operator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/6c42103f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/6c42103f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/6c42103f
Branch: refs/heads/master
Commit: 6c42103f8a9721649d2c6ce3905cffc8baf61fb9
Parents: 22c65c4
Author: deepak-narkhede <ma...@gmail.com>
Authored: Fri Feb 17 11:43:05 2017 +0530
Committer: deepak-narkhede <ma...@gmail.com>
Committed: Mon Feb 20 14:24:32 2017 +0530
----------------------------------------------------------------------
.../kinesis/AbstractKinesisInputOperator.java | 33 ++++++++++----------
1 file changed, 16 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6c42103f/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index fc10bea..18a6399 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -56,6 +56,7 @@ import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.util.Pair;
import com.datatorrent.lib.util.KryoCloneUtils;
@@ -128,8 +129,6 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
private transient long lastRepartitionTime = 0L;
- private transient boolean isReplayState = false;
-
//No of shards per partition in dynamic MANY_TO_ONE strategy
// If the value is more than 1, then it enables the dynamic partitioning
@Min(1)
@@ -425,9 +424,6 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
operatorId = context.getId();
windowDataManager.setup(context);
shardPosition.clear();
- if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) {
- isReplayState = true;
- }
}
/**
@@ -477,6 +473,18 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
throw new RuntimeException(e);
}
}
+
+ /*
+ * Set the shard positions and start the consumer if the last recovery window id
+ * matches the current completed window id.
+ */
+ if (windowId == windowDataManager.getLargestCompletedWindow()) {
+ // Set the shard positions to the consumer
+ Map<String, String> statsData = new HashMap<String, String>(getConsumer().getShardPosition());
+ statsData.putAll(shardPosition);
+ getConsumer().resetShardPositions(statsData);
+ consumer.start();
+ }
}
catch (IOException e) {
throw new RuntimeException("replay", e);
@@ -507,9 +515,9 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
@Override
public void activate(OperatorContext ctx)
{
- if(isReplayState)
- {
- // If it is a replay state, don't start the consumer
+ // If it is a replay state, don't start the consumer
+ if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID &&
+ context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) {
return;
}
consumer.start();
@@ -573,15 +581,6 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
}
shardPosition.put(shardId, recordId);
}
- if(isReplayState)
- {
- isReplayState = false;
- // Set the shard positions to the consumer
- Map<String, String> statsData = new HashMap<String, String>(getConsumer().getShardPosition());
- statsData.putAll(shardPosition);
- getConsumer().resetShardPositions(statsData);
- consumer.start();
- }
emitCount += count;
}