You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/11/08 08:48:17 UTC
[2/2] flink git commit: [FLINK-7978][kafka] Ensure that transactional
ids will never clash
[FLINK-7978][kafka] Ensure that transactional ids will never clash
Previously, transactional ids to use and ids to abort could clash between
subtasks. This could lead to a race condition between initialization
and writing the data, where one subtask is still initializing/aborting
some transactional id while a different subtask is already trying to write
the data using the same transactional id.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdae3ae1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdae3ae1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdae3ae1
Branch: refs/heads/master
Commit: fdae3ae1f990ca90375264634dedd6ebd54502a1
Parents: ab00d35
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Nov 6 14:14:01 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 8 09:45:14 2017 +0100
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaProducer011.java | 1 +
.../internal/TransactionalIdsGenerator.java | 32 +++++---
.../internal/TransactionalIdsGeneratorTest.java | 79 ++++++++++++++++++++
3 files changed, 102 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fdae3ae1/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 0310019..08599d8 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -791,6 +791,7 @@ public class FlinkKafkaProducer011<IN>
transactionalIdsGenerator = new TransactionalIdsGenerator(
getRuntimeContext().getTaskName(),
getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getNumberOfParallelSubtasks(),
kafkaProducersPoolSize,
SAFE_SCALE_DOWN_FACTOR);
http://git-wip-us.apache.org/repos/asf/flink/blob/fdae3ae1/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
index eabdad9..2c4e6c9 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
@@ -19,27 +19,42 @@ package org.apache.flink.streaming.connectors.kafka.internal;
import java.util.HashSet;
import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Class responsible for generating transactional ids to use when communicating with Kafka.
+ *
+ * <p>It guarantees that:
+ * <ul>
+ * <li>generated ids to use will never clash with ids to use from different subtasks
+ * <li>generated ids to abort will never clash with ids to abort from different subtasks
+ * <li>generated ids to use will never clash with ids to abort from different subtasks
+ * </ul>
+ * In other words, any particular generated id will always be assigned to one and only one subtask.
*/
public class TransactionalIdsGenerator {
private final String prefix;
private final int subtaskIndex;
+ private final int totalNumberOfSubtasks;
private final int poolSize;
private final int safeScaleDownFactor;
public TransactionalIdsGenerator(
String prefix,
int subtaskIndex,
+ int totalNumberOfSubtasks,
int poolSize,
int safeScaleDownFactor) {
+ checkArgument(subtaskIndex < totalNumberOfSubtasks);
+ checkArgument(poolSize > 0);
+ checkArgument(safeScaleDownFactor > 0);
+ checkArgument(subtaskIndex >= 0);
+
this.prefix = checkNotNull(prefix);
this.subtaskIndex = subtaskIndex;
+ this.totalNumberOfSubtasks = totalNumberOfSubtasks;
this.poolSize = poolSize;
this.safeScaleDownFactor = safeScaleDownFactor;
}
@@ -65,14 +80,11 @@ public class TransactionalIdsGenerator {
* range to abort based on current configured pool size, current parallelism and safeScaleDownFactor.
*/
public Set<String> generateIdsToAbort() {
- long abortTransactionalIdStart = subtaskIndex;
- long abortTransactionalIdEnd = abortTransactionalIdStart + 1;
-
- abortTransactionalIdStart *= poolSize * safeScaleDownFactor;
- abortTransactionalIdEnd *= poolSize * safeScaleDownFactor;
- return LongStream.range(abortTransactionalIdStart, abortTransactionalIdEnd)
- .mapToObj(this::generateTransactionalId)
- .collect(Collectors.toSet());
+ Set<String> idsToAbort = new HashSet<>();
+ for (int i = 0; i < safeScaleDownFactor; i++) {
+ idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
+ }
+ return idsToAbort;
}
private String generateTransactionalId(long transactionalId) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fdae3ae1/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
new file mode 100644
index 0000000..7ad3779
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+ private static final int POOL_SIZE = 3;
+ private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+ private static final int SUBTASKS_COUNT = 5;
+
+ @Test
+ public void testGenerateIdsToUse() {
+ TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+ assertEquals(
+ new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")),
+ generator.generateIdsToUse(36));
+ }
+
+ /**
+ * Ids to abort and to use should never clash between subtasks.
+ */
+ @Test
+ public void testGeneratedIdsDoNotClash() {
+ List<Set<String>> idsToAbort = new ArrayList<>();
+ List<Set<String>> idsToUse = new ArrayList<>();
+
+ for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+ TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+ idsToUse.add(generator.generateIdsToUse(0));
+ idsToAbort.add(generator.generateIdsToAbort());
+ }
+
+ for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+ for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
+ if (subtask2 == subtask1) {
+ continue;
+ }
+ assertDisjoint(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+ assertDisjoint(idsToUse.get(subtask2), idsToUse.get(subtask1));
+ assertDisjoint(idsToAbort.get(subtask2), idsToUse.get(subtask1));
+ }
+ }
+ }
+
+ private <T> void assertDisjoint(Set<T> first, Set<T> second) {
+ HashSet<T> actual = new HashSet<>(first);
+ actual.retainAll(second);
+ assertEquals(Collections.emptySet(), actual);
+ }
+}