You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/11/08 08:48:16 UTC
[1/2] flink git commit: [hotfix][kafka] Extract
TransactionalIdsGenerator class from FlinkKafkaProducer011
Repository: flink
Updated Branches:
refs/heads/master 6bce2b833 -> fdae3ae1f
[hotfix][kafka] Extract TransactionalIdsGenerator class from FlinkKafkaProducer011
This is a pure refactor without any functional changes.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab00d35b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab00d35b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab00d35b
Branch: refs/heads/master
Commit: ab00d35b88ce9cf26c66b7cbb21486d1b18573a6
Parents: 6bce2b8
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Nov 6 14:03:16 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 8 09:45:07 2017 +0100
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaProducer011.java | 53 +++++--------
.../internal/TransactionalIdsGenerator.java | 81 ++++++++++++++++++++
2 files changed, 99 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ab00d35b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 873ef08..0310019 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator;
import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
@@ -59,7 +60,6 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,8 +81,6 @@ import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.LongStream;
-import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -183,6 +181,11 @@ public class FlinkKafkaProducer011<IN>
private transient ListState<NextTransactionalIdHint> nextTransactionalIdHintState;
/**
+ * Generator for Transactional IDs.
+ */
+ private transient TransactionalIdsGenerator transactionalIdsGenerator;
+
+ /**
* Hint for picking next transactional id.
*/
private transient NextTransactionalIdHint nextTransactionalIdHint;
@@ -785,6 +788,11 @@ public class FlinkKafkaProducer011<IN>
nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
+ transactionalIdsGenerator = new TransactionalIdsGenerator(
+ getRuntimeContext().getTaskName(),
+ getRuntimeContext().getIndexOfThisSubtask(),
+ kafkaProducersPoolSize,
+ SAFE_SCALE_DOWN_FACTOR);
if (semantic != Semantic.EXACTLY_ONCE) {
nextTransactionalIdHint = null;
@@ -803,15 +811,8 @@ public class FlinkKafkaProducer011<IN>
// (1) the first execution of this application
// (2) previous execution has failed before first checkpoint completed
//
- // in case of (2) we have to abort all previous transactions, but we don't know was the parallelism used
- // then, so we must guess using current configured pool size, current parallelism and
- // SAFE_SCALE_DOWN_FACTOR
- long abortTransactionalIdStart = getRuntimeContext().getIndexOfThisSubtask();
- long abortTransactionalIdEnd = abortTransactionalIdStart + 1;
-
- abortTransactionalIdStart *= kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR;
- abortTransactionalIdEnd *= kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR;
- abortTransactions(LongStream.range(abortTransactionalIdStart, abortTransactionalIdEnd));
+ // in case of (2) we have to abort all previous transactions
+ abortTransactions(transactionalIdsGenerator.generateIdsToAbort());
} else {
nextTransactionalIdHint = transactionalIdHints.get(0);
}
@@ -834,16 +835,7 @@ public class FlinkKafkaProducer011<IN>
private Set<String> generateNewTransactionalIds() {
checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be present for EXACTLY_ONCE");
- // range of available transactional ids is:
- // [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize)
- // loop below picks in a deterministic way a subrange of those available transactional ids based on index of
- // this subtask
- int subtaskId = getRuntimeContext().getIndexOfThisSubtask();
- Set<String> transactionalIds = new HashSet<>();
- for (int i = 0; i < kafkaProducersPoolSize; i++) {
- long transactionalId = nextTransactionalIdHint.nextFreeTransactionalId + subtaskId * kafkaProducersPoolSize + i;
- transactionalIds.add(generateTransactionalId(transactionalId));
- }
+ Set<String> transactionalIds = transactionalIdsGenerator.generateIdsToUse(nextTransactionalIdHint.nextFreeTransactionalId);
LOG.info("Generated new transactionalIds {}", transactionalIds);
return transactionalIds;
}
@@ -862,7 +854,7 @@ public class FlinkKafkaProducer011<IN>
if (!getUserContext().isPresent()) {
return;
}
- abortTransactions(getUserContext().get().transactionalIds.stream());
+ abortTransactions(getUserContext().get().transactionalIds);
}
private void resetAvailableTransactionalIdsPool(Collection<String> transactionalIds) {
@@ -874,22 +866,13 @@ public class FlinkKafkaProducer011<IN>
// ----------------------------------- Utilities --------------------------
- private void abortTransactions(LongStream transactionalIds) {
- abortTransactions(transactionalIds.mapToObj(this::generateTransactionalId));
- }
-
- private void abortTransactions(Stream<String> transactionalIds) {
- transactionalIds.forEach(transactionalId -> {
+ private void abortTransactions(Set<String> transactionalIds) {
+ for (String transactionalId : transactionalIds) {
try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer =
initTransactionalProducer(transactionalId, false)) {
kafkaProducer.initTransactions();
}
- });
- }
-
- private String generateTransactionalId(long transactionalId) {
- String transactionalIdFormat = getRuntimeContext().getTaskName() + "-%d";
- return String.format(transactionalIdFormat, transactionalId);
+ }
}
int getTransactionCoordinatorId() {
http://git-wip-us.apache.org/repos/asf/flink/blob/ab00d35b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
new file mode 100644
index 0000000..eabdad9
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class responsible for generating transactional ids to use when communicating with Kafka.
+ */
+public class TransactionalIdsGenerator {
+ private final String prefix;
+ private final int subtaskIndex;
+ private final int poolSize;
+ private final int safeScaleDownFactor;
+
+ public TransactionalIdsGenerator(
+ String prefix,
+ int subtaskIndex,
+ int poolSize,
+ int safeScaleDownFactor) {
+ this.prefix = checkNotNull(prefix);
+ this.subtaskIndex = subtaskIndex;
+ this.poolSize = poolSize;
+ this.safeScaleDownFactor = safeScaleDownFactor;
+ }
+
+ /**
+ * Range of available transactional ids to use is:
+ * [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize)
+ * loop below picks in a deterministic way a subrange of those available transactional ids based on index of
+ * this subtask.
+ */
+ public Set<String> generateIdsToUse(long nextFreeTransactionalId) {
+ Set<String> transactionalIds = new HashSet<>();
+ for (int i = 0; i < poolSize; i++) {
+ long transactionalId = nextFreeTransactionalId + subtaskIndex * poolSize + i;
+ transactionalIds.add(generateTransactionalId(transactionalId));
+ }
+ return transactionalIds;
+ }
+
+ /**
+ * If we have to abort previous transactional id in case of restart after a failure BEFORE first checkpoint
+ * completed, we don't know what was the parallelism used in previous attempt. In that case we must guess the ids
+ * range to abort based on current configured pool size, current parallelism and safeScaleDownFactor.
+ */
+ public Set<String> generateIdsToAbort() {
+ long abortTransactionalIdStart = subtaskIndex;
+ long abortTransactionalIdEnd = abortTransactionalIdStart + 1;
+
+ abortTransactionalIdStart *= poolSize * safeScaleDownFactor;
+ abortTransactionalIdEnd *= poolSize * safeScaleDownFactor;
+ return LongStream.range(abortTransactionalIdStart, abortTransactionalIdEnd)
+ .mapToObj(this::generateTransactionalId)
+ .collect(Collectors.toSet());
+ }
+
+ private String generateTransactionalId(long transactionalId) {
+ return String.format(prefix + "-%d", transactionalId);
+ }
+}
[2/2] flink git commit: [FLINK-7978][kafka] Ensure that transactional
ids will never clash
Posted by al...@apache.org.
[FLINK-7978][kafka] Ensure that transactional ids will never clash
Previously transactional ids to use and to abort could clash between
subtasks. This could lead to a race condition between initialization
and writing the data, where one subtask is still initializing/aborting
some transactional id while different subtask is already trying to write
the data using the same transactional id.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdae3ae1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdae3ae1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdae3ae1
Branch: refs/heads/master
Commit: fdae3ae1f990ca90375264634dedd6ebd54502a1
Parents: ab00d35
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Nov 6 14:14:01 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 8 09:45:14 2017 +0100
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaProducer011.java | 1 +
.../internal/TransactionalIdsGenerator.java | 32 +++++---
.../internal/TransactionalIdsGeneratorTest.java | 79 ++++++++++++++++++++
3 files changed, 102 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fdae3ae1/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 0310019..08599d8 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -791,6 +791,7 @@ public class FlinkKafkaProducer011<IN>
transactionalIdsGenerator = new TransactionalIdsGenerator(
getRuntimeContext().getTaskName(),
getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getNumberOfParallelSubtasks(),
kafkaProducersPoolSize,
SAFE_SCALE_DOWN_FACTOR);
http://git-wip-us.apache.org/repos/asf/flink/blob/fdae3ae1/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
index eabdad9..2c4e6c9 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
@@ -19,27 +19,42 @@ package org.apache.flink.streaming.connectors.kafka.internal;
import java.util.HashSet;
import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Class responsible for generating transactional ids to use when communicating with Kafka.
+ *
+ * <p>It guarantees that:
+ * <ul>
+ * <li>generated ids to use will never clash with ids to use from different subtasks
+ * <li>generated ids to abort will never clash with ids to abort from different subtasks
+ * <li>generated ids to use will never clash with ids to abort from different subtasks
+ * </ul>
+ * In other words, any particular generated id will always be assigned to one and only one subtask.
*/
public class TransactionalIdsGenerator {
private final String prefix;
private final int subtaskIndex;
+ private final int totalNumberOfSubtasks;
private final int poolSize;
private final int safeScaleDownFactor;
public TransactionalIdsGenerator(
String prefix,
int subtaskIndex,
+ int totalNumberOfSubtasks,
int poolSize,
int safeScaleDownFactor) {
+ checkArgument(subtaskIndex < totalNumberOfSubtasks);
+ checkArgument(poolSize > 0);
+ checkArgument(safeScaleDownFactor > 0);
+ checkArgument(subtaskIndex >= 0);
+
this.prefix = checkNotNull(prefix);
this.subtaskIndex = subtaskIndex;
+ this.totalNumberOfSubtasks = totalNumberOfSubtasks;
this.poolSize = poolSize;
this.safeScaleDownFactor = safeScaleDownFactor;
}
@@ -65,14 +80,11 @@ public class TransactionalIdsGenerator {
* range to abort based on current configured pool size, current parallelism and safeScaleDownFactor.
*/
public Set<String> generateIdsToAbort() {
- long abortTransactionalIdStart = subtaskIndex;
- long abortTransactionalIdEnd = abortTransactionalIdStart + 1;
-
- abortTransactionalIdStart *= poolSize * safeScaleDownFactor;
- abortTransactionalIdEnd *= poolSize * safeScaleDownFactor;
- return LongStream.range(abortTransactionalIdStart, abortTransactionalIdEnd)
- .mapToObj(this::generateTransactionalId)
- .collect(Collectors.toSet());
+ Set<String> idsToAbort = new HashSet<>();
+ for (int i = 0; i < safeScaleDownFactor; i++) {
+ idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
+ }
+ return idsToAbort;
}
private String generateTransactionalId(long transactionalId) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fdae3ae1/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
new file mode 100644
index 0000000..7ad3779
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+ private static final int POOL_SIZE = 3;
+ private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+ private static final int SUBTASKS_COUNT = 5;
+
+ @Test
+ public void testGenerateIdsToUse() {
+ TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+ assertEquals(
+ new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")),
+ generator.generateIdsToUse(36));
+ }
+
+ /**
+ * Ids to abort and to use should never clash between subtasks.
+ */
+ @Test
+ public void testGeneratedIdsDoNotClash() {
+ List<Set<String>> idsToAbort = new ArrayList<>();
+ List<Set<String>> idsToUse = new ArrayList<>();
+
+ for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+ TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+ idsToUse.add(generator.generateIdsToUse(0));
+ idsToAbort.add(generator.generateIdsToAbort());
+ }
+
+ for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+ for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
+ if (subtask2 == subtask1) {
+ continue;
+ }
+ assertDisjoint(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+ assertDisjoint(idsToUse.get(subtask2), idsToUse.get(subtask1));
+ assertDisjoint(idsToAbort.get(subtask2), idsToUse.get(subtask1));
+ }
+ }
+ }
+
+ private <T> void assertDisjoint(Set<T> first, Set<T> second) {
+ HashSet<T> actual = new HashSet<>(first);
+ actual.retainAll(second);
+ assertEquals(Collections.emptySet(), actual);
+ }
+}