You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/01 14:25:01 UTC

[flink] branch master updated: [FLINK-9979] [table] Support a FlinkKafkaPartitioner for Kafka table sink factory

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 628b71d  [FLINK-9979] [table] Support a FlinkKafkaPartitioner for Kafka table sink factory
628b71d is described below

commit 628b71dfabe72a99eb6d54994fc870abed1f0268
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Jul 27 14:03:50 2018 +0200

    [FLINK-9979] [table] Support a FlinkKafkaPartitioner for Kafka table sink factory
    
    Adds the possibility to add a FlinkKafkaPartitioner to a Kafka table sink
    factory. It povides shortcuts for the built-in "fixed" and "round-robin" partitioning.
    
    This closes #6440.
---
 .../connectors/kafka/Kafka010JsonTableSink.java    | 14 +++-
 .../connectors/kafka/Kafka010TableSink.java        |  7 +-
 .../kafka/Kafka010TableSourceSinkFactory.java      |  2 +-
 .../kafka/Kafka010JsonTableSinkTest.java           |  5 +-
 .../kafka/Kafka010TableSourceSinkFactoryTest.java  |  2 +-
 .../connectors/kafka/Kafka011TableSink.java        |  6 +-
 .../kafka/Kafka011TableSourceSinkFactory.java      |  2 +-
 .../kafka/Kafka011TableSourceSinkFactoryTest.java  |  2 +-
 .../connectors/kafka/Kafka08JsonTableSink.java     | 14 +++-
 .../connectors/kafka/Kafka08TableSink.java         |  7 +-
 .../kafka/Kafka08TableSourceSinkFactory.java       |  2 +-
 .../connectors/kafka/Kafka08JsonTableSinkTest.java |  5 +-
 .../kafka/Kafka08TableSourceSinkFactoryTest.java   |  2 +-
 .../connectors/kafka/Kafka09JsonTableSink.java     | 14 +++-
 .../connectors/kafka/Kafka09TableSink.java         |  7 +-
 .../kafka/Kafka09TableSourceSinkFactory.java       |  2 +-
 .../connectors/kafka/Kafka09JsonTableSinkTest.java |  5 +-
 .../kafka/Kafka09TableSourceSinkFactoryTest.java   |  2 +-
 .../streaming/connectors/kafka/KafkaTableSink.java | 10 +--
 .../kafka/KafkaTableSourceSinkFactoryBase.java     | 33 +++++++--
 .../org/apache/flink/table/descriptors/Kafka.java  | 78 ++++++++++++++++++++++
 .../flink/table/descriptors/KafkaValidator.java    | 33 ++++++++-
 .../connectors/kafka/KafkaTableSinkTestBase.java   |  5 +-
 .../kafka/KafkaTableSourceSinkFactoryTestBase.java |  6 +-
 .../apache/flink/table/descriptors/KafkaTest.java  |  7 +-
 25 files changed, 223 insertions(+), 49 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
index 2ad3142..8471908 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -73,16 +74,23 @@ public class Kafka010JsonTableSink extends KafkaJsonTableSink {
 	}
 
 	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+			String topic,
