Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/07 08:47:30 UTC

[GitHub] [flink] wuchong commented on a change in pull request #14246: [FLINK-20273][table/kafka] Fix the Kafka round-robin behaviour when k…

wuchong commented on a change in pull request #14246:
URL: https://github.com/apache/flink/pull/14246#discussion_r537312778



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -314,12 +317,12 @@ private static void validateScanStartupMode(ReadableConfig tableOptions) {
 	private static void validateSinkPartitioner(ReadableConfig tableOptions) {
 		tableOptions.getOptional(SINK_PARTITIONER)
 				.ifPresent(partitioner -> {
-					if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())) {
-						if (partitioner.isEmpty()) {
-							throw new ValidationException(
-									String.format("Option '%s' should be a non-empty string.",
-											SINK_PARTITIONER.key()));
-						}
+					if (partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN) && tableOptions.getOptional(KEY_FIELDS).isPresent()) {

Review comment:
       We still need to check that the partitioner is one of the allowed enum values.
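
One way to read this comment is that the new round-robin check should be added alongside the existing membership check, not replace it. The sketch below is a standalone illustration of that ordering, not the code from the PR. The class name, the `BUILT_IN` set (assumed to hold only `fixed` and `round-robin`), the error messages, and the use of `IllegalArgumentException` in place of Flink's `ValidationException` are all assumptions made so the example runs without Flink on the classpath.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for the validation in KafkaOptions; not the PR's code. */
public class SinkPartitionerValidationSketch {

    // Assumed built-in values; the real set is KafkaOptions.SINK_PARTITIONER_ENUMS.
    static final String FIXED = "fixed";
    static final String ROUND_ROBIN = "round-robin";
    static final Set<String> BUILT_IN = new HashSet<>(Arrays.asList(FIXED, ROUND_ROBIN));

    public static void validate(Optional<String> partitioner, Optional<String> keyFields) {
        if (!partitioner.isPresent()) {
            return; // option not set, nothing to validate
        }
        String value = partitioner.get();

        // Step 1: keep the membership check. A value outside the built-in set
        // is assumed to be a custom partitioner class name, so it must at
        // least be non-empty.
        if (!BUILT_IN.contains(value.toLowerCase())) {
            if (value.isEmpty()) {
                throw new IllegalArgumentException(
                        "Option 'sink.partitioner' should be a non-empty string.");
            }
            return;
        }

        // Step 2: the check added by the PR. Round-robin is rejected when
        // key fields are configured.
        if (ROUND_ROBIN.equals(value.toLowerCase()) && keyFields.isPresent()) {
            throw new IllegalArgumentException(
                    "'round-robin' cannot be combined with key fields in this sketch.");
        }
    }

    public static void main(String[] args) {
        validate(Optional.of("fixed"), Optional.of("user_id")); // accepted
        try {
            validate(Optional.of("round-robin"), Optional.of("user_id"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that the sketch lower-cases the value for both checks, whereas the diff above compares with a case-sensitive `equals`; whether the real option should be case-insensitive is left open here.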

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -525,9 +527,9 @@ See more about how to use the CDC formats in [debezium-json]({% link dev/table/c
 ### Sink Partitioning
 
 The config option `sink.partitioner` specifies output partitioning from Flink's partitions into Kafka's partitions.
-By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition).
-In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom sink partitioner can be provided. The `round-robin` partitioner is useful to avoid an unbalanced partitioning.
-However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
+By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to parititon records.
+It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash to compute the partition for a record with the key defined.
+In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner will write the records in the same Flink partition into the same partition, which could reduce the cost of the network connections.

Review comment:
       ```suggestion
   By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to partition records. It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash to compute the partition for a record with the key defined.
   
   In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner will write the records in the same Flink partition into the same Kafka partition, which could reduce the cost of the network connections.
   ```
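
To make the 'fixed' behaviour described in the suggestion concrete, here is a minimal sketch of a mapping in which every record from one Flink subtask lands in the same Kafka partition. The class and method names are hypothetical, and the modulo rule for the case of more subtasks than partitions is an assumption about how such a partitioner would wrap around, not something stated in this review.

```java
/** Hypothetical illustration of a 'fixed' subtask-to-partition mapping. */
public class FixedPartitionSketch {

    /**
     * Returns the Kafka partition for a given Flink subtask. The result
     * depends only on the subtask index, never on the record, so each
     * subtask writes to exactly one partition.
     */
    public static int fixedPartition(int subtaskIndex, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("The topic must have at least one partition.");
        }
        if (subtaskIndex < 0) {
            throw new IllegalArgumentException("The subtask index must be non-negative.");
        }
        // Assumed wrap-around when there are more subtasks than partitions.
        return partitions[subtaskIndex % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println(
                    "subtask " + subtask + " -> partition " + fixedPartition(subtask, partitions));
        }
    }
}
```

With three partitions and four subtasks, subtasks 0 and 3 share partition 0 in this sketch. Because each subtask talks to the broker hosting a single partition, fewer connections are needed than with round-robin, which is the network-cost point the documentation text makes.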

##########
File path: docs/dev/table/connectors/kafka.zh.md
##########
@@ -526,9 +528,9 @@ See more about how to use the CDC formats in [debezium-json]({% link dev/table/c
 ### Sink Partitioning
 
 The config option `sink.partitioner` specifies output partitioning from Flink's partitions into Kafka's partitions.
-By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition).
-In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom sink partitioner can be provided. The `round-robin` partitioner is useful to avoid an unbalanced partitioning.
-However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
+By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to parititon records.
+It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash to compute the partition for a record with the key defined.
+In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner will write the records in the same Flink partition into the same partition, which could reduce the cost of the network connections.

Review comment:
       ```suggestion
   By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to partition records. It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash to compute the partition for a record with the key defined.
   
   In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner will write the records in the same Flink partition into the same Kafka partition, which could reduce the cost of the network connections.
   ```



