You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/01 14:25:01 UTC
[flink] branch master updated: [FLINK-9979] [table] Support a
FlinkKafkaPartitioner for Kafka table sink factory
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 628b71d [FLINK-9979] [table] Support a FlinkKafkaPartitioner for Kafka table sink factory
628b71d is described below
commit 628b71dfabe72a99eb6d54994fc870abed1f0268
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Jul 27 14:03:50 2018 +0200
[FLINK-9979] [table] Support a FlinkKafkaPartitioner for Kafka table sink factory
Adds the possibility to add a FlinkKafkaPartitioner to a Kafka table sink
factory. It provides shortcuts for the built-in "fixed" and "round-robin" partitioning.
This closes #6440.
---
.../connectors/kafka/Kafka010JsonTableSink.java | 14 +++-
.../connectors/kafka/Kafka010TableSink.java | 7 +-
.../kafka/Kafka010TableSourceSinkFactory.java | 2 +-
.../kafka/Kafka010JsonTableSinkTest.java | 5 +-
.../kafka/Kafka010TableSourceSinkFactoryTest.java | 2 +-
.../connectors/kafka/Kafka011TableSink.java | 6 +-
.../kafka/Kafka011TableSourceSinkFactory.java | 2 +-
.../kafka/Kafka011TableSourceSinkFactoryTest.java | 2 +-
.../connectors/kafka/Kafka08JsonTableSink.java | 14 +++-
.../connectors/kafka/Kafka08TableSink.java | 7 +-
.../kafka/Kafka08TableSourceSinkFactory.java | 2 +-
.../connectors/kafka/Kafka08JsonTableSinkTest.java | 5 +-
.../kafka/Kafka08TableSourceSinkFactoryTest.java | 2 +-
.../connectors/kafka/Kafka09JsonTableSink.java | 14 +++-
.../connectors/kafka/Kafka09TableSink.java | 7 +-
.../kafka/Kafka09TableSourceSinkFactory.java | 2 +-
.../connectors/kafka/Kafka09JsonTableSinkTest.java | 5 +-
.../kafka/Kafka09TableSourceSinkFactoryTest.java | 2 +-
.../streaming/connectors/kafka/KafkaTableSink.java | 10 +--
.../kafka/KafkaTableSourceSinkFactoryBase.java | 33 +++++++--
.../org/apache/flink/table/descriptors/Kafka.java | 78 ++++++++++++++++++++++
.../flink/table/descriptors/KafkaValidator.java | 33 ++++++++-
.../connectors/kafka/KafkaTableSinkTestBase.java | 5 +-
.../kafka/KafkaTableSourceSinkFactoryTestBase.java | 6 +-
.../apache/flink/table/descriptors/KafkaTest.java | 7 +-
25 files changed, 223 insertions(+), 49 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
index 2ad3142..8471908 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.types.Row;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -73,16 +74,23 @@ public class Kafka010JsonTableSink extends KafkaJsonTableSink {
}
@Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer010<>(
topic,
serializationSchema,
properties,
- partitioner);
+ partitioner.orElse(new FlinkFixedPartitioner<>()));
}
@Override
protected Kafka010JsonTableSink createCopy() {
- return new Kafka010JsonTableSink(topic, properties, partitioner);
+ return new Kafka010JsonTableSink(
+ topic,
+ properties,
+ partitioner.orElse(new FlinkFixedPartitioner<>()));
}
}
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
index a8c6553..1d408b8 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -36,7 +37,7 @@ public class Kafka010TableSink extends KafkaTableSink {
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
super(
schema,
@@ -51,11 +52,11 @@ public class Kafka010TableSink extends KafkaTableSink {
String topic,
Properties properties,
SerializationSchema<Row> serializationSchema,
- FlinkKafkaPartitioner<Row> partitioner) {
+ Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer010<>(
topic,
serializationSchema,
properties,
- partitioner);
+ partitioner.orElse(null));
}
}
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
index 0cf9499..ecf12b2 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryB
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
return new Kafka010TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
index 339420c..9208f65 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
@@ -40,7 +40,10 @@ public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase {
Properties properties,
FlinkKafkaPartitioner<Row> partitioner) {
- return new Kafka010JsonTableSink(topic, properties, partitioner);
+ return new Kafka010JsonTableSink(
+ topic,
+ properties,
+ partitioner);
}
@Override
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
index cc198c9..dac8a4d 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ public class Kafka010TableSourceSinkFactoryTest extends KafkaTableSourceSinkFact
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
return new Kafka010TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
index 22c6da1..8d81a5b 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -39,7 +39,7 @@ public class Kafka011TableSink extends KafkaTableSink {
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
super(
schema,
@@ -54,11 +54,11 @@ public class Kafka011TableSink extends KafkaTableSink {
String topic,
Properties properties,
SerializationSchema<Row> serializationSchema,
- FlinkKafkaPartitioner<Row> partitioner) {
+ Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer011<>(
topic,
new KeyedSerializationSchemaWrapper<>(serializationSchema),
properties,
- Optional.of(partitioner));
+ partitioner);
}
}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
index c26df42..e6f677f 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ public class Kafka011TableSourceSinkFactory extends KafkaTableSourceSinkFactoryB
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
return new Kafka011TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
index 996c508..f461476 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ public class Kafka011TableSourceSinkFactoryTest extends KafkaTableSourceSinkFact
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
return new Kafka011TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 45588cd..189a9fd 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.types.Row;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -92,17 +93,24 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
}
@Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer08<>(
topic,
serializationSchema,
properties,
- partitioner);
+ partitioner.orElse(new FlinkFixedPartitioner<>()));
}
@Override
protected Kafka08JsonTableSink createCopy() {
- return new Kafka08JsonTableSink(topic, properties, partitioner);
+ return new Kafka08JsonTableSink(
+ topic,
+ properties,
+ partitioner.orElse(new FlinkFixedPartitioner<>()));
}
}
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
index c34de13..146cfc9 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -36,7 +37,7 @@ public class Kafka08TableSink extends KafkaTableSink {
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
super(
schema,
@@ -51,11 +52,11 @@ public class Kafka08TableSink extends KafkaTableSink {
String topic,
Properties properties,
SerializationSchema<Row> serializationSchema,
- FlinkKafkaPartitioner<Row> partitioner) {
+ Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer08<>(
topic,
serializationSchema,
properties,
- partitioner);
+ partitioner.orElse(null));
}
}
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
index 3e93b6f..aeccd4f 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBa
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
return new Kafka08TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 32bd3b6..fc46ad4 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -40,7 +40,10 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
Properties properties,
FlinkKafkaPartitioner<Row> partitioner) {
- return new Kafka08JsonTableSink(topic, properties, partitioner);
+ return new Kafka08JsonTableSink(
+ topic,
+ properties,
+ partitioner);
}
@Override
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
index b67501e..ff633ec 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ public class Kafka08TableSourceSinkFactoryTest extends KafkaTableSourceSinkFacto
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
return new Kafka08TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index b3cc0aa..3363459 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.types.Row;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -92,16 +93,23 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
}
@Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer09<>(
topic,
serializationSchema,
properties,
- partitioner);
+ partitioner.orElse(new FlinkFixedPartitioner<>()));
}
@Override
protected Kafka09JsonTableSink createCopy() {
- return new Kafka09JsonTableSink(topic, properties, partitioner);
+ return new Kafka09JsonTableSink(
+ topic,
+ properties,
+ partitioner.orElse(new FlinkFixedPartitioner<>()));
}
}
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
index 8c349d7..6e38aad 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -36,7 +37,7 @@ public class Kafka09TableSink extends KafkaTableSink {
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
super(
schema,
@@ -51,11 +52,11 @@ public class Kafka09TableSink extends KafkaTableSink {
String topic,
Properties properties,
SerializationSchema<Row> serializationSchema,
- FlinkKafkaPartitioner<Row> partitioner) {
+ Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer09<>(
topic,
serializationSchema,
properties,
- partitioner);
+ partitioner.orElse(null));
}
}
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
index 9958b4e..19f5150 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBa
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
return new Kafka09TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 79f251b..97b5c7d 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -40,7 +40,10 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
Properties properties,
FlinkKafkaPartitioner<Row> partitioner) {
- return new Kafka09JsonTableSink(topic, properties, partitioner);
+ return new Kafka09JsonTableSink(
+ topic,
+ properties,
+ partitioner);
}
@Override
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
index a6c8bd4..d54c394 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ public class Kafka09TableSourceSinkFactoryTest extends KafkaTableSourceSinkFacto
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
return new Kafka09TableSink(
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 7853bb7..a85d536 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -40,7 +40,7 @@ import java.util.Properties;
* A version-agnostic Kafka {@link AppendStreamTableSink}.
*
* <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)}}.
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}}.
*/
@Internal
public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
@@ -60,7 +60,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
protected Optional<SerializationSchema<Row>> serializationSchema;
/** Partitioner to select Kafka partition for each item. */
- protected final FlinkKafkaPartitioner<Row> partitioner;
+ protected final Optional<FlinkKafkaPartitioner<Row>> partitioner;
// legacy variables
protected String[] fieldNames;
@@ -70,7 +70,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
this.schema = Optional.of(Preconditions.checkNotNull(schema, "Schema must not be null."));
this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
@@ -96,7 +96,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
this.schema = Optional.empty();
this.topic = Preconditions.checkNotNull(topic, "topic");
this.properties = Preconditions.checkNotNull(properties, "properties");
- this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
+ this.partitioner = Optional.of(Preconditions.checkNotNull(partitioner, "partitioner"));
this.serializationSchema = Optional.empty();
}
@@ -113,7 +113,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
String topic,
Properties properties,
SerializationSchema<Row> serializationSchema,
- FlinkKafkaPartitioner<Row> partitioner);
+ Optional<FlinkKafkaPartitioner<Row>> partitioner);
/**
* Create serialization schema for converting table rows into bytes.
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
index 27b2e67..5634331 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
import java.util.ArrayList;
import java.util.Arrays;
@@ -54,6 +55,11 @@ import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMA
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_CLASS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
@@ -105,6 +111,8 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
properties.add(CONNECTOR_STARTUP_MODE);
properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
+ properties.add(CONNECTOR_SINK_PARTITIONER);
+ properties.add(CONNECTOR_SINK_PARTITIONER_CLASS);
// schema
properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
@@ -170,7 +178,7 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
schema,
topic,
getKafkaProperties(descriptorProperties),
- getFlinkKafkaPartitioner(),
+ getFlinkKafkaPartitioner(descriptorProperties),
getSerializationSchema(properties));
}
@@ -228,7 +236,7 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema);
// --------------------------------------------------------------------------------------------
@@ -314,9 +322,24 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
return options;
}
- private FlinkKafkaPartitioner<Row> getFlinkKafkaPartitioner() {
- // we don't support custom partitioner so far
- return new FlinkFixedPartitioner<>();
+ @SuppressWarnings("unchecked")
+ private Optional<FlinkKafkaPartitioner<Row>> getFlinkKafkaPartitioner(DescriptorProperties descriptorProperties) {
+ return descriptorProperties
+ .getOptionalString(CONNECTOR_SINK_PARTITIONER)
+ .flatMap((String partitionerString) -> {
+ switch (partitionerString) {
+ case CONNECTOR_SINK_PARTITIONER_VALUE_FIXED:
+ return Optional.of(new FlinkFixedPartitioner<>());
+ case CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN:
+ return Optional.empty();
+ case CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM:
+ final Class<? extends FlinkKafkaPartitioner> partitionerClass =
+ descriptorProperties.getClass(CONNECTOR_SINK_PARTITIONER_CLASS, FlinkKafkaPartitioner.class);
+ return Optional.of(InstantiationUtil.instantiate(partitionerClass));
+ default:
+ throw new TableException("Unsupported sink partitioner. Validator should have checked that.");
+ }
+ });
}
private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
index 4535958..e44341a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.descriptors;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
@@ -34,6 +35,11 @@ import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CO
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_CLASS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
@@ -51,6 +57,8 @@ public class Kafka extends ConnectorDescriptor {
private StartupMode startupMode;
private Map<Integer, Long> specificOffsets;
private Map<String, String> kafkaProperties;
+ private String sinkPartitionerType;
+ private Class<? extends FlinkKafkaPartitioner> sinkPartitionerClass;
/**
* Connector descriptor for the Apache Kafka message queue.
@@ -176,6 +184,69 @@ public class Kafka extends ConnectorDescriptor {
}
/**
+ * Configures how to partition records from Flink's partitions into Kafka's partitions.
+ *
+ * <p>This strategy ensures that each Flink partition ends up in one Kafka partition.
+ *
+ * <p>Note: One Kafka partition can contain multiple Flink partitions. Examples:
+ *
+ * <p>More Flink partitions than Kafka partitions. Some (or all) Kafka partitions contain
+ * the output of more than one Flink partition:
+ * <pre>
+ * Flink Sinks Kafka Partitions
+ * 1 ----------------> 1
+ * 2 --------------/
+ * 3 -------------/
+ * 4 ------------/
+ * </pre>
+ *
+ *
+ * <p>Fewer Flink partitions than Kafka partitions:
+ * <pre>
+ * Flink Sinks Kafka Partitions
+ * 1 ----------------> 1
+ * 2 ----------------> 2
+ * 3
+ * 4
+ * 5
+ * </pre>
+ *
+ * @see org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
+ */
+ public Kafka sinkPartitionerFixed() {
+ sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
+ sinkPartitionerClass = null;
+ return this;
+ }
+
+ /**
+ * Configures how to partition records from Flink's partitions into Kafka's partitions.
+ *
+ * <p>This strategy ensures that records will be distributed to Kafka partitions in a
+ * round-robin fashion.
+ *
+ * <p>Note: This strategy is useful to avoid an unbalanced partitioning. However, it will
+ * cause a lot of network connections between all the Flink instances and all the Kafka brokers.
+ */
+ public Kafka sinkPartitionerRoundRobin() {
+ sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
+ sinkPartitionerClass = null;
+ return this;
+ }
+
+ /**
+ * Configures how to partition records from Flink's partitions into Kafka's partitions.
+ *
+ * <p>This strategy allows for a custom partitioner by providing an implementation
+ * of {@link FlinkKafkaPartitioner}.
+ */
+ public Kafka sinkPartitionerCustom(Class<? extends FlinkKafkaPartitioner> partitionerClass) {
+ sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
+ sinkPartitionerClass = Preconditions.checkNotNull(partitionerClass);
+ return this;
+ }
+
+ /**
* Internal method for connector properties conversion.
*/
@Override
@@ -212,5 +283,12 @@ public class Kafka extends ConnectorDescriptor {
.collect(Collectors.toList())
);
}
+
+ if (sinkPartitionerType != null) {
+ properties.putString(CONNECTOR_SINK_PARTITIONER, sinkPartitionerType);
+ if (sinkPartitionerClass != null) {
+ properties.putClass(CONNECTOR_SINK_PARTITIONER_CLASS, sinkPartitionerClass);
+ }
+ }
}
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
index 3adc7c5..cad37f8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -48,12 +48,27 @@ public class KafkaValidator extends ConnectorDescriptorValidator {
public static final String CONNECTOR_PROPERTIES = "connector.properties";
public static final String CONNECTOR_PROPERTIES_KEY = "key";
public static final String CONNECTOR_PROPERTIES_VALUE = "value";
+ public static final String CONNECTOR_SINK_PARTITIONER = "connector.sink-partitioner";
+ public static final String CONNECTOR_SINK_PARTITIONER_VALUE_FIXED = "fixed";
+ public static final String CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
+ public static final String CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM = "custom";
+ public static final String CONNECTOR_SINK_PARTITIONER_CLASS = "connector.sink-partitioner-class";
@Override
public void validate(DescriptorProperties properties) {
super.validate(properties);
properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA, false);
+ validateVersion(properties);
+
+ validateStartupMode(properties);
+
+ validateKafkaProperties(properties);
+
+ validateSinkPartitioner(properties);
+ }
+
+ private void validateVersion(DescriptorProperties properties) {
final List<String> versions = Arrays.asList(
CONNECTOR_VERSION_VALUE_08,
CONNECTOR_VERSION_VALUE_09,
@@ -61,7 +76,9 @@ public class KafkaValidator extends ConnectorDescriptorValidator {
CONNECTOR_VERSION_VALUE_011);
properties.validateEnumValues(CONNECTOR_VERSION(), false, versions);
properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);
+ }
+ private void validateStartupMode(DescriptorProperties properties) {
final Map<String, Consumer<String>> specificOffsetValidators = new HashMap<>();
specificOffsetValidators.put(
CONNECTOR_SPECIFIC_OFFSETS_PARTITION,
@@ -86,17 +103,29 @@ public class KafkaValidator extends ConnectorDescriptorValidator {
CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
prefix -> properties.validateFixedIndexedProperties(CONNECTOR_SPECIFIC_OFFSETS, false, specificOffsetValidators));
properties.validateEnum(CONNECTOR_STARTUP_MODE, true, startupModeValidation);
+ }
+ private void validateKafkaProperties(DescriptorProperties properties) {
final Map<String, Consumer<String>> propertyValidators = new HashMap<>();
propertyValidators.put(
CONNECTOR_PROPERTIES_KEY,
- prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_KEY, false, 1, Integer.MAX_VALUE));
+ prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_KEY, false, 1));
propertyValidators.put(
CONNECTOR_PROPERTIES_VALUE,
- prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_VALUE, false, 0, Integer.MAX_VALUE));
+ prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_VALUE, false, 0));
properties.validateFixedIndexedProperties(CONNECTOR_PROPERTIES, true, propertyValidators);
}
+ private void validateSinkPartitioner(DescriptorProperties properties) {
+ final Map<String, Consumer<String>> sinkPartitionerValidators = new HashMap<>();
+ sinkPartitionerValidators.put(CONNECTOR_SINK_PARTITIONER_VALUE_FIXED, properties.noValidation());
+ sinkPartitionerValidators.put(CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN, properties.noValidation());
+ sinkPartitionerValidators.put(
+ CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM,
+ prefix -> properties.validateString(CONNECTOR_SINK_PARTITIONER_CLASS, false, 1));
+ properties.validateEnum(CONNECTOR_SINK_PARTITIONER, true, sinkPartitionerValidators);
+ }
+
// utilities
public static String normalizeStartupMode(StartupMode startupMode) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 946b6eb5..b4bb89d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.types.Row;
import org.junit.Test;
+import java.util.Optional;
import java.util.Properties;
import static org.junit.Assert.assertArrayEquals;
@@ -59,7 +60,7 @@ public abstract class KafkaTableSinkTestBase {
@SuppressWarnings("unchecked")
@Test
- public void testKafkaTableSink() throws Exception {
+ public void testKafkaTableSink() {
DataStream dataStream = mock(DataStream.class);
when(dataStream.addSink(any(SinkFunction.class))).thenReturn(mock(DataStreamSink.class));
@@ -74,7 +75,7 @@ public abstract class KafkaTableSinkTestBase {
eq(TOPIC),
eq(PROPERTIES),
any(getSerializationSchemaClass()),
- eq(PARTITIONER));
+ eq(Optional.of(PARTITIONER)));
}
@Test
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 504bed1..5e9144c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -153,6 +153,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
.version(getKafkaVersion())
.topic(TOPIC)
.properties(KAFKA_PROPERTIES)
+ .sinkPartitionerRoundRobin() // test if accepted although not needed
.startFromSpecificOffsets(OFFSETS))
.withFormat(new TestTableFormat())
.withSchema(
@@ -194,7 +195,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
schema,
TOPIC,
KAFKA_PROPERTIES,
- new FlinkFixedPartitioner<>(), // a custom partitioner is not support yet
+ Optional.of(new FlinkFixedPartitioner<>()),
new TestSerializationSchema(schema.toRowType()));
// construct table sink using descriptors and table sink factory
@@ -204,6 +205,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
.version(getKafkaVersion())
.topic(TOPIC)
.properties(KAFKA_PROPERTIES)
+ .sinkPartitionerFixed()
.startFromSpecificOffsets(OFFSETS)) // test if they accepted although not needed
.withFormat(new TestTableFormat())
.withSchema(
@@ -299,6 +301,6 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
TableSchema schema,
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
+ Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema);
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
index f3d96f1..c67bc4d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.descriptors;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -59,7 +61,8 @@ public class KafkaTest extends DescriptorTestBase {
.version("0.11")
.topic("MyTable")
.startFromSpecificOffsets(offsets)
- .properties(properties);
+ .properties(properties)
+ .sinkPartitionerCustom(FlinkFixedPartitioner.class);
return Arrays.asList(earliestDesc, specificOffsetsDesc, specificOffsetsMapDesc);
}
@@ -102,6 +105,8 @@ public class KafkaTest extends DescriptorTestBase {
props3.put("connector.properties.0.value", "12");
props3.put("connector.properties.1.key", "kafka.stuff");
props3.put("connector.properties.1.value", "42");
+ props3.put("connector.sink-partitioner", "custom");
+ props3.put("connector.sink-partitioner-class", FlinkFixedPartitioner.class.getName());
return Arrays.asList(props1, props2, props3);
}