You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/01 14:26:26 UTC

[GitHub] asfgit closed pull request #6440: [FLINK-9979] [table] Support a FlinkKafkaPartitioner for Kafka table sink factory

asfgit closed pull request #6440: [FLINK-9979] [table] Support a FlinkKafkaPartitioner for Kafka table sink factory
URL: https://github.com/apache/flink/pull/6440
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
index 2ad31420789..8471908a9cf 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -73,16 +74,23 @@ public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPart
 	}
 
 	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+			String topic,
+			Properties properties,
+			SerializationSchema<Row> serializationSchema,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer010<>(
 			topic,
 			serializationSchema,
 			properties,
-			partitioner);
+			partitioner.orElse(new FlinkFixedPartitioner<>()));
 	}
 
 	@Override
 	protected Kafka010JsonTableSink createCopy() {
-		return new Kafka010JsonTableSink(topic, properties, partitioner);
+		return new Kafka010JsonTableSink(
+			topic,
+			properties,
+			partitioner.orElse(new FlinkFixedPartitioner<>()));
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
index a8c65539824..1d408b8ab52 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -36,7 +37,7 @@ public Kafka010TableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 		super(
 			schema,
@@ -51,11 +52,11 @@ public Kafka010TableSink(
 			String topic,
 			Properties properties,
 			SerializationSchema<Row> serializationSchema,
-			FlinkKafkaPartitioner<Row> partitioner) {
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer010<>(
 			topic,
 			serializationSchema,
 			properties,
-			partitioner);
+			partitioner.orElse(null));
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
index 0cf94995bdd..ecf12b27a08 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka010TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
index 339420cede3..9208f6583b8 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
@@ -40,7 +40,10 @@ protected KafkaTableSink createTableSink(
 			Properties properties,
 			FlinkKafkaPartitioner<Row> partitioner) {
 
-		return new Kafka010JsonTableSink(topic, properties, partitioner);
+		return new Kafka010JsonTableSink(
+			topic,
+			properties,
+			partitioner);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
index cc198c91595..dac8a4dacdd 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka010TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
index 22c6da13b05..8d81a5b59a1 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -39,7 +39,7 @@ public Kafka011TableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 		super(
 			schema,
@@ -54,11 +54,11 @@ public Kafka011TableSink(
 			String topic,
 			Properties properties,
 			SerializationSchema<Row> serializationSchema,
-			FlinkKafkaPartitioner<Row> partitioner) {
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer011<>(
 			topic,
 			new KeyedSerializationSchemaWrapper<>(serializationSchema),
 			properties,
-			Optional.of(partitioner));
+			partitioner);
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
index c26df42ed4a..e6f677fb568 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka011TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
index 996c5083860..f4614761d21 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka011TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 45588cdb141..189a9fdf46b 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -26,6 +26,7 @@
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -92,17 +93,24 @@ public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitione
 	}
 
 	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+			String topic,
+			Properties properties,
+			SerializationSchema<Row> serializationSchema,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer08<>(
 			topic,
 			serializationSchema,
 			properties,
-			partitioner);
+			partitioner.orElse(new FlinkFixedPartitioner<>()));
 	}
 
 	@Override
 	protected Kafka08JsonTableSink createCopy() {
-		return new Kafka08JsonTableSink(topic, properties, partitioner);
+		return new Kafka08JsonTableSink(
+			topic,
+			properties,
+			partitioner.orElse(new FlinkFixedPartitioner<>()));
 	}
 }
 
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
index c34de13efde..146cfc90739 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -36,7 +37,7 @@ public Kafka08TableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 		super(
 			schema,
@@ -51,11 +52,11 @@ public Kafka08TableSink(
 			String topic,
 			Properties properties,
 			SerializationSchema<Row> serializationSchema,
-			FlinkKafkaPartitioner<Row> partitioner) {
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer08<>(
 			topic,
 			serializationSchema,
 			properties,
-			partitioner);
+			partitioner.orElse(null));
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
index 3e93b6fdeac..aeccd4f1ac3 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka08TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 32bd3b69c06..fc46ad4c6ee 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -40,7 +40,10 @@ protected KafkaTableSink createTableSink(
 			Properties properties,
 			FlinkKafkaPartitioner<Row> partitioner) {
 
-		return new Kafka08JsonTableSink(topic, properties, partitioner);
+		return new Kafka08JsonTableSink(
+			topic,
+			properties,
+			partitioner);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
index b67501e449e..ff633ec0246 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka08TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index b3cc0aa77ca..33634590061 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -26,6 +26,7 @@
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -92,16 +93,23 @@ public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitione
 	}
 
 	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+			String topic,
+			Properties properties,
+			SerializationSchema<Row> serializationSchema,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer09<>(
 			topic,
 			serializationSchema,
 			properties,
-			partitioner);
+			partitioner.orElse(new FlinkFixedPartitioner<>()));
 	}
 
 	@Override
 	protected Kafka09JsonTableSink createCopy() {
-		return new Kafka09JsonTableSink(topic, properties, partitioner);
+		return new Kafka09JsonTableSink(
+			topic,
+			properties,
+			partitioner.orElse(new FlinkFixedPartitioner<>()));
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
index 8c349d7a0b2..6e38aad1a39 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -36,7 +37,7 @@ public Kafka09TableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 		super(
 			schema,
@@ -51,11 +52,11 @@ public Kafka09TableSink(
 			String topic,
 			Properties properties,
 			SerializationSchema<Row> serializationSchema,
-			FlinkKafkaPartitioner<Row> partitioner) {
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer09<>(
 			topic,
 			serializationSchema,
 			properties,
-			partitioner);
+			partitioner.orElse(null));
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
index 9958b4ef316..19f51508b93 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka09TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 79f251b8302..97b5c7d88a2 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -40,7 +40,10 @@ protected KafkaTableSink createTableSink(
 			Properties properties,
 			FlinkKafkaPartitioner<Row> partitioner) {
 
-		return new Kafka09JsonTableSink(topic, properties, partitioner);
+		return new Kafka09JsonTableSink(
+			topic,
+			properties,
+			partitioner);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
index a6c8bd4b279..d54c3945949 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka09TableSink(
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 7853bb702a5..a85d536eac9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -40,7 +40,7 @@
  * A version-agnostic Kafka {@link AppendStreamTableSink}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)}}.
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}}.
  */
 @Internal
 public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
