You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/02/22 11:39:23 UTC

apex-malhar git commit: APEXMALHAR-2380 Add MutablePair for Kinesis Operator for Recovery State

Repository: apex-malhar
Updated Branches:
  refs/heads/master bb968f210 -> cf60959a7


APEXMALHAR-2380 Add MutablePair for Kinesis Operator for Recovery State

Conflicts:
	contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/cf60959a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/cf60959a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/cf60959a

Branch: refs/heads/master
Commit: cf60959a7cc7041d5fa0e9512540f50cd6500146
Parents: bb968f2
Author: deepak-narkhede <ma...@gmail.com>
Authored: Tue Feb 21 14:31:23 2017 +0530
Committer: deepak-narkhede <ma...@gmail.com>
Committed: Wed Feb 22 16:36:54 2017 +0530

----------------------------------------------------------------------
 .../kinesis/AbstractKinesisInputOperator.java   | 39 +++++++-------------
 1 file changed, 13 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cf60959a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index 901aaa3..8df3277 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -38,12 +38,11 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.MutablePair;
 
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.collect.Sets;
 
 import com.datatorrent.api.Context.OperatorContext;
@@ -59,16 +58,6 @@ import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.common.util.Pair;
 import com.datatorrent.lib.util.KryoCloneUtils;
 
-@DefaultSerializer(JavaSerializer.class)
-class KinesisPair <F, S> extends Pair<F, S>
-{
-  public KinesisPair(F first, S second)
-  {
-    super(first, second);
-  }
-}
-
-
 /**
  * Base implementation of Kinesis Input Operator. Fetches records from kinesis and emits them as tuples.<br/>
  * <p>
@@ -103,7 +92,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
   protected WindowDataManager windowDataManager;
   protected transient long currentWindowId;
   protected transient int operatorId;
-  protected final transient Map<String, KinesisPair<String, Integer>> currentWindowRecoveryState;
+  protected final transient Map<String, MutablePair<String, Integer>> currentWindowRecoveryState;
   @Valid
   protected KinesisConsumer consumer = new KinesisConsumer();
 
@@ -150,7 +139,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
      * would be NoopWindowDataManager.
      */
     windowDataManager = new WindowDataManager.NoopWindowDataManager();
-    currentWindowRecoveryState = new HashMap<String, KinesisPair<String, Integer>>();
+    currentWindowRecoveryState = new HashMap<String, MutablePair<String, Integer>>();
   }
   /**
    * Derived class has to implement this method, so that it knows what type of message it is going to send to Malhar.
@@ -464,17 +453,17 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
   {
     try {
       @SuppressWarnings("unchecked")
-      Map<String, KinesisPair<String, Integer>> recoveredData =
-          (Map<String, KinesisPair<String, Integer>>)windowDataManager.retrieve(windowId);
+      Map<String, MutablePair<String, Integer>> recoveredData =
+          (Map<String, MutablePair<String, Integer>>)windowDataManager.retrieve(windowId);
       if (recoveredData == null) {
         return;
       }
-      for (Map.Entry<String, KinesisPair<String, Integer>> rc: recoveredData.entrySet()) {
+      for (Map.Entry<String, MutablePair<String, Integer>> rc: recoveredData.entrySet()) {
         logger.debug("Replaying the windowId: {}", windowId);
-        logger.debug("ShardId: " + rc.getKey() + " , Start Sequence Id: " + rc.getValue().getFirst() + " , No Of Records: " + rc.getValue().getSecond());
+        logger.debug("ShardId: " + rc.getKey() + " , Start Sequence Id: " + rc.getValue().getLeft() + " , No Of Records: " + rc.getValue().getRight());
         try {
-          List<Record> records = KinesisUtil.getInstance().getRecords(consumer.streamName, rc.getValue().getSecond(),
-              rc.getKey(), ShardIteratorType.AT_SEQUENCE_NUMBER, rc.getValue().getFirst());
+          List<Record> records = KinesisUtil.getInstance().getRecords(consumer.streamName, rc.getValue().getRight(),
+              rc.getKey(), ShardIteratorType.AT_SEQUENCE_NUMBER, rc.getValue().getLeft());
           for (Record record : records) {
             emitTuple(new Pair<String, Record>(rc.getKey(), record));
             shardPosition.put(rc.getKey(), record.getSequenceNumber());
@@ -581,13 +570,11 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
       String shardId = data.getFirst();
       String recordId = data.getSecond().getSequenceNumber();
       emitTuple(data);
-      if(!currentWindowRecoveryState.containsKey(shardId))
-      {
-        currentWindowRecoveryState.put(shardId, new KinesisPair<String, Integer>(recordId, 1));
+      MutablePair<String, Integer> shardOffsetAndCount = currentWindowRecoveryState.get(shardId);
+      if (shardOffsetAndCount == null) {
+        currentWindowRecoveryState.put(shardId, new MutablePair<String, Integer>(recordId, 1));
       } else {
-        KinesisPair<String, Integer> second = currentWindowRecoveryState.get(shardId);
-        Integer noOfRecords = second.getSecond();
-        currentWindowRecoveryState.put(data.getFirst(), new KinesisPair<String, Integer>(second.getFirst(), noOfRecords+1));
+        shardOffsetAndCount.setRight(shardOffsetAndCount.right+1);
       }
       shardPosition.put(shardId, recordId);
     }