You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/08/23 17:20:31 UTC
[2/3] apex-malhar git commit: APEXMALHAR-2180 Kafka partitions to be
unchanged in the case of dynamic scaling of ONE_TO_MANY strategy
APEXMALHAR-2180 Kafka partitions to be unchanged in the case of dynamic scaling of ONE_TO_MANY strategy
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/37fa01e7
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/37fa01e7
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/37fa01e7
Branch: refs/heads/master
Commit: 37fa01e721da2fc1067f8e762ac703a0eff48ea2
Parents: e428548
Author: chaitanya <ch...@apache.org>
Authored: Tue Aug 23 00:07:51 2016 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Tue Aug 23 00:07:51 2016 +0530
----------------------------------------------------------------------
.../kafka/AbstractKafkaInputOperator.java | 52 +++++++++++---------
1 file changed, 28 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37fa01e7/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 9d2e664..21ff181 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -21,8 +21,8 @@ package com.datatorrent.contrib.kafka;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.ActivationListener;
-import com.datatorrent.api.Operator.CheckpointListener;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
@@ -30,8 +30,6 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -59,8 +57,6 @@ import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -70,7 +66,6 @@ import java.util.Map;
import java.util.Set;
import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.getOffsetsForPartitions;
-import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.get_1minMovingAvgParMap;
/**
* This is a base implementation of a Kafka input operator, which consumes data from Kafka message bus.
@@ -129,7 +124,7 @@ import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.ge
*/
@OperatorAnnotation(partitionable = true)
-public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implements InputOperator, ActivationListener<OperatorContext>, CheckpointListener, Partitioner<AbstractKafkaInputOperator<K>>, StatsListener
+public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implements InputOperator, ActivationListener<OperatorContext>, Operator.CheckpointNotificationListener, Partitioner<AbstractKafkaInputOperator<K>>, StatsListener
{
private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
@@ -375,6 +370,12 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
}
@Override
+ public void beforeCheckpoint(long windowId)
+ {
+
+ }
+
+ @Override
public void committed(long windowId)
{
if ((getConsumer() instanceof SimpleKafkaConsumer)) {
@@ -510,11 +511,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
isInitialParitition = partitions.iterator().next().getStats() == null;
}
- // get partition metadata for topics.
- // Whatever operator is using high-level or simple kafka consumer, the operator always create a temporary simple kafka consumer to get the metadata of the topic
- // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
- Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
-
// Operator partitions
List<Partitioner.Partition<AbstractKafkaInputOperator<K>>> newPartitions = null;
@@ -537,6 +533,10 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
if (isInitialParitition) {
lastRepartitionTime = System.currentTimeMillis();
logger.info("[ONE_TO_ONE]: Initializing partition(s)");
+ // get partition metadata for topics.
+ // Whatever operator is using high-level or simple kafka consumer, the operator always create a temporary simple kafka consumer to get the metadata of the topic
+ // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
+ Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
// initialize the number of operator partitions according to number of kafka partitions
@@ -548,7 +548,8 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
newPartitions.add(createPartition(Sets.newHashSet(new KafkaPartition(clusterId, consumer.topic, pm.partitionId())), initOffset, newManagers));
}
}
-
+ windowDataManager.partitioned(newManagers, deletedOperators);
+ return newPartitions;
}
else if (newWaitingPartition.size() != 0) {
// add partition for new kafka partition
@@ -557,9 +558,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
partitions.add(createPartition(Sets.newHashSet(newPartition), null, newManagers));
}
newWaitingPartition.clear();
- windowDataManager.partitioned(newManagers, deletedOperators);
- return partitions;
-
}
break;
// For the 1 to N mapping The initial partition number is defined by stream application
@@ -571,9 +569,14 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
throw new UnsupportedOperationException("[ONE_TO_MANY]: The high-level consumer is not supported for ONE_TO_MANY partition strategy.");
}
- if (isInitialParitition) {
+ if (isInitialParitition || newWaitingPartition.size() != 0) {
lastRepartitionTime = System.currentTimeMillis();
logger.info("[ONE_TO_MANY]: Initializing partition(s)");
+ // get partition metadata for topics.
+ // Whatever operator is using high-level or simple kafka consumer, the operator always create a temporary simple kafka consumer to get the metadata of the topic
+ // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
+ Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
+
int size = initialPartitionCount;
@SuppressWarnings("unchecked")
Set<KafkaPartition>[] kps = (Set<KafkaPartition>[]) Array.newInstance((new HashSet<KafkaPartition>()).getClass(), size);
@@ -594,14 +597,15 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): {} ", StringUtils.join(kps[i], ", "));
newPartitions.add(createPartition(kps[i], initOffset, newManagers));
}
+ // Add the existing partition Ids to the deleted operators
+ for (Partition<AbstractKafkaInputOperator<K>> op : partitions)
+ {
+ deletedOperators.add(op.getPartitionedInstance().operatorId);
+ }
- }
- else if (newWaitingPartition.size() != 0) {
-
- logger.info("[ONE_TO_MANY]: Add operator partition for kafka partition(s): {} ", StringUtils.join(newWaitingPartition, ", "));
- partitions.add(createPartition(Sets.newHashSet(newWaitingPartition), null, newManagers));
+ newWaitingPartition.clear();
windowDataManager.partitioned(newManagers, deletedOperators);
- return partitions;
+ return newPartitions;
}
break;
@@ -612,7 +616,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
}
windowDataManager.partitioned(newManagers, deletedOperators);
- return newPartitions;
+ return partitions;
}
// Create a new partition with the partition Ids and initial offset positions