+			Properties properties,
+			SerializationSchema<Row> serializationSchema,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer010<>(
 			topic,
 			serializationSchema,
 			properties,
-			partitioner);
+			partitioner.orElse(new FlinkFixedPartitioner<>()));
 	}
 
 	@Override
 	protected Kafka010JsonTableSink createCopy() {
-		return new Kafka010JsonTableSink(topic, properties, partitioner);
+		return new Kafka010JsonTableSink(
+			topic,
+			properties,
+			partitioner.orElse(new FlinkFixedPartitioner<>()));
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
index a8c6553..1d408b8 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -36,7 +37,7 @@ public class Kafka010TableSink extends KafkaTableSink {
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 		super(
 			schema,
@@ -51,11 +52,11 @@ public class Kafka010TableSink extends KafkaTableSink {
 			String topic,
 			Properties properties,
 			SerializationSchema<Row> serializationSchema,
-			FlinkKafkaPartitioner<Row> partitioner) {
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer010<>(
 			topic,
 			serializationSchema,
 			properties,
-			partitioner);
+			partitioner.orElse(null));
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
index 0cf9499..ecf12b2 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryB
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka010TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
index 339420c..9208f65 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
@@ -40,7 +40,10 @@ public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase {
 			Properties properties,
 			FlinkKafkaPartitioner<Row> partitioner) {
 
-		return new Kafka010JsonTableSink(topic, properties, partitioner);
+		return new Kafka010JsonTableSink(
+			topic,
+			properties,
+			partitioner);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
index cc198c9..dac8a4d 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ public class Kafka010TableSourceSinkFactoryTest extends KafkaTableSourceSinkFact
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka010TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
index 22c6da1..8d81a5b 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -39,7 +39,7 @@ public class Kafka011TableSink extends KafkaTableSink {
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 		super(
 			schema,
@@ -54,11 +54,11 @@ public class Kafka011TableSink extends KafkaTableSink {
 			String topic,
 			Properties properties,
 			SerializationSchema<Row> serializationSchema,
-			FlinkKafkaPartitioner<Row> partitioner) {
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer011<>(
 			topic,
 			new KeyedSerializationSchemaWrapper<>(serializationSchema),
 			properties,
-			Optional.of(partitioner));
+			partitioner);
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
index c26df42..e6f677f 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ public class Kafka011TableSourceSinkFactory extends KafkaTableSourceSinkFactoryB
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka011TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
index 996c508..f461476 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ public class Kafka011TableSourceSinkFactoryTest extends KafkaTableSourceSinkFact
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka011TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 45588cd..189a9fd 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -92,17 +93,24 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	}
 
 	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+			String topic,
+			Properties properties,
+			SerializationSchema<Row> serializationSchema,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer08<>(
 			topic,
 			serializationSchema,
 			properties,
-			partitioner);
+			partitioner.orElse(new FlinkFixedPartitioner<>()));
 	}
 
 	@Override
 	protected Kafka08JsonTableSink createCopy() {
-		return new Kafka08JsonTableSink(topic, properties, partitioner);
+		return new Kafka08JsonTableSink(
+			topic,
+			properties,
+			partitioner.orElse(new FlinkFixedPartitioner<>()));
 	}
 }
 
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
index c34de13..146cfc9 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -36,7 +37,7 @@ public class Kafka08TableSink extends KafkaTableSink {
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 		super(
 			schema,
@@ -51,11 +52,11 @@ public class Kafka08TableSink extends KafkaTableSink {
 			String topic,
 			Properties properties,
 			SerializationSchema<Row> serializationSchema,
-			FlinkKafkaPartitioner<Row> partitioner) {
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer08<>(
 			topic,
 			serializationSchema,
 			properties,
-			partitioner);
+			partitioner.orElse(null));
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
index 3e93b6f..aeccd4f 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBa
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka08TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 32bd3b6..fc46ad4 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -40,7 +40,10 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 			Properties properties,
 			FlinkKafkaPartitioner<Row> partitioner) {
 
-		return new Kafka08JsonTableSink(topic, properties, partitioner);
+		return new Kafka08JsonTableSink(
+			topic,
+			properties,
+			partitioner);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
index b67501e..ff633ec 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ public class Kafka08TableSourceSinkFactoryTest extends KafkaTableSourceSinkFacto
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka08TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index b3cc0aa..3363459 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -92,16 +93,23 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 	}
 
 	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+			String topic,
+			Properties properties,
+			SerializationSchema<Row> serializationSchema,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer09<>(
 			topic,
 			serializationSchema,
 			properties,
-			partitioner);
+			partitioner.orElse(new FlinkFixedPartitioner<>()));
 	}
 
 	@Override
 	protected Kafka09JsonTableSink createCopy() {
-		return new Kafka09JsonTableSink(topic, properties, partitioner);
+		return new Kafka09JsonTableSink(
+			topic,
+			properties,
+			partitioner.orElse(new FlinkFixedPartitioner<>()));
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
index 8c349d7..6e38aad 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -36,7 +37,7 @@ public class Kafka09TableSink extends KafkaTableSink {
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 		super(
 			schema,
@@ -51,11 +52,11 @@ public class Kafka09TableSink extends KafkaTableSink {
 			String topic,
 			Properties properties,
 			SerializationSchema<Row> serializationSchema,
-			FlinkKafkaPartitioner<Row> partitioner) {
+			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer09<>(
 			topic,
 			serializationSchema,
 			properties,
-			partitioner);
+			partitioner.orElse(null));
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
index 9958b4e..19f5150 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBa
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka09TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 79f251b..97b5c7d 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -40,7 +40,10 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 			Properties properties,
 			FlinkKafkaPartitioner<Row> partitioner) {
 
-		return new Kafka09JsonTableSink(topic, properties, partitioner);
+		return new Kafka09JsonTableSink(
+			topic,
+			properties,
+			partitioner);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
index a6c8bd4..d54c394 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ public class Kafka09TableSourceSinkFactoryTest extends KafkaTableSourceSinkFacto
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 
 		return new Kafka09TableSink(
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 7853bb7..a85d536 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -40,7 +40,7 @@ import java.util.Properties;
  * A version-agnostic Kafka {@link AppendStreamTableSink}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)}}.
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}}.
  */
 @Internal
 public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
