You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2016/12/02 15:12:13 UTC
flink git commit: [FLINK-5128] [kafka] Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set
Repository: flink
Updated Branches:
refs/heads/master 3229dc07a -> 45b770b51
[FLINK-5128] [kafka] Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set
This closes #2893.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45b770b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45b770b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45b770b5
Branch: refs/heads/master
Commit: 45b770b517de509f4d8c058d57ae0e3e34f6a9dd
Parents: 3229dc0
Author: renkai <ga...@gmail.com>
Authored: Tue Nov 29 14:26:00 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Dec 2 23:09:58 2016 +0800
----------------------------------------------------------------------
.../kafka/FlinkKafkaProducerBase.java | 32 ++++++++++----------
1 file changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/45b770b5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index d413f1c..679b731 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -212,24 +212,24 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
public void open(Configuration configuration) {
producer = getKafkaProducer(this.producerConfig);
- // the fetched list is immutable, so we're creating a mutable copy in order to sort it
- List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
-
- // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
- Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
- @Override
- public int compare(PartitionInfo o1, PartitionInfo o2) {
- return Integer.compare(o1.partition(), o2.partition());
- }
- });
-
- partitions = new int[partitionsList.size()];
- for (int i = 0; i < partitions.length; i++) {
- partitions[i] = partitionsList.get(i).partition();
- }
-
RuntimeContext ctx = getRuntimeContext();
if (partitioner != null) {
+ // the fetched list is immutable, so we're creating a mutable copy in order to sort it
+ List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
+
+ // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+ Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+ @Override
+ public int compare(PartitionInfo o1, PartitionInfo o2) {
+ return Integer.compare(o1.partition(), o2.partition());
+ }
+ });
+
+ partitions = new int[partitionsList.size()];
+ for (int i = 0; i < partitions.length; i++) {
+ partitions[i] = partitionsList.get(i).partition();
+ }
+
partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
}