Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/06 16:18:00 UTC
[flink] branch master updated: [FLINK-19133] Open custom partitioners in KafkaSerializationSchemaWrapper (#13326)
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 25b54a5 [FLINK-19133] Open custom partitioners in KafkaSerializationSchemaWrapper (#13326)
25b54a5 is described below
commit 25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Sun Sep 6 18:16:01 2020 +0200
[FLINK-19133] Open custom partitioners in KafkaSerializationSchemaWrapper (#13326)
---
.../internals/KafkaSerializationSchemaWrapper.java | 17 +++++++++
.../connectors/kafka/FlinkKafkaProducer.java | 30 +++++++--------
.../connectors/kafka/FlinkKafkaProducerTest.java | 43 +++++++++++++++++++++-
3 files changed, 73 insertions(+), 17 deletions(-)
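For context: FlinkKafkaProducer wraps a plain SerializationSchema and an optional custom FlinkKafkaPartitioner in a KafkaSerializationSchemaWrapper. Before this commit, a partitioner held inside that wrapper never had its open(parallelInstanceId, parallelInstances) hook invoked, because the producer only opened the partitioner it referenced directly. A minimal sketch of a partitioner that misbehaves without that call (a hypothetical illustration, not part of this commit; the signatures match the FlinkKafkaPartitioner API used in the test below):

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    // Hypothetical round-robin partitioner that depends on open() for
    // per-subtask initialization.
    public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
        private int next;

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            // Offset each subtask so they start on different partitions.
            next = parallelInstanceId;
        }

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            int p = partitions[next % partitions.length];
            next = (next + 1) % partitions.length;
            return p;
        }
    }

If open() is never called, such a partitioner silently runs with default field values instead of its per-subtask setup.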
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
index 3cd0fc7..2c289d2 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka.internals;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
@@ -32,6 +33,7 @@ import javax.annotation.Nullable;
* {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} to the
* {@link KafkaSerializationSchema}.
*/
+@Internal
public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSchema<T>, KafkaContextAware<T> {
private final FlinkKafkaPartitioner<T> partitioner;
@@ -40,6 +42,8 @@ public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSch
private boolean writeTimestamp;
private int[] partitions;
+ private int parallelInstanceId;
+ private int numParallelInstances;
public KafkaSerializationSchemaWrapper(
String topic,
@@ -55,6 +59,9 @@ public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSch
@Override
public void open(SerializationSchema.InitializationContext context) throws Exception {
serializationSchema.open(context);
+ if (partitioner != null) {
+ partitioner.open(parallelInstanceId, numParallelInstances);
+ }
}
@Override
@@ -89,6 +96,16 @@ public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSch
this.partitions = partitions;
}
+ @Override
+ public void setParallelInstanceId(int parallelInstanceId) {
+ this.parallelInstanceId = parallelInstanceId;
+ }
+
+ @Override
+ public void setNumParallelInstances(int numParallelInstances) {
+ this.numParallelInstances = numParallelInstances;
+ }
+
public void setWriteTimestamp(boolean writeTimestamp) {
this.writeTimestamp = writeTimestamp;
}
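With the hunks above, the wrapper captures the subtask index and parallelism through the KafkaContextAware setters and opens the wrapped partitioner from its own open(). Paraphrasing the call order the producer now drives (a sketch, not verbatim framework code):

    // 1. The producer pushes runtime context into the wrapper (KafkaContextAware):
    wrapper.setParallelInstanceId(ctx.getIndexOfThisSubtask());
    wrapper.setNumParallelInstances(ctx.getNumberOfParallelSubtasks());

    // 2. Opening the wrapper now opens both delegates:
    wrapper.open(initContext); // -> serializationSchema.open(initContext)
                               // -> partitioner.open(parallelInstanceId, numParallelInstances)

The setters must run before open(), which is exactly what the FlinkKafkaProducer change below guarantees.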
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 82a3fac..1c7d9a8 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -792,8 +792,20 @@ public class FlinkKafkaProducer<IN>
};
}
+ RuntimeContext ctx = getRuntimeContext();
+
+ if (flinkKafkaPartitioner != null) {
+ flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+ }
+
+ if (kafkaSchema instanceof KafkaContextAware) {
+ KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) kafkaSchema;
+ contextAwareSchema.setParallelInstanceId(ctx.getIndexOfThisSubtask());
+ contextAwareSchema.setNumParallelInstances(ctx.getNumberOfParallelSubtasks());
+ }
+
if (kafkaSchema != null) {
- kafkaSchema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
+ kafkaSchema.open(() -> ctx.getMetricGroup().addGroup("user"));
}
super.open(configuration);
@@ -1229,22 +1241,8 @@ public class FlinkKafkaProducer<IN>
private FlinkKafkaInternalProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
FlinkKafkaInternalProducer<byte[], byte[]> producer = createProducer();
- RuntimeContext ctx = getRuntimeContext();
-
- if (flinkKafkaPartitioner != null) {
- flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
- }
-
- if (kafkaSchema instanceof KafkaContextAware) {
- KafkaContextAware<IN> contextAwareSchema =
- (KafkaContextAware<IN>) kafkaSchema;
-
- contextAwareSchema.setParallelInstanceId(ctx.getIndexOfThisSubtask());
- contextAwareSchema.setNumParallelInstances(ctx.getNumberOfParallelSubtasks());
- }
-
LOG.info("Starting FlinkKafkaInternalProducer ({}/{}) to produce into default topic {}",
- ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
+ getRuntimeContext().getIndexOfThisSubtask() + 1, getRuntimeContext().getNumberOfParallelSubtasks(), defaultTopicId);
// register Kafka metrics to Flink accumulators
if (registerMetrics && !Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
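These hunks move the partitioner and KafkaContextAware setup from initProducer() into open(), so the runtime context is in place before kafkaSchema.open(...) runs; initProducer() keeps only the logging and metrics registration. A hypothetical context-aware schema that depends on exactly this ordering (illustration only; the class name and topic are invented):

    import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import javax.annotation.Nullable;

    import java.nio.charset.StandardCharsets;

    // Hypothetical schema that tags each record with its subtask index; it
    // only behaves correctly if setParallelInstanceId() runs before open()
    // and serialize().
    public class SubtaskTaggingSchema implements KafkaSerializationSchema<String>, KafkaContextAware<String> {
        private int subtask;

        @Override
        public void setParallelInstanceId(int parallelInstanceId) {
            this.subtask = parallelInstanceId;
        }

        @Override
        public String getTargetTopic(String element) {
            return "out-topic"; // fixed topic for this illustration
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
            byte[] value = (subtask + ":" + element).getBytes(StandardCharsets.UTF_8);
            return new ProducerRecord<>(getTargetTopic(element), value);
        }
    }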
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
index a166c0e..f901ea6 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -29,6 +30,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
+import java.util.Optional;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -40,7 +42,6 @@ import static org.junit.Assert.assertThat;
public class FlinkKafkaProducerTest {
@Test
public void testOpenSerializationSchemaProducer() throws Exception {
-
OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>(
"localhost:9092",
@@ -86,6 +87,46 @@ public class FlinkKafkaProducerTest {
assertThat(schema.openCalled, equalTo(true));
}
+ @Test
+ public void testOpenKafkaCustomPartitioner() throws Exception {
+ CustomPartitioner<Integer> partitioner = new CustomPartitioner<>();
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", "localhost:9092");
+ FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>(
+ "test-topic",
+ new OpenTestingSerializationSchema(),
+ properties,
+ Optional.of(partitioner)
+ );
+
+ OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(kafkaProducer),
+ 1,
+ 1,
+ 0,
+ IntSerializer.INSTANCE,
+ new OperatorID(1, 1));
+
+ testHarness.open();
+
+ assertThat(partitioner.openCalled, equalTo(true));
+ }
+
+ private static class CustomPartitioner<T> extends FlinkKafkaPartitioner<T> {
+ private boolean openCalled;
+
+ @Override
+ public void open(int parallelInstanceId, int parallelInstances) {
+ super.open(parallelInstanceId, parallelInstances);
+ openCalled = true;
+ }
+
+ @Override
+ public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ return 0;
+ }
+ }
+
private static class OpenTestingKafkaSerializationSchema implements KafkaSerializationSchema<Integer> {
private boolean openCalled;