@@ -60,7 +60,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	protected Optional<SerializationSchema<Row>> serializationSchema;
 
 	/** Partitioner to select Kafka partition for each item. */
-	protected final FlinkKafkaPartitioner<Row> partitioner;
+	protected final Optional<FlinkKafkaPartitioner<Row>> partitioner;
 
 	// legacy variables
 	protected String[] fieldNames;
@@ -70,7 +70,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 			TableSchema schema,
 			String topic,
 			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner,
+			Optional<FlinkKafkaPartitioner<Row>> partitioner,
 			SerializationSchema<Row> serializationSchema) {
 		this.schema = Optional.of(Preconditions.checkNotNull(schema, "Schema must not be null."));
 		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
@@ -96,7 +96,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 		this.schema = Optional.empty();
 		this.topic = Preconditions.checkNotNull(topic, "topic");
 		this.properties = Preconditions.checkNotNull(properties, "properties");
-		this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
+		this.partitioner = Optional.of(Preconditions.checkNotNull(partitioner, "partitioner"));
 		this.serializationSchema = Optional.empty();
 	}
 
@@ -113,7 +113,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 		String topic,
 		Properties properties,
 		SerializationSchema<Row> serializationSchema,
-		FlinkKafkaPartitioner<Row> partitioner);
+		Optional<FlinkKafkaPartitioner<Row>> partitioner);
 
 	/**
 	 * Create serialization schema for converting table rows into bytes.
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
index 27b2e67..5634331 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.sinks.StreamTableSink;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,6 +55,11 @@ import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMA
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_CLASS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
@@ -105,6 +111,8 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
 		properties.add(CONNECTOR_STARTUP_MODE);
 		properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
 		properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
+		properties.add(CONNECTOR_SINK_PARTITIONER);
+		properties.add(CONNECTOR_SINK_PARTITIONER_CLASS);
 
 		// schema
 		properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
@@ -170,7 +178,7 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
 			schema,
 			topic,
 			getKafkaProperties(descriptorProperties),
-			getFlinkKafkaPartitioner(),
+			getFlinkKafkaPartitioner(descriptorProperties),
 			getSerializationSchema(properties));
 	}
 
@@ -228,7 +236,7 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
 		TableSchema schema,
 		String topic,
 		Properties properties,
-		FlinkKafkaPartitioner<Row> partitioner,
+		Optional<FlinkKafkaPartitioner<Row>> partitioner,
 		SerializationSchema<Row> serializationSchema);
 
 	// --------------------------------------------------------------------------------------------
@@ -314,9 +322,24 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
 		return options;
 	}
 
-	private FlinkKafkaPartitioner<Row> getFlinkKafkaPartitioner() {
-		// we don't support custom partitioner so far
-		return new FlinkFixedPartitioner<>();
+	@SuppressWarnings("unchecked")
+	private Optional<FlinkKafkaPartitioner<Row>> getFlinkKafkaPartitioner(DescriptorProperties descriptorProperties) {
+		return descriptorProperties
+			.getOptionalString(CONNECTOR_SINK_PARTITIONER)
+			.flatMap((String partitionerString) -> {
+				switch (partitionerString) {
+					case CONNECTOR_SINK_PARTITIONER_VALUE_FIXED:
+						return Optional.of(new FlinkFixedPartitioner<>());
+					case CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN:
+						return Optional.empty();
+					case CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM:
+						final Class<? extends FlinkKafkaPartitioner> partitionerClass =
+							descriptorProperties.getClass(CONNECTOR_SINK_PARTITIONER_CLASS, FlinkKafkaPartitioner.class);
+						return Optional.of(InstantiationUtil.instantiate(partitionerClass));
+					default:
+						throw new TableException("Unsupported sink partitioner. Validator should have checked that.");
+				}
+			});
 	}
 
 	private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
index 4535958..e44341a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.descriptors;
 
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -34,6 +35,11 @@ import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CO
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_CLASS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
@@ -51,6 +57,8 @@ public class Kafka extends ConnectorDescriptor {
 	private StartupMode startupMode;
 	private Map<Integer, Long> specificOffsets;
 	private Map<String, String> kafkaProperties;
+	private String sinkPartitionerType;
+	private Class<? extends FlinkKafkaPartitioner> sinkPartitionerClass;
 
 	/**
 	 * Connector descriptor for the Apache Kafka message queue.
@@ -176,6 +184,69 @@ public class Kafka extends ConnectorDescriptor {
 	}
 
 	/**
+	 * Configures how to partition records from Flink's partitions into Kafka's partitions.
+	 *
+	 * <p>This strategy ensures that each Flink partition ends up in one Kafka partition.
+	 *
+	 * <p>Note: One Kafka partition can contain multiple Flink partitions. Examples:
+	 *
+	 * <p>More Flink partitions than Kafka partitions. Some (or all) Kafka partitions contain
+	 * the output of more than one flink partition:
+	 * <pre>
+	 *     Flink Sinks            Kafka Partitions
+	 *         1    ----------------&gt;    1
+	 *         2    --------------/
+	 *         3    -------------/
+	 *         4    ------------/
+	 * </pre>
+	 *
+	 *
+	 * <p>Fewer Flink partitions than Kafka partitions:
+	 * <pre>
+	 *     Flink Sinks            Kafka Partitions
+	 *         1    ----------------&gt;    1
+	 *         2    ----------------&gt;    2
+	 *                                      3
+	 *                                      4
+	 *                                      5
+	 * </pre>
+	 *
+	 * @see org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
+	 */
+	public Kafka sinkPartitionerFixed() {
+		sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
+		sinkPartitionerClass = null;
+		return this;
+	}
+
+	/**
+	 * Configures how to partition records from Flink's partitions into Kafka's partitions.
+	 *
+	 * <p>This strategy ensures that records will be distributed to Kafka partitions in a
+	 * round-robin fashion.
+	 *
+	 * <p>Note: This strategy is useful to avoid an unbalanced partitioning. However, it will
+	 * cause a lot of network connections between all the Flink instances and all the Kafka brokers.
+	 */
+	public Kafka sinkPartitionerRoundRobin() {
+		sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
+		sinkPartitionerClass = null;
+		return this;
+	}
+
+	/**
+	 * Configures how to partition records from Flink's partitions into Kafka's partitions.
+	 *
+	 * <p>This strategy allows for a custom partitioner by providing an implementation
+	 * of {@link FlinkKafkaPartitioner}.
+	 */
+	public Kafka sinkPartitionerCustom(Class<? extends FlinkKafkaPartitioner> partitionerClass) {
+		sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
+		sinkPartitionerClass = Preconditions.checkNotNull(partitionerClass);
+		return this;
+	}
+
+	/**
 	 * Internal method for connector properties conversion.
 	 */
 	@Override
