You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2016/12/02 15:12:13 UTC

flink git commit: [FLINK-5128] [kafka] Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set

Repository: flink
Updated Branches:
  refs/heads/master 3229dc07a -> 45b770b51


[FLINK-5128] [kafka] Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set

This closes #2893.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45b770b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45b770b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45b770b5

Branch: refs/heads/master
Commit: 45b770b517de509f4d8c058d57ae0e3e34f6a9dd
Parents: 3229dc0
Author: renkai <ga...@gmail.com>
Authored: Tue Nov 29 14:26:00 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Dec 2 23:09:58 2016 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducerBase.java           | 32 ++++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/45b770b5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index d413f1c..679b731 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -212,24 +212,24 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	public void open(Configuration configuration) {
 		producer = getKafkaProducer(this.producerConfig);
 
-		// the fetched list is immutable, so we're creating a mutable copy in order to sort it
-		List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
-
-		// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
-		Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
-			@Override
-			public int compare(PartitionInfo o1, PartitionInfo o2) {
-				return Integer.compare(o1.partition(), o2.partition());
-			}
-		});
-
-		partitions = new int[partitionsList.size()];
-		for (int i = 0; i < partitions.length; i++) {
-			partitions[i] = partitionsList.get(i).partition();
-		}
-
 		RuntimeContext ctx = getRuntimeContext();
 		if (partitioner != null) {
+			// the fetched list is immutable, so we're creating a mutable copy in order to sort it
+			List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
+
+			// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+			Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+				@Override
+				public int compare(PartitionInfo o1, PartitionInfo o2) {
+					return Integer.compare(o1.partition(), o2.partition());
+				}
+			});
+
+			partitions = new int[partitionsList.size()];
+			for (int i = 0; i < partitions.length; i++) {
+				partitions[i] = partitionsList.get(i).partition();
+			}
+
 			partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
 		}