@@ -60,7 +60,7 @@
 	protected Optional<SerializationSchema<Row>> serializationSchema;
 
 	/** Partitioner to select Kafka partition for each item. */
-	protected final FlinkKafkaPartitioner<Row> partitioner;
+	protected final Optional<FlinkKafkaPartitioner<Row>> partitioner;
 
 	// legacy variables
 	protected String[] fieldNames;
@@ -70,7 +70,7 @@ protected KafkaTableSink(
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 		this.schema = Optional.of(Preconditions.checkNotNull(schema, "Schema must not be null."));
 		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
@@ -96,7 +96,7 @@ public KafkaTableSink(
 		this.schema = Optional.empty();
 		this.topic = Preconditions.checkNotNull(topic, "topic");
 		this.properties = Preconditions.checkNotNull(properties, "properties");
-		this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
+		this.partitioner = Optional.of(Preconditions.checkNotNull(partitioner, "partitioner"));
 		this.serializationSchema = Optional.empty();
 	}
 
@@ -113,7 +113,7 @@ public KafkaTableSink(
 		String topic,
 		Properties properties,
 		SerializationSchema<Row> serializationSchema,
-		FlinkKafkaPartitioner<Row> partitioner);
+		Optional<FlinkKafkaPartitioner<Row>> partitioner);
 
 	/**
 	 * Create serialization schema for converting table rows into bytes.
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
index 27b2e67ce0b..5634331adbb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -38,6 +38,7 @@
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,6 +55,11 @@
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_CLASS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
@@ -105,6 +111,8 @@
 		properties.add(CONNECTOR_STARTUP_MODE);
 		properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
 		properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
+		properties.add(CONNECTOR_SINK_PARTITIONER);
+		properties.add(CONNECTOR_SINK_PARTITIONER_CLASS);
 
 		// schema
 		properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
@@ -170,7 +178,7 @@
 			schema,
 			topic,
 			getKafkaProperties(descriptorProperties),
-			getFlinkKafkaPartitioner(),
+			getFlinkKafkaPartitioner(descriptorProperties),
 			getSerializationSchema(properties));
 	}
 
@@ -228,7 +236,7 @@ protected abstract KafkaTableSink createKafkaTableSink(
 		TableSchema schema,
 		String topic,
 		Properties properties,
-		FlinkKafkaPartitioner<Row> partitioner,
+		Optional<FlinkKafkaPartitioner<Row>> partitioner,
 		SerializationSchema<Row> serializationSchema);
 
 	// --------------------------------------------------------------------------------------------
@@ -314,9 +322,24 @@ private StartupOptions getStartupOptions(
 		return options;
 	}
 
-	private FlinkKafkaPartitioner<Row> getFlinkKafkaPartitioner() {
-		// we don't support custom partitioner so far
-		return new FlinkFixedPartitioner<>();
+	@SuppressWarnings("unchecked")
+	private Optional<FlinkKafkaPartitioner<Row>> getFlinkKafkaPartitioner(DescriptorProperties descriptorProperties) {
+		return descriptorProperties
+			.getOptionalString(CONNECTOR_SINK_PARTITIONER)
+			.flatMap((String partitionerString) -> {
+				switch (partitionerString) {
+					case CONNECTOR_SINK_PARTITIONER_VALUE_FIXED:
+						return Optional.of(new FlinkFixedPartitioner<>());
+					case CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN:
+						return Optional.empty();
+					case CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM:
+						final Class<? extends FlinkKafkaPartitioner> partitionerClass =
+							descriptorProperties.getClass(CONNECTOR_SINK_PARTITIONER_CLASS, FlinkKafkaPartitioner.class);
+						return Optional.of(InstantiationUtil.instantiate(partitionerClass));
+					default:
+						throw new TableException("Unsupported sink partitioner. Validator should have checked that.");
+				}
+			});
 	}
 
 	private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
index 45359587c1c..e44341a991e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -34,6 +35,11 @@
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_CLASS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
@@ -51,6 +57,8 @@
 	private StartupMode startupMode;
 	private Map<Integer, Long> specificOffsets;
 	private Map<String, String> kafkaProperties;
+	private String sinkPartitionerType;
+	private Class<? extends FlinkKafkaPartitioner> sinkPartitionerClass;
 
 	/**
 	 * Connector descriptor for the Apache Kafka message queue.
@@ -175,6 +183,69 @@ public Kafka startFromSpecificOffset(int partition, long specificOffset) {
 		return this;
 	}
 
+	/**
+	 * Configures how to partition records from Flink's partitions into Kafka's partitions.
+	 *
+	 * <p>This strategy ensures that each Flink partition ends up in one Kafka partition.
+	 *
+	 * <p>Note: One Kafka partition can contain multiple Flink partitions. Examples:
+	 *
+	 * <p>More Flink partitions than Kafka partitions. Some (or all) Kafka partitions contain
+	 * the output of more than one flink partition:
+	 * <pre>
+	 *     Flink Sinks            Kafka Partitions
+	 *         1    ----------------&gt;    1
+	 *         2    --------------/
+	 *         3    -------------/
+	 *         4    ------------/
+	 * </pre>
+	 *
+	 *
+	 * <p>Fewer Flink partitions than Kafka partitions:
+	 * <pre>
+	 *     Flink Sinks            Kafka Partitions
+	 *         1    ----------------&gt;    1
+	 *         2    ----------------&gt;    2
+	 *                                      3
+	 *                                      4
+	 *                                      5
+	 * </pre>
+	 *
+	 * @see org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
+	 */
+	public Kafka sinkPartitionerFixed() {
+		sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
+		sinkPartitionerClass = null;
+		return this;
+	}
+
+	/**
+	 * Configures how to partition records from Flink's partitions into Kafka's partitions.
+	 *
+	 * <p>This strategy ensures that records will be distributed to Kafka partitions in a
+	 * round-robin fashion.
+	 *
+	 * <p>Note: This strategy is useful to avoid an unbalanced partitioning. However, it will
+	 * cause a lot of network connections between all the Flink instances and all the Kafka brokers.
+	 */
+	public Kafka sinkPartitionerRoundRobin() {
+		sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
+		sinkPartitionerClass = null;
+		return this;
+	}
+
+	/**
+	 * Configures how to partition records from Flink's partitions into Kafka's partitions.
+	 *
+	 * <p>This strategy allows for a custom partitioner by providing an implementation
+	 * of {@link FlinkKafkaPartitioner}.
+	 */
+	public Kafka sinkPartitionerCustom(Class<? extends FlinkKafkaPartitioner> partitionerClass) {
+		sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
+		sinkPartitionerClass = Preconditions.checkNotNull(partitionerClass);
+		return this;
+	}
+
 	/**
 	 * Internal method for connector properties conversion.
 	 */
