You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/02/20 09:36:14 UTC

apex-malhar git commit: APEXMALHAR-2411 Avoid isreplaystate variable, incorporate logic in activate() and replay() for Kinesis Input Operator

Repository: apex-malhar
Updated Branches:
  refs/heads/master 22c65c4c0 -> 6c42103f8


APEXMALHAR-2411 Avoid isreplaystate variable, incorporate logic in activate() and replay() for Kinesis Input Operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/6c42103f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/6c42103f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/6c42103f

Branch: refs/heads/master
Commit: 6c42103f8a9721649d2c6ce3905cffc8baf61fb9
Parents: 22c65c4
Author: deepak-narkhede <ma...@gmail.com>
Authored: Fri Feb 17 11:43:05 2017 +0530
Committer: deepak-narkhede <ma...@gmail.com>
Committed: Mon Feb 20 14:24:32 2017 +0530

----------------------------------------------------------------------
 .../kinesis/AbstractKinesisInputOperator.java   | 33 ++++++++++----------
 1 file changed, 16 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6c42103f/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index fc10bea..18a6399 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -56,6 +56,7 @@ import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.Stats;
 import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.common.util.Pair;
 import com.datatorrent.lib.util.KryoCloneUtils;
 
@@ -128,8 +129,6 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
 
   private transient long lastRepartitionTime = 0L;
 
-  private transient boolean isReplayState = false;
-
   //No of shards per partition in dynamic MANY_TO_ONE strategy
   // If the value is more than 1, then it enables the dynamic partitioning
   @Min(1)
@@ -425,9 +424,6 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
     operatorId = context.getId();
     windowDataManager.setup(context);
     shardPosition.clear();
-    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) {
-      isReplayState = true;
-    }
   }
 
   /**
@@ -477,6 +473,18 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
           throw new RuntimeException(e);
         }
       }
+
+      /*
+       * Set the shard positions and start the consumer if last recovery windowid
+       * match with current completed windowid.
+       */
+      if (windowId == windowDataManager.getLargestCompletedWindow()) {
+        // Set the shard positions to the consumer
+        Map<String, String> statsData = new HashMap<String, String>(getConsumer().getShardPosition());
+        statsData.putAll(shardPosition);
+        getConsumer().resetShardPositions(statsData);
+        consumer.start();
+      }
     }
     catch (IOException e) {
       throw new RuntimeException("replay", e);
@@ -507,9 +515,9 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
   @Override
   public void activate(OperatorContext ctx)
   {
-    if(isReplayState)
-    {
-      // If it is a replay state, don't start the consumer
+    // If it is a replay state, don't start the consumer
+    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID &&
+      context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) {
       return;
     }
     consumer.start();
@@ -573,15 +581,6 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
       }
       shardPosition.put(shardId, recordId);
     }
-    if(isReplayState)
-    {
-      isReplayState = false;
-      // Set the shard positions to the consumer
-      Map<String, String> statsData = new HashMap<String, String>(getConsumer().getShardPosition());
-      statsData.putAll(shardPosition);
-      getConsumer().resetShardPositions(statsData);
-      consumer.start();
-    }
     emitCount += count;
   }