Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/06 16:18:00 UTC

[flink] branch master updated: [FLINK-19133] Open custom partitioners in KafkaSerializationSchemaWrapper (#13326)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 25b54a5  [FLINK-19133] Open custom partitioners in KafkaSerializationSchemaWrapper (#13326)
25b54a5 is described below

commit 25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Sun Sep 6 18:16:01 2020 +0200

    [FLINK-19133] Open custom partitioners in KafkaSerializationSchemaWrapper (#13326)
---
 .../internals/KafkaSerializationSchemaWrapper.java | 17 +++++++++
 .../connectors/kafka/FlinkKafkaProducer.java       | 30 +++++++--------
 .../connectors/kafka/FlinkKafkaProducerTest.java   | 43 +++++++++++++++++++++-
 3 files changed, 73 insertions(+), 17 deletions(-)
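
For context on the bug: FlinkKafkaProducer previously opened a custom FlinkKafkaPartitioner only in initProducer(), and only when the partitioner was stored directly in its flinkKafkaPartitioner field. A partitioner handed to the SerializationSchema-based constructors is instead wrapped in a KafkaSerializationSchemaWrapper, whose open() ignored it, so such a partitioner never received its subtask index. A minimal sketch of the kind of user code this affects, using the same constructor the new test below exercises (the class name, topic, and main() wiring are illustrative, not part of the commit):

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class PartitionerExample {

    // An illustrative partitioner that depends on open() being called:
    // it spreads subtasks round-robin over the available Kafka partitions.
    private static class SubtaskAwarePartitioner<T> extends FlinkKafkaPartitioner<T> {
        private int parallelInstanceId;

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            this.parallelInstanceId = parallelInstanceId;
        }

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // Before this fix, parallelInstanceId stayed at its default of 0
            // whenever the partitioner travelled through
            // KafkaSerializationSchemaWrapper, skewing all writes to one partition.
            return partitions[parallelInstanceId % partitions.length];
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        // This constructor wraps the partitioner in KafkaSerializationSchemaWrapper,
        // the code path fixed in the diff below.
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
            "my-topic",
            new SimpleStringSchema(),
            props,
            Optional.of(new SubtaskAwarePartitioner<>()));
        // producer would then be attached to a DataStream via addSink(producer).
    }
}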

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
index 3cd0fc7..2c289d2 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
 import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
@@ -32,6 +33,7 @@ import javax.annotation.Nullable;
  * {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} to the
  * {@link KafkaSerializationSchema}.
  */
+@Internal
 public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSchema<T>, KafkaContextAware<T> {
 
 	private final FlinkKafkaPartitioner<T> partitioner;
@@ -40,6 +42,8 @@ public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSch
 	private boolean writeTimestamp;
 
 	private int[] partitions;
+	private int parallelInstanceId;
+	private int numParallelInstances;
 
 	public KafkaSerializationSchemaWrapper(
 			String topic,
@@ -55,6 +59,9 @@ public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSch
 	@Override
 	public void open(SerializationSchema.InitializationContext context) throws Exception {
 		serializationSchema.open(context);
+		if (partitioner != null) {
+			partitioner.open(parallelInstanceId, numParallelInstances);
+		}
 	}
 
 	@Override
@@ -89,6 +96,16 @@ public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSch
 		this.partitions = partitions;
 	}
 
+	@Override
+	public void setParallelInstanceId(int parallelInstanceId) {
+		this.parallelInstanceId = parallelInstanceId;
+	}
+
+	@Override
+	public void setNumParallelInstances(int numParallelInstances) {
+		this.numParallelInstances = numParallelInstances;
+	}
+
 	public void setWriteTimestamp(boolean writeTimestamp) {
 		this.writeTimestamp = writeTimestamp;
 	}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 82a3fac..1c7d9a8 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -792,8 +792,20 @@ public class FlinkKafkaProducer<IN>
 			};
 		}
 
+		RuntimeContext ctx = getRuntimeContext();
+
+		if (flinkKafkaPartitioner != null) {
+			flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+		}
+
+		if (kafkaSchema instanceof KafkaContextAware) {
+			KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) kafkaSchema;
+			contextAwareSchema.setParallelInstanceId(ctx.getIndexOfThisSubtask());
+			contextAwareSchema.setNumParallelInstances(ctx.getNumberOfParallelSubtasks());
+		}
+
 		if (kafkaSchema != null) {
-			kafkaSchema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
+			kafkaSchema.open(() -> ctx.getMetricGroup().addGroup("user"));
 		}
 
 		super.open(configuration);
@@ -1229,22 +1241,8 @@ public class FlinkKafkaProducer<IN>
 	private FlinkKafkaInternalProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
 		FlinkKafkaInternalProducer<byte[], byte[]> producer = createProducer();
 
-		RuntimeContext ctx = getRuntimeContext();
-
-		if (flinkKafkaPartitioner != null) {
-			flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
-		}
-
-		if (kafkaSchema instanceof KafkaContextAware) {
-			KafkaContextAware<IN> contextAwareSchema =
-					(KafkaContextAware<IN>) kafkaSchema;
-
-			contextAwareSchema.setParallelInstanceId(ctx.getIndexOfThisSubtask());
-			contextAwareSchema.setNumParallelInstances(ctx.getNumberOfParallelSubtasks());
-		}
-
 		LOG.info("Starting FlinkKafkaInternalProducer ({}/{}) to produce into default topic {}",
-			ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
+			getRuntimeContext().getIndexOfThisSubtask() + 1, getRuntimeContext().getNumberOfParallelSubtasks(), defaultTopicId);
 
 		// register Kafka metrics to Flink accumulators
 		if (registerMetrics && !Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
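
In the producer itself, the partitioner and context-aware setup moves from initProducer() into open(), and notably runs before kafkaSchema.open(). The effective order inside open() is now (a condensed sketch of the hunk above, with the RuntimeContext calls abbreviated to subtaskIndex/parallelism):

// 1. a directly-configured partitioner is opened here, as before
flinkKafkaPartitioner.open(subtaskIndex, parallelism);
// 2. context-aware schemas, including KafkaSerializationSchemaWrapper,
//    receive the subtask index and parallelism first...
contextAwareSchema.setParallelInstanceId(subtaskIndex);
contextAwareSchema.setNumParallelInstances(parallelism);
// 3. ...so the wrapper's open() can forward them to the wrapped partitioner
kafkaSchema.open(initContext);

A likely side benefit of the move: initProducer() runs each time a new internal producer is created (which in exactly-once mode can happen repeatedly), so doing the setup once in open() also avoids re-running it per producer.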
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
index a166c0e..f901ea6 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -29,6 +30,7 @@ import org.junit.Test;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -40,7 +42,6 @@ import static org.junit.Assert.assertThat;
 public class FlinkKafkaProducerTest {
 	@Test
 	public void testOpenSerializationSchemaProducer() throws Exception {
-
 		OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
 		FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>(
 			"localhost:9092",
@@ -86,6 +87,46 @@ public class FlinkKafkaProducerTest {
 		assertThat(schema.openCalled, equalTo(true));
 	}
 
+	@Test
+	public void testOpenKafkaCustomPartitioner() throws Exception {
+		CustomPartitioner<Integer> partitioner = new CustomPartitioner<>();
+		Properties properties = new Properties();
+		properties.put("bootstrap.servers", "localhost:9092");
+		FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>(
+			"test-topic",
+			new OpenTestingSerializationSchema(),
+			properties,
+			Optional.of(partitioner)
+		);
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+
+		testHarness.open();
+
+		assertThat(partitioner.openCalled, equalTo(true));
+	}
+
+	private static class CustomPartitioner<T> extends FlinkKafkaPartitioner<T> {
+		private boolean openCalled;
+
+		@Override
+		public void open(int parallelInstanceId, int parallelInstances) {
+			super.open(parallelInstanceId, parallelInstances);
+			openCalled = true;
+		}
+
+		@Override
+		public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+			return 0;
+		}
+	}
+
 	private static class OpenTestingKafkaSerializationSchema implements KafkaSerializationSchema<Integer> {
 		private boolean openCalled;