@@ -212,5 +283,12 @@ public void addConnectorProperties(DescriptorProperties properties) {
 					.collect(Collectors.toList())
 				);
 		}
+
+		if (sinkPartitionerType != null) {
+			properties.putString(CONNECTOR_SINK_PARTITIONER, sinkPartitionerType);
+			if (sinkPartitionerClass != null) {
+				properties.putClass(CONNECTOR_SINK_PARTITIONER_CLASS, sinkPartitionerClass);
+			}
+		}
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
index 3adc7c518a4..cad37f8f8cd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -48,12 +48,27 @@
 	public static final String CONNECTOR_PROPERTIES = "connector.properties";
 	public static final String CONNECTOR_PROPERTIES_KEY = "key";
 	public static final String CONNECTOR_PROPERTIES_VALUE = "value";
+	public static final String CONNECTOR_SINK_PARTITIONER = "connector.sink-partitioner";
+	public static final String CONNECTOR_SINK_PARTITIONER_VALUE_FIXED = "fixed";
+	public static final String CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
+	public static final String CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM = "custom";
+	public static final String CONNECTOR_SINK_PARTITIONER_CLASS = "connector.sink-partitioner-class";
 
 	@Override
 	public void validate(DescriptorProperties properties) {
 		super.validate(properties);
 		properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA, false);
 