@@ -212,5 +283,12 @@ public class Kafka extends ConnectorDescriptor {
 					.collect(Collectors.toList())
 				);
 		}
+
+		if (sinkPartitionerType != null) {
+			properties.putString(CONNECTOR_SINK_PARTITIONER, sinkPartitionerType);
+			if (sinkPartitionerClass != null) {
+				properties.putClass(CONNECTOR_SINK_PARTITIONER_CLASS, sinkPartitionerClass);
+			}
+		}
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
index 3adc7c5..cad37f8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -48,12 +48,27 @@ public class KafkaValidator extends ConnectorDescriptorValidator {
 	public static final String CONNECTOR_PROPERTIES = "connector.properties";
 	public static final String CONNECTOR_PROPERTIES_KEY = "key";
 	public static final String CONNECTOR_PROPERTIES_VALUE = "value";
+	public static final String CONNECTOR_SINK_PARTITIONER = "connector.sink-partitioner";
+	public static final String CONNECTOR_SINK_PARTITIONER_VALUE_FIXED = "fixed";
+	public static final String CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
+	public static final String CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM = "custom";
+	public static final String CONNECTOR_SINK_PARTITIONER_CLASS = "connector.sink-partitioner-class";
 
 	@Override
 	public void validate(DescriptorProperties properties) {
 		super.validate(properties);
 		properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA, false);
 
+		validateVersion(properties);
+
+		validateStartupMode(properties);
+
+		validateKafkaProperties(properties);
+
+		validateSinkPartitioner(properties);
+	}
+
+	private void validateVersion(DescriptorProperties properties) {
 		final List<String> versions = Arrays.asList(
 			CONNECTOR_VERSION_VALUE_08,
 			CONNECTOR_VERSION_VALUE_09,
@@ -61,7 +76,9 @@ public class KafkaValidator extends ConnectorDescriptorValidator {
 			CONNECTOR_VERSION_VALUE_011);
 		properties.validateEnumValues(CONNECTOR_VERSION(), false, versions);
 		properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);
+	}
 
