Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/22 08:43:52 UTC

[16/17] flink git commit: [FLINK-9295][kafka] Fix transactional.id collisions for FlinkKafkaProducer011

[FLINK-9295][kafka] Fix transactional.id collisions for FlinkKafkaProducer011

Previously, if there were two completely independent FlinkKafkaProducer011 data sinks
in the job graph, their transactional.ids would collide with one another. The fix is to
use the operator's unique ID along with the task name and subtask id.
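
To make the collision concrete, here is a minimal sketch of the kind of prefix
the generator now receives. The helper class and method name are illustrative,
not the connector's exact internals:

    import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

    // Task name alone is not unique: two independent sinks in the same job
    // can share it. Mixing in the operator's unique ID makes the prefix, and
    // hence every transactional.id derived from it, distinct per sink.
    final class TransactionalIdPrefix {
        static String of(StreamingRuntimeContext ctx) {
            return ctx.getTaskName() + "-" + ctx.getOperatorUniqueID();
        }
    }

Each parallel subtask then derives its own ids from this prefix together with
its subtask index and the producer pool size, so no two producers in the job
share a transactional.id.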

This change is backward compatible when recovering from older savepoints,
since transactional.ids generated by the old generator will still be used
after restoring from state.
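
The compatibility argument, heavily simplified: the transactional.id of a
pending transaction is carried inside the checkpointed state itself, so a
restore never re-derives it. All names below are illustrative stand-ins, not
the connector's actual internals:

    import java.util.Properties;

    // The id of a pending transaction lives in the checkpointed state, so a
    // restored job commits under whatever id the OLD generator wrote there.
    final class KafkaTransactionState {
        final String transactionalId; // read back verbatim from the savepoint

        KafkaTransactionState(String transactionalId) {
            this.transactionalId = transactionalId;
        }

        Properties producerConfig(Properties base) {
            Properties config = new Properties();
            config.putAll(base);
            // the new task-name + operator-id prefix only affects ids
            // generated AFTER the restore, never this persisted one
            config.put("transactional.id", transactionalId);
            return config;
        }
    }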

This closes #5977.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b7493d1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b7493d1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b7493d1

Branch: refs/heads/master
Commit: 9b7493d143add477fbfd230ea312270b26e45d85
Parents: 3fd694d
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue May 8 17:49:31 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:42:30 2018 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java |  2 +-
 .../kafka/FlinkKafkaProducer011ITCase.java      |  4 +-
 .../Kafka011ProducerExactlyOnceITCase.java      |  6 +++
 .../connectors/kafka/KafkaProducerTestBase.java | 55 +++++++++++---------
 .../util/AbstractStreamOperatorTestHarness.java | 31 +++++++++--
 .../util/OneInputStreamOperatorTestHarness.java | 17 ++++--
 6 files changed, 81 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9b7493d1/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 0ae5e03b..8497372 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -837,7 +837,7 @@ public class FlinkKafkaProducer011<IN>
 		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
 			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
 		transactionalIdsGenerator = new TransactionalIdsGenerator(
-			getRuntimeContext().getTaskName(),
+			getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
 			getRuntimeContext().getIndexOfThisSubtask(),
 			getRuntimeContext().getNumberOfParallelSubtasks(),
 			kafkaProducersPoolSize,
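
The one ingredient this hunk adds is StreamingRuntimeContext#getOperatorUniqueID().
A self-contained example of reading it from a rich function, mirroring the cast
used above (the sink itself is a placeholder, not part of the commit):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

    // Two independent instances of this sink in one job observe different
    // unique IDs, which is what keeps their transactional.id ranges disjoint.
    public class OperatorIdAwareSink extends RichSinkFunction<Integer> {

        private transient String operatorUniqueId;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // same cast as in the change above; the unique ID is only
            // available at runtime inside a streaming job
            operatorUniqueId = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID();
        }

        @Override
        public void invoke(Integer value) {
            // use operatorUniqueId, e.g. as part of a transactional.id prefix
        }
    }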

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7493d1/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 36cb362..74c58ad 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -627,7 +628,8 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 			maxParallelism,
 			parallelism,
 			subtaskIndex,
-			IntSerializer.INSTANCE);
+			IntSerializer.INSTANCE,
+			new OperatorID(42, 44));
 	}
 
 	private Properties createProperties() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7493d1/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
index 1167238..5038b7f 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 /**
  * IT cases for the {@link FlinkKafkaProducer011}.
@@ -48,4 +49,9 @@ public class Kafka011ProducerExactlyOnceITCase extends KafkaProducerTestBase {
 		// that it doesn't work well with some weird network failures, or the NetworkFailureProxy is a broken design
 		// and this test should be reimplemented in completely different way...
 	}
+
+	@Test
+	public void testMultipleSinkOperators() throws Exception {
+		testExactlyOnce(false, 2);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7493d1/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 629497e..69eb94a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -303,7 +303,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 	 */
 	@Test
 	public void testExactlyOnceRegularSink() throws Exception {
-		testExactlyOnce(true);
+		testExactlyOnce(true, 1);
 	}
 
 	/**
@@ -311,20 +311,22 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 	 */
 	@Test
 	public void testExactlyOnceCustomOperator() throws Exception {
-		testExactlyOnce(false);
+		testExactlyOnce(false, 1);
 	}
 
 	/**
 	 * This test sets the KafkaProducer so that it will automatically flush the data
 	 * and fails the broker to check that records flushed since the last checkpoint are not duplicated.
 	 */
-	protected void testExactlyOnce(boolean regularSink) throws Exception {
-		final String topic = regularSink ? "exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator";
+	protected void testExactlyOnce(boolean regularSink, int sinksCount) throws Exception {
+		final String topic = (regularSink ? "exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator") + sinksCount;
 		final int partition = 0;
 		final int numElements = 1000;
 		final int failAfterElements = 333;
 
-		createTestTopic(topic, 1, 1);
+		for (int i = 0; i < sinksCount; i++) {
+			createTestTopic(topic + i, 1, 1);
+		}
 
 		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
@@ -346,32 +348,35 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 			.addSource(new IntegerSource(numElements))
 			.map(new FailingIdentityMapper<Integer>(failAfterElements));
 
-		FlinkKafkaPartitioner<Integer> partitioner = new FlinkKafkaPartitioner<Integer>() {
-			@Override
-			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
-				return partition;
+		for (int i = 0; i < sinksCount; i++) {
+			FlinkKafkaPartitioner<Integer> partitioner = new FlinkKafkaPartitioner<Integer>() {
+				@Override
+				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+					return partition;
+				}
+			};
+
+			if (regularSink) {
+				StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic + i, keyedSerializationSchema, properties, partitioner);
+				inputStream.addSink(kafkaSink.getUserFunction());
+			} else {
+				kafkaServer.produceIntoKafka(inputStream, topic + i, keyedSerializationSchema, properties, partitioner);
 			}
-		};
-		if (regularSink) {
-			StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, partitioner);
-			inputStream.addSink(kafkaSink.getUserFunction());
-		}
-		else {
-			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, partitioner);
 		}
 
 		FailingIdentityMapper.failedBefore = false;
 		TestUtils.tryExecute(env, "Exactly once test");
 
-		// assert that before failure we successfully snapshot/flushed all expected elements
-		assertExactlyOnceForTopic(
-			properties,
-			topic,
-			partition,
-			expectedElements,
-			KAFKA_READ_TIMEOUT);
-
-		deleteTestTopic(topic);
+		for (int i = 0; i < sinksCount; i++) {
+			// assert that before failure we successfully snapshot/flushed all expected elements
+			assertExactlyOnceForTopic(
+				properties,
+				topic + i,
+				partition,
+				expectedElements,
+				KAFKA_READ_TIMEOUT);
+			deleteTestTopic(topic + i);
+		}
 	}
 
 	private List<Integer> getIntegersSequence(int size) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7493d1/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 26ad3ab..0c4ecc0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -147,19 +147,42 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 				.setParallelism(parallelism)
 				.setSubtaskIndex(subtaskIndex)
 				.build(),
-			true);
+			true,
+			new OperatorID());
+	}
+
+	public AbstractStreamOperatorTestHarness(
+			StreamOperator<OUT> operator,
+			int maxParallelism,
+			int parallelism,
+			int subtaskIndex,
+			OperatorID operatorID) throws Exception {
+		this(
+			operator,
+			new MockEnvironmentBuilder()
+				.setTaskName("MockTask")
+				.setMemorySize(3 * 1024 * 1024)
+				.setInputSplitProvider(new MockInputSplitProvider())
+				.setBufferSize(1024)
+				.setMaxParallelism(maxParallelism)
+				.setParallelism(parallelism)
+				.setSubtaskIndex(subtaskIndex)
+				.build(),
+			true,
+			operatorID);
 	}
 
 	public AbstractStreamOperatorTestHarness(
 			StreamOperator<OUT> operator,
 			MockEnvironment env) throws Exception {
-		this(operator, env, false);
+		this(operator, env, false, new OperatorID());
 	}
 
 	private AbstractStreamOperatorTestHarness(
 			StreamOperator<OUT> operator,
 			MockEnvironment env,
-			boolean environmentIsInternal) throws Exception {
+			boolean environmentIsInternal,
+			OperatorID operatorID) throws Exception {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<>();
 		this.sideOutputLists = new HashMap<>();
@@ -167,7 +190,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		Configuration underlyingConfig = env.getTaskConfiguration();
 		this.config = new StreamConfig(underlyingConfig);
 		this.config.setCheckpointingEnabled(true);
-		this.config.setOperatorID(new OperatorID());
+		this.config.setOperatorID(operatorID);
 		this.executionConfig = env.getExecutionConfig();
 		this.closableRegistry = new CloseableRegistry();
 		this.checkpointLock = new Object();

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7493d1/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 66d2f69..0155198 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -54,8 +55,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 		int maxParallelism,
 		int parallelism,
 		int subtaskIndex,
-		TypeSerializer<IN> typeSerializerIn) throws Exception {
-		this(operator, maxParallelism, parallelism, subtaskIndex);
+		TypeSerializer<IN> typeSerializerIn,
+		OperatorID operatorID) throws Exception {
+		this(operator, maxParallelism, parallelism, subtaskIndex, operatorID);
 
 		config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
 	}
@@ -78,7 +80,16 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 			int maxParallelism,
 			int parallelism,
 			int subtaskIndex) throws Exception {
-		super(operator, maxParallelism, parallelism, subtaskIndex);
+		this(operator, maxParallelism, parallelism, subtaskIndex, new OperatorID());
+	}
+
+	public OneInputStreamOperatorTestHarness(
+			OneInputStreamOperator<IN, OUT> operator,
+			int maxParallelism,
+			int parallelism,
+			int subtaskIndex,
+			OperatorID operatorID) throws Exception {
+		super(operator, maxParallelism, parallelism, subtaskIndex, operatorID);
 
 		this.oneInputOperator = operator;
 	}