+		validateVersion(properties);
+
+		validateStartupMode(properties);
+
+		validateKafkaProperties(properties);
+
+		validateSinkPartitioner(properties);
+	}
+
+	private void validateVersion(DescriptorProperties properties) {
 		final List<String> versions = Arrays.asList(
 			CONNECTOR_VERSION_VALUE_08,
 			CONNECTOR_VERSION_VALUE_09,
@@ -61,7 +76,9 @@ public void validate(DescriptorProperties properties) {
 			CONNECTOR_VERSION_VALUE_011);
 		properties.validateEnumValues(CONNECTOR_VERSION(), false, versions);
 		properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);
+	}
 
+	private void validateStartupMode(DescriptorProperties properties) {
 		final Map<String, Consumer<String>> specificOffsetValidators = new HashMap<>();
 		specificOffsetValidators.put(
 			CONNECTOR_SPECIFIC_OFFSETS_PARTITION,
@@ -86,17 +103,29 @@ public void validate(DescriptorProperties properties) {
 			CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
 			prefix -> properties.validateFixedIndexedProperties(CONNECTOR_SPECIFIC_OFFSETS, false, specificOffsetValidators));
 		properties.validateEnum(CONNECTOR_STARTUP_MODE, true, startupModeValidation);
+	}
 
+	private void validateKafkaProperties(DescriptorProperties properties) {
 		final Map<String, Consumer<String>> propertyValidators = new HashMap<>();
 		propertyValidators.put(
 			CONNECTOR_PROPERTIES_KEY,
-			prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_KEY, false, 1, Integer.MAX_VALUE));
+			prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_KEY, false, 1));
 		propertyValidators.put(
 			CONNECTOR_PROPERTIES_VALUE,
-			prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_VALUE, false, 0, Integer.MAX_VALUE));
+			prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_VALUE, false, 0));
 		properties.validateFixedIndexedProperties(CONNECTOR_PROPERTIES, true, propertyValidators);
 	}
 