+	private void validateStartupMode(DescriptorProperties properties) {
 		final Map<String, Consumer<String>> specificOffsetValidators = new HashMap<>();
 		specificOffsetValidators.put(
 			CONNECTOR_SPECIFIC_OFFSETS_PARTITION,
@@ -86,17 +103,29 @@ public class KafkaValidator extends ConnectorDescriptorValidator {
 			CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
 			prefix -> properties.validateFixedIndexedProperties(CONNECTOR_SPECIFIC_OFFSETS, false, specificOffsetValidators));
 		properties.validateEnum(CONNECTOR_STARTUP_MODE, true, startupModeValidation);
+	}
 
+	private void validateKafkaProperties(DescriptorProperties properties) {
 		final Map<String, Consumer<String>> propertyValidators = new HashMap<>();
 		propertyValidators.put(
 			CONNECTOR_PROPERTIES_KEY,
-			prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_KEY, false, 1, Integer.MAX_VALUE));
+			prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_KEY, false, 1));
 		propertyValidators.put(
 			CONNECTOR_PROPERTIES_VALUE,
-			prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_VALUE, false, 0, Integer.MAX_VALUE));
+			prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_VALUE, false, 0));
 		properties.validateFixedIndexedProperties(CONNECTOR_PROPERTIES, true, propertyValidators);
 	}
 
