You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/02/12 20:57:37 UTC

incubator-apex-malhar git commit: Fixes for the following issues

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/release-3.3 dc0e1a038 -> b00ff80f4


Fixes for the following issues:

Committed offsets are not present in offset manager storage.
Operator partitions are reporting offsets to the stats listener for Kafka partitions they don't subscribe to.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b00ff80f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b00ff80f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b00ff80f

Branch: refs/heads/release-3.3
Commit: b00ff80f4aa47010306513da9e0b2d6b73a2a494
Parents: dc0e1a0
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Tue Feb 9 20:52:56 2016 -0800
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Tue Feb 9 20:52:56 2016 -0800

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b00ff80f/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index c7bac18..671a76f 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -651,7 +651,9 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) {
       p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
       if (initOffsets != null) {
-        p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+        //Don't send all offsets to all partitions
+        //p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+        p.getPartitionedInstance().offsetStats.putAll(p.getPartitionedInstance().getConsumer().getCurrentOffsets());
       }
     }
     newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
@@ -723,7 +725,11 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   {
     //In every partition check interval, call offsetmanager to update the offsets
     if (offsetManager != null) {
-      offsetManager.updateOffsets(getOffsetsForPartitions(kstats));
+      Map<KafkaPartition, Long> offsetsForPartitions = getOffsetsForPartitions(kstats);
+      if (offsetsForPartitions.size() > 0) {
+        logger.debug("Passing offset updates to offset manager");
+        offsetManager.updateOffsets(offsetsForPartitions);
+      }
     }
   }
 
@@ -751,15 +757,18 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
 
     long t = System.currentTimeMillis();
 
+    // If stats are available then update offsets
+    // Do this before re-partition interval check below to not miss offset updates
+    if (kstats.size() > 0) {
+      logger.debug("Checking offset updates for offset manager");
+      updateOffsets(kstats);
+    }
+
     if (t - lastCheckTime < repartitionCheckInterval) {
       // return false if it's within repartitionCheckInterval since last time it check the stats
       return false;
     }
 
-    logger.debug("Use OffsetManager to update offsets");
-    updateOffsets(kstats);
-
-
     if(repartitionInterval < 0){
       // if repartition is disabled
       return false;