+	private void validateSinkPartitioner(DescriptorProperties properties) {
+		final Map<String, Consumer<String>> sinkPartitionerValidators = new HashMap<>();
+		sinkPartitionerValidators.put(CONNECTOR_SINK_PARTITIONER_VALUE_FIXED, properties.noValidation());
+		sinkPartitionerValidators.put(CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN, properties.noValidation());
+		sinkPartitionerValidators.put(
+			CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM,
+			prefix -> properties.validateString(CONNECTOR_SINK_PARTITIONER_CLASS, false, 1));
+		properties.validateEnum(CONNECTOR_SINK_PARTITIONER, true, sinkPartitionerValidators);
+	}
+
 	// utilities
 
 	public static String normalizeStartupMode(StartupMode startupMode) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 946b6eb5895..b4bb89dc048 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -30,6 +30,7 @@
 
 import org.junit.Test;
 
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -59,7 +60,7 @@
 
 	@SuppressWarnings("unchecked")
 	@Test
-	public void testKafkaTableSink() throws Exception {
+	public void testKafkaTableSink() {
 		DataStream dataStream = mock(DataStream.class);
 		when(dataStream.addSink(any(SinkFunction.class))).thenReturn(mock(DataStreamSink.class));
 
@@ -74,7 +75,7 @@ public void testKafkaTableSink() throws Exception {
 			eq(TOPIC),
 			eq(PROPERTIES),
 			any(getSerializationSchemaClass()),
-			eq(PARTITIONER));
+			eq(Optional.of(PARTITIONER)));
 	}
 
 	@Test
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 504bed16a74..5e9144c1e57 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -153,6 +153,7 @@ public void testTableSource() {
 					.version(getKafkaVersion())
 					.topic(TOPIC)
 					.properties(KAFKA_PROPERTIES)
+					.sinkPartitionerRoundRobin() // test if accepted although not needed
 					.startFromSpecificOffsets(OFFSETS))
 			.withFormat(new TestTableFormat())
 			.withSchema(
@@ -194,7 +195,7 @@ public void testTableSink() {
 			schema,
 			TOPIC,
 			KAFKA_PROPERTIES,
-			new FlinkFixedPartitioner<>(), // a custom partitioner is not support yet
+			Optional.of(new FlinkFixedPartitioner<>()),
 			new TestSerializationSchema(schema.toRowType()));
 
 		// construct table sink using descriptors and table sink factory
@@ -204,6 +205,7 @@ public void testTableSink() {
 					.version(getKafkaVersion())
 					.topic(TOPIC)
 					.properties(KAFKA_PROPERTIES)
+					.sinkPartitionerFixed()
 					.startFromSpecificOffsets(OFFSETS)) // test if they accepted although not needed
 			.withFormat(new TestTableFormat())
 			.withSchema(
@@ -299,6 +301,6 @@ protected abstract KafkaTableSink getExpectedKafkaTableSink(
 		TableSchema schema,
 		String topic,
 		Properties properties,
-		FlinkKafkaPartitioner<Row> partitioner,
+		Optional<FlinkKafkaPartitioner<Row>> partitioner,
 		SerializationSchema<Row> serializationSchema);
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
index f3d96f1c443..c67bc4dcf20 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.descriptors;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -59,7 +61,8 @@
 				.version("0.11")
 				.topic("MyTable")
 				.startFromSpecificOffsets(offsets)
-				.properties(properties);
+				.properties(properties)
+				.sinkPartitionerCustom(FlinkFixedPartitioner.class);
 
 		return Arrays.asList(earliestDesc, specificOffsetsDesc, specificOffsetsMapDesc);
 	}
@@ -102,6 +105,8 @@
 		props3.put("connector.properties.0.value", "12");
 		props3.put("connector.properties.1.key", "kafka.stuff");
 		props3.put("connector.properties.1.value", "42");
+		props3.put("connector.sink-partitioner", "custom");
+		props3.put("connector.sink-partitioner-class", FlinkFixedPartitioner.class.getName());
 
 		return Arrays.asList(props1, props2, props3);
 	}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services