Posted to commits@apex.apache.org by hs...@apache.org on 2016/08/23 17:20:31 UTC

[2/3] apex-malhar git commit: APEXMALHAR-2180 Kafka partitions to be unchanged in the case of dynamic scaling of ONE_TO_MANY strategy

APEXMALHAR-2180 Kafka partitions to be unchanged in the case of dynamic scaling of ONE_TO_MANY strategy
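
In brief, per the diff below: the topic-metadata lookup moves inside the branches that actually need it; under ONE_TO_MANY, newly discovered Kafka partitions now trigger a full round-robin redistribution across the configured initialPartitionCount operator partitions (reporting the previous operator ids as deleted) instead of appending extra operator partitions; and the operator switches from CheckpointListener to Operator.CheckpointNotificationListener, which adds a beforeCheckpoint(long) hook that is left empty here. A minimal sketch of the new hook shape, simplified from the diff (the class name is illustrative, not part of the codebase):

import com.datatorrent.api.Operator;

public class CheckpointHooksSketch implements Operator.CheckpointNotificationListener
{
  @Override
  public void beforeCheckpoint(long windowId)
  {
    // Left empty by the operator: no work is needed before state is saved.
  }

  @Override
  public void checkpointed(long windowId)
  {
    // Unchanged by this commit.
  }

  @Override
  public void committed(long windowId)
  {
    // The operator commits simple-consumer offsets here (see the diff).
  }
}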


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/37fa01e7
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/37fa01e7
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/37fa01e7

Branch: refs/heads/master
Commit: 37fa01e721da2fc1067f8e762ac703a0eff48ea2
Parents: e428548
Author: chaitanya <ch...@apache.org>
Authored: Tue Aug 23 00:07:51 2016 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Tue Aug 23 00:07:51 2016 +0530

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       | 52 +++++++++++---------
 1 file changed, 28 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37fa01e7/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 9d2e664..21ff181 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -21,8 +21,8 @@ package com.datatorrent.contrib.kafka;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.ActivationListener;
-import com.datatorrent.api.Operator.CheckpointListener;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.Stats;
 import com.datatorrent.api.StatsListener;
@@ -30,8 +30,6 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.util.KryoCloneUtils;
 import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -59,8 +57,6 @@ import java.io.IOException;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -70,7 +66,6 @@ import java.util.Map;
 import java.util.Set;
 
 import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.getOffsetsForPartitions;
-import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.get_1minMovingAvgParMap;
 
 /**
  * This is a base implementation of a Kafka input operator, which consumes data from Kafka message bus.&nbsp;
@@ -129,7 +124,7 @@ import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.ge
  */
 
 @OperatorAnnotation(partitionable = true)
-public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implements InputOperator, ActivationListener<OperatorContext>, CheckpointListener, Partitioner<AbstractKafkaInputOperator<K>>, StatsListener
+public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implements InputOperator, ActivationListener<OperatorContext>, Operator.CheckpointNotificationListener, Partitioner<AbstractKafkaInputOperator<K>>, StatsListener
 {
   private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
 
@@ -375,6 +370,12 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   }
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
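+    // Intentionally empty: beforeCheckpoint is required by
+    // Operator.CheckpointNotificationListener, but this operator has no
+    // pre-checkpoint work; offsets are committed in committed() below.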
+
+  }
+
+  @Override
   public void committed(long windowId)
   {
     if ((getConsumer() instanceof  SimpleKafkaConsumer)) {
@@ -510,11 +511,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
       isInitialParitition = partitions.iterator().next().getStats() == null;
     }
 
-    // get partition metadata for topics.
-    // Whether the operator uses the high-level or the simple kafka consumer, it always creates a temporary simple kafka consumer to get the topic metadata
-    // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
-    Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
-
     // Operator partitions
     List<Partitioner.Partition<AbstractKafkaInputOperator<K>>> newPartitions = null;
 
@@ -537,6 +533,10 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
       if (isInitialParitition) {
         lastRepartitionTime = System.currentTimeMillis();
         logger.info("[ONE_TO_ONE]: Initializing partition(s)");
+        // get partition metadata for topics.
+        // Whether the operator uses the high-level or the simple kafka consumer, it always creates a temporary simple kafka consumer to get the topic metadata
+        // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
+        Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
 
         // initialize the number of operator partitions according to the number of kafka partitions
 
@@ -548,7 +548,8 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
             newPartitions.add(createPartition(Sets.newHashSet(new KafkaPartition(clusterId, consumer.topic, pm.partitionId())), initOffset, newManagers));
           }
         }
-
+        windowDataManager.partitioned(newManagers, deletedOperators);
+        return newPartitions;
       }
       else if (newWaitingPartition.size() != 0) {
         // add partition for new kafka partition
@@ -557,9 +558,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
           partitions.add(createPartition(Sets.newHashSet(newPartition), null, newManagers));
         }
         newWaitingPartition.clear();
-        windowDataManager.partitioned(newManagers, deletedOperators);
-        return partitions;
-
       }
       break;
     // For the 1 to N mapping, the initial partition count is defined by the stream application
@@ -571,9 +569,14 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
         throw new UnsupportedOperationException("[ONE_TO_MANY]: The high-level consumer is not supported for ONE_TO_MANY partition strategy.");
       }
 
-      if (isInitialParitition) {
+      if (isInitialParitition || newWaitingPartition.size() != 0) {
         lastRepartitionTime = System.currentTimeMillis();
         logger.info("[ONE_TO_MANY]: Initializing partition(s)");
+        // get partition metadata for topics.
+        // Whether the operator uses the high-level or the simple kafka consumer, it always creates a temporary simple kafka consumer to get the topic metadata
+        // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
+        Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
+
         int size = initialPartitionCount;
         @SuppressWarnings("unchecked")
         Set<KafkaPartition>[] kps = (Set<KafkaPartition>[]) Array.newInstance((new HashSet<KafkaPartition>()).getClass(), size);
@@ -594,14 +597,15 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
           logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): {} ", StringUtils.join(kps[i], ", "));
           newPartitions.add(createPartition(kps[i], initOffset, newManagers));
         }
+        // Mark the existing operator partitions as deleted; the redistribution above replaces them
+        for (Partition<AbstractKafkaInputOperator<K>> op : partitions) {
+          deletedOperators.add(op.getPartitionedInstance().operatorId);
+        }
 
-      }
-      else if (newWaitingPartition.size() != 0) {
-
-        logger.info("[ONE_TO_MANY]: Add operator partition for kafka partition(s): {} ", StringUtils.join(newWaitingPartition, ", "));
-        partitions.add(createPartition(Sets.newHashSet(newWaitingPartition), null, newManagers));
+        newWaitingPartition.clear();
         windowDataManager.partitioned(newManagers, deletedOperators);
-        return partitions;
+        return newPartitions;
       }
       break;
 
@@ -612,7 +616,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     }
 
     windowDataManager.partitioned(newManagers, deletedOperators);
-    return newPartitions;
+    return partitions;
   }
 
   // Create a new partition with the partition Ids and initial offset positions
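
----------------------------------------------------------------------

For reference, the heart of the ONE_TO_MANY change is the round-robin assignment kps[i++ % size].add(kp): all Kafka partitions are spread across a fixed number (initialPartitionCount) of operator partitions, so scaling the topic changes each operator's share rather than the number of operators. A minimal standalone sketch of that assignment, under illustrative names (RoundRobinSketch and assignRoundRobin are not part of the operator's API):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RoundRobinSketch
{
  // Mirrors the kps[i++ % size].add(kp) loop in definePartitions():
  // each kafka partition id lands in one of 'operatorCount' buckets.
  public static List<Set<Integer>> assignRoundRobin(List<Integer> kafkaPartitionIds, int operatorCount)
  {
    List<Set<Integer>> buckets = new ArrayList<>(operatorCount);
    for (int n = 0; n < operatorCount; n++) {
      buckets.add(new HashSet<Integer>());
    }
    int i = 0;
    for (Integer id : kafkaPartitionIds) {
      buckets.get(i++ % operatorCount).add(id);
    }
    return buckets;
  }

  public static void main(String[] args)
  {
    // 5 kafka partitions over 2 operator partitions -> {0, 2, 4} and {1, 3}
    // (bucket count stays 2 no matter how many kafka partitions appear)
    System.out.println(assignRoundRobin(Arrays.asList(0, 1, 2, 3, 4), 2));
  }
}

Because the bucket count is pinned to initialPartitionCount, a repartition after the topic scales only reshuffles the buckets, which is what keeps the operator partition count unchanged as the JIRA title describes.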