Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/01 06:39:02 UTC

[GitHub] tzulitai commented on a change in pull request #6440: [FLINK-9979] [table] Support a FlinkKafkaPartitioner for Kafka table sink factory

URL: https://github.com/apache/flink/pull/6440#discussion_r206768231
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
 ##########
 @@ -314,9 +322,24 @@ private StartupOptions getStartupOptions(
 		return options;
 	}
 
-	private FlinkKafkaPartitioner<Row> getFlinkKafkaPartitioner() {
-		// we don't support custom partitioner so far
-		return new FlinkFixedPartitioner<>();
+	@SuppressWarnings("unchecked")
+	private Optional<FlinkKafkaPartitioner<Row>> getFlinkKafkaPartitioner(DescriptorProperties descriptorProperties) {
+		return descriptorProperties
+			.getOptionalString(CONNECTOR_SINK_PARTITIONER)
+			.flatMap((String partitionerString) -> {
+				switch (partitionerString) {
 
 Review comment:
   There seems to be a missing case here: key-hash partitioning.
   
   That is the default partitioning scheme of the Kafka producer when 1) records have a key (returned by the `KeyedSerializationSchema`), and 2) no partitioner is provided.
   
   Round-robin partitioning only takes place if the records have no keys.
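
To illustrate the distinction, here is a minimal, self-contained sketch of how partition selection behaves when no partitioner is provided. It is a simplified model, not the Kafka client's implementation: the class name `DefaultPartitioningSketch` is hypothetical, and `Arrays.hashCode` stands in for the murmur2 hash that the Kafka producer applies to the serialized key.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simplified model of partition selection without a custom partitioner.
 * Hypothetical class for illustration only; not part of Flink or Kafka.
 */
final class DefaultPartitioningSketch {

    // Counter that drives round-robin assignment for records without a key.
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * @param key           serialized record key, or null if the record has no key
     * @param numPartitions number of partitions of the target topic
     * @return the partition index in [0, numPartitions)
     */
    int partition(byte[] key, int numPartitions) {
        if (key != null) {
            // Key-hash partitioning: equal keys always map to the same partition.
            // Assumption: Arrays.hashCode replaces Kafka's murmur2 hash here.
            return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
        }
        // No key: spread records over the partitions in round-robin order.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```

With this model, the sink factory would need a third option besides "fixed" and "round-robin": one that supplies no partitioner at all, so that records carrying a key fall through to key-hash partitioning.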
