You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/02/22 11:39:23 UTC
apex-malhar git commit: APEXMALHAR-2380 Add MutablePair for Kinesis
Operator for Recovery State
Repository: apex-malhar
Updated Branches:
refs/heads/master bb968f210 -> cf60959a7
APEXMALHAR-2380 Add MutablePair for Kinesis Operator for Recovery State
Conflicts:
contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/cf60959a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/cf60959a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/cf60959a
Branch: refs/heads/master
Commit: cf60959a7cc7041d5fa0e9512540f50cd6500146
Parents: bb968f2
Author: deepak-narkhede <ma...@gmail.com>
Authored: Tue Feb 21 14:31:23 2017 +0530
Committer: deepak-narkhede <ma...@gmail.com>
Committed: Wed Feb 22 16:36:54 2017 +0530
----------------------------------------------------------------------
.../kinesis/AbstractKinesisInputOperator.java | 39 +++++++-------------
1 file changed, 13 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cf60959a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index 901aaa3..8df3277 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -38,12 +38,11 @@ import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.MutablePair;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.Sets;
import com.datatorrent.api.Context.OperatorContext;
@@ -59,16 +58,6 @@ import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.util.Pair;
import com.datatorrent.lib.util.KryoCloneUtils;
-@DefaultSerializer(JavaSerializer.class)
-class KinesisPair <F, S> extends Pair<F, S>
-{
- public KinesisPair(F first, S second)
- {
- super(first, second);
- }
-}
-
-
/**
* Base implementation of Kinesis Input Operator. Fetches records from kinesis and emits them as tuples.<br/>
* <p>
@@ -103,7 +92,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
protected WindowDataManager windowDataManager;
protected transient long currentWindowId;
protected transient int operatorId;
- protected final transient Map<String, KinesisPair<String, Integer>> currentWindowRecoveryState;
+ protected final transient Map<String, MutablePair<String, Integer>> currentWindowRecoveryState;
@Valid
protected KinesisConsumer consumer = new KinesisConsumer();
@@ -150,7 +139,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
* would be NoopWindowDataManager.
*/
windowDataManager = new WindowDataManager.NoopWindowDataManager();
- currentWindowRecoveryState = new HashMap<String, KinesisPair<String, Integer>>();
+ currentWindowRecoveryState = new HashMap<String, MutablePair<String, Integer>>();
}
/**
* Derived class has to implement this method, so that it knows what type of message it is going to send to Malhar.
@@ -464,17 +453,17 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
{
try {
@SuppressWarnings("unchecked")
- Map<String, KinesisPair<String, Integer>> recoveredData =
- (Map<String, KinesisPair<String, Integer>>)windowDataManager.retrieve(windowId);
+ Map<String, MutablePair<String, Integer>> recoveredData =
+ (Map<String, MutablePair<String, Integer>>)windowDataManager.retrieve(windowId);
if (recoveredData == null) {
return;
}
- for (Map.Entry<String, KinesisPair<String, Integer>> rc: recoveredData.entrySet()) {
+ for (Map.Entry<String, MutablePair<String, Integer>> rc: recoveredData.entrySet()) {
logger.debug("Replaying the windowId: {}", windowId);
- logger.debug("ShardId: " + rc.getKey() + " , Start Sequence Id: " + rc.getValue().getFirst() + " , No Of Records: " + rc.getValue().getSecond());
+ logger.debug("ShardId: " + rc.getKey() + " , Start Sequence Id: " + rc.getValue().getLeft() + " , No Of Records: " + rc.getValue().getRight());
try {
- List<Record> records = KinesisUtil.getInstance().getRecords(consumer.streamName, rc.getValue().getSecond(),
- rc.getKey(), ShardIteratorType.AT_SEQUENCE_NUMBER, rc.getValue().getFirst());
+ List<Record> records = KinesisUtil.getInstance().getRecords(consumer.streamName, rc.getValue().getRight(),
+ rc.getKey(), ShardIteratorType.AT_SEQUENCE_NUMBER, rc.getValue().getLeft());
for (Record record : records) {
emitTuple(new Pair<String, Record>(rc.getKey(), record));
shardPosition.put(rc.getKey(), record.getSequenceNumber());
@@ -581,13 +570,11 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
String shardId = data.getFirst();
String recordId = data.getSecond().getSequenceNumber();
emitTuple(data);
- if(!currentWindowRecoveryState.containsKey(shardId))
- {
- currentWindowRecoveryState.put(shardId, new KinesisPair<String, Integer>(recordId, 1));
+ MutablePair<String, Integer> shardOffsetAndCount = currentWindowRecoveryState.get(shardId);
+ if (shardOffsetAndCount == null) {
+ currentWindowRecoveryState.put(shardId, new MutablePair<String, Integer>(recordId, 1));
} else {
- KinesisPair<String, Integer> second = currentWindowRecoveryState.get(shardId);
- Integer noOfRecords = second.getSecond();
- currentWindowRecoveryState.put(data.getFirst(), new KinesisPair<String, Integer>(second.getFirst(), noOfRecords+1));
+ shardOffsetAndCount.setRight(shardOffsetAndCount.right+1);
}
shardPosition.put(shardId, recordId);
}