You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/11/08 08:48:17 UTC

[2/2] flink git commit: [FLINK-7978][kafka] Ensure that transactional ids will never clash

[FLINK-7978][kafka] Ensure that transactional ids will never clash

Previously transactional ids to use and to abort could clash between
subtasks. This could lead to a race condition between initialization
and writing the data, where one subtask is still initializing/aborting
some transactional id while a different subtask is already trying to
write the data using the same transactional id.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdae3ae1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdae3ae1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdae3ae1

Branch: refs/heads/master
Commit: fdae3ae1f990ca90375264634dedd6ebd54502a1
Parents: ab00d35
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Nov 6 14:14:01 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 8 09:45:14 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java |  1 +
 .../internal/TransactionalIdsGenerator.java     | 32 +++++---
 .../internal/TransactionalIdsGeneratorTest.java | 79 ++++++++++++++++++++
 3 files changed, 102 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fdae3ae1/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 0310019..08599d8 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -791,6 +791,7 @@ public class FlinkKafkaProducer011<IN>
 		transactionalIdsGenerator = new TransactionalIdsGenerator(
 			getRuntimeContext().getTaskName(),
 			getRuntimeContext().getIndexOfThisSubtask(),
+			getRuntimeContext().getNumberOfParallelSubtasks(),
 			kafkaProducersPoolSize,
 			SAFE_SCALE_DOWN_FACTOR);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fdae3ae1/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
index eabdad9..2c4e6c9 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
@@ -19,27 +19,42 @@ package org.apache.flink.streaming.connectors.kafka.internal;
 
 import java.util.HashSet;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Class responsible for generating transactional ids to use when communicating with Kafka.
+ *
+ * <p>It guarantees that:
+ * <ul>
+ * 	<li>generated ids to use will never clash with ids to use from different subtasks
+ * 	<li>generated ids to abort will never clash with ids to abort from different subtasks
+ * 	<li>generated ids to use will never clash with ids to abort from different subtasks
+ * </ul>
+ * In other words, any particular generated id will always be assigned to one and only one subtask.
  */
 public class TransactionalIdsGenerator {
 	private final String prefix;
 	private final int subtaskIndex;
+	private final int totalNumberOfSubtasks;
 	private final int poolSize;
 	private final int safeScaleDownFactor;
 
 	public TransactionalIdsGenerator(
 			String prefix,
 			int subtaskIndex,
+			int totalNumberOfSubtasks,
 			int poolSize,
 			int safeScaleDownFactor) {
+		checkArgument(subtaskIndex < totalNumberOfSubtasks);
+		checkArgument(poolSize > 0);
+		checkArgument(safeScaleDownFactor > 0);
+		checkArgument(subtaskIndex >= 0);
+
 		this.prefix = checkNotNull(prefix);
 		this.subtaskIndex = subtaskIndex;
+		this.totalNumberOfSubtasks = totalNumberOfSubtasks;
 		this.poolSize = poolSize;
 		this.safeScaleDownFactor = safeScaleDownFactor;
 	}
@@ -65,14 +80,11 @@ public class TransactionalIdsGenerator {
 	 *  range to abort based on current configured pool size, current parallelism and safeScaleDownFactor.
 	 */
 	public Set<String> generateIdsToAbort() {
-		long abortTransactionalIdStart = subtaskIndex;
-		long abortTransactionalIdEnd = abortTransactionalIdStart + 1;
-
-		abortTransactionalIdStart *= poolSize * safeScaleDownFactor;
-		abortTransactionalIdEnd *= poolSize * safeScaleDownFactor;
-		return LongStream.range(abortTransactionalIdStart, abortTransactionalIdEnd)
-			.mapToObj(this::generateTransactionalId)
-			.collect(Collectors.toSet());
+		Set<String> idsToAbort = new HashSet<>();
+		for (int i = 0; i < safeScaleDownFactor; i++) {
+			idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
+		}
+		return idsToAbort;
 	}
 
 	private String generateTransactionalId(long transactionalId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fdae3ae1/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
new file mode 100644
index 0000000..7ad3779
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+	private static final int POOL_SIZE = 3;
+	private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+	private static final int SUBTASKS_COUNT = 5;
+
+	@Test
+	public void testGenerateIdsToUse() {
+		TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+		assertEquals(
+			new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")),
+			generator.generateIdsToUse(36));
+	}
+
+	/**
+	 * Ids to abort and to use should never clash between subtasks.
+	 */
+	@Test
+	public void testGeneratedIdsDoNotClash() {
+		List<Set<String>> idsToAbort = new ArrayList<>();
+		List<Set<String>> idsToUse = new ArrayList<>();
+
+		for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+			TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+			idsToUse.add(generator.generateIdsToUse(0));
+			idsToAbort.add(generator.generateIdsToAbort());
+		}
+
+		for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+			for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
+				if (subtask2 == subtask1) {
+					continue;
+				}
+				assertDisjoint(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+				assertDisjoint(idsToUse.get(subtask2), idsToUse.get(subtask1));
+				assertDisjoint(idsToAbort.get(subtask2), idsToUse.get(subtask1));
+			}
+		}
+	}
+
+	private <T> void assertDisjoint(Set<T> first, Set<T> second) {
+		HashSet<T> actual = new HashSet<>(first);
+		actual.retainAll(second);
+		assertEquals(Collections.emptySet(), actual);
+	}
+}