+	private void validateSinkPartitioner(DescriptorProperties properties) {
+		final Map<String, Consumer<String>> sinkPartitionerValidators = new HashMap<>();
+		sinkPartitionerValidators.put(CONNECTOR_SINK_PARTITIONER_VALUE_FIXED, properties.noValidation());
+		sinkPartitionerValidators.put(CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN, properties.noValidation());
+		sinkPartitionerValidators.put(
+			CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM,
+			prefix -> properties.validateString(CONNECTOR_SINK_PARTITIONER_CLASS, false, 1));
+		properties.validateEnum(CONNECTOR_SINK_PARTITIONER, true, sinkPartitionerValidators);
+	}
+
 	// utilities
 
 	public static String normalizeStartupMode(StartupMode startupMode) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 946b6eb5..b4bb89d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.types.Row;
 
 import org.junit.Test;
 
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -59,7 +60,7 @@ public abstract class KafkaTableSinkTestBase {
 
 	@SuppressWarnings("unchecked")
 	@Test
-	public void testKafkaTableSink() throws Exception {
+	public void testKafkaTableSink() {
 		DataStream dataStream = mock(DataStream.class);
 		when(dataStream.addSink(any(SinkFunction.class))).thenReturn(mock(DataStreamSink.class));
 
@@ -74,7 +75,7 @@ public abstract class KafkaTableSinkTestBase {
 			eq(TOPIC),
 			eq(PROPERTIES),
 			any(getSerializationSchemaClass()),
-			eq(PARTITIONER));
+			eq(Optional.of(PARTITIONER)));
 	}
 
 	@Test
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 504bed1..5e9144c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -153,6 +153,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 					.version(getKafkaVersion())
 					.topic(TOPIC)
 					.properties(KAFKA_PROPERTIES)
+					.sinkPartitionerRoundRobin() // test if accepted although not needed
 					.startFromSpecificOffsets(OFFSETS))
 			.withFormat(new TestTableFormat())
 			.withSchema(
@@ -194,7 +195,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 			schema,
 			TOPIC,
 			KAFKA_PROPERTIES,
-			new FlinkFixedPartitioner<>(), // a custom partitioner is not support yet
+			Optional.of(new FlinkFixedPartitioner<>()),
 			new TestSerializationSchema(schema.toRowType()));
 
 		// construct table sink using descriptors and table sink factory
@@ -204,6 +205,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 					.version(getKafkaVersion())
 					.topic(TOPIC)
 					.properties(KAFKA_PROPERTIES)
+					.sinkPartitionerFixed()
 					.startFromSpecificOffsets(OFFSETS)) // test if they accepted although not needed
 			.withFormat(new TestTableFormat())
 			.withSchema(
@@ -299,6 +301,6 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 		TableSchema schema,
 		String topic,
 		Properties properties,
-		FlinkKafkaPartitioner<Row> partitioner,
+		Optional<FlinkKafkaPartitioner<Row>> partitioner,
 		SerializationSchema<Row> serializationSchema);
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
index f3d96f1..c67bc4d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.descriptors;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -59,7 +61,8 @@ public class KafkaTest extends DescriptorTestBase {
 				.version("0.11")
 				.topic("MyTable")
 				.startFromSpecificOffsets(offsets)
-				.properties(properties);
+				.properties(properties)
+				.sinkPartitionerCustom(FlinkFixedPartitioner.class);
 
 		return Arrays.asList(earliestDesc, specificOffsetsDesc, specificOffsetsMapDesc);
 	}
@@ -102,6 +105,8 @@ public class KafkaTest extends DescriptorTestBase {
 		props3.put("connector.properties.0.value", "12");
 		props3.put("connector.properties.1.key", "kafka.stuff");
 		props3.put("connector.properties.1.value", "42");
+		props3.put("connector.sink-partitioner", "custom");
+		props3.put("connector.sink-partitioner-class", FlinkFixedPartitioner.class.getName());
 
 		return Arrays.asList(props1, props2, props3);
 	}