You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2019/08/06 12:41:54 UTC
[flink] 02/02: [FLINK-13498][kafka] abort transactions in parallel
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d774feaafb6bbfdbcee653230515fe93d5295926
Author: Nico Kruber <ni...@ververica.com>
AuthorDate: Tue Jul 30 16:46:54 2019 +0200
[FLINK-13498][kafka] abort transactions in parallel
This makes FlinkKafkaProducer abort transactions in parallel, e.g. during a first
startup, making use of otherwise idle CPU resources (creating at most
kafkaProducersPoolSize producers at a time, just like during runtime).
Especially during that first startup (and thus also in tests), many
producers (5*poolSize) are created at each sink instance to abort
potentially existing previous transactions (in most cases, none exist).
---
.../connectors/kafka/FlinkKafkaProducer011.java | 23 ++++++++++++++------
.../connectors/kafka/FlinkKafkaProducer.java | 25 ++++++++++++++++------
2 files changed, 35 insertions(+), 13 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 7bc847f..d5718b7 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -912,13 +912,20 @@ public class FlinkKafkaProducer011<IN>
// ----------------------------------- Utilities --------------------------
private void abortTransactions(Set<String> transactionalIds) {
- for (String transactionalId : transactionalIds) {
+ transactionalIds.parallelStream().forEach(transactionalId -> {
+ // don't mess with the original configuration or any other properties of the
+ // original object
+ // -> create an internal kafka producer on our own and do not rely on
+ // initTransactionalProducer().
+ final Properties myConfig = new Properties();
+ myConfig.putAll(producerConfig);
+ initTransactionalProducerConfig(myConfig, transactionalId);
try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer =
- initTransactionalProducer(transactionalId, false)) {
- // it suffice to call initTransactions - this will abort any lingering transactions
+ new FlinkKafkaProducer<>(myConfig)) {
+ // it suffices to call initTransactions - this will abort any lingering transactions
kafkaProducer.initTransactions();
}
- }
+ });
}
int getTransactionCoordinatorId() {
@@ -952,12 +959,16 @@ public class FlinkKafkaProducer011<IN>
}
private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
- producerConfig.put("transactional.id", transactionalId);
+ initTransactionalProducerConfig(producerConfig, transactionalId);
return initProducer(registerMetrics);
}
+ private static void initTransactionalProducerConfig(Properties producerConfig, String transactionalId) {
+ producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+ }
+
private FlinkKafkaProducer<byte[], byte[]> initNonTransactionalProducer(boolean registerMetrics) {
- producerConfig.remove("transactional.id");
+ producerConfig.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
return initProducer(registerMetrics);
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 605ee3f..7bf1913 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -1090,14 +1090,21 @@ public class FlinkKafkaProducer<IN>
// ----------------------------------- Utilities --------------------------
- private void abortTransactions(Set<String> transactionalIds) {
- for (String transactionalId : transactionalIds) {
+ private void abortTransactions(final Set<String> transactionalIds) {
+ transactionalIds.parallelStream().forEach(transactionalId -> {
+ // don't mess with the original configuration or any other properties of the
+ // original object
+ // -> create an internal kafka producer on our own and do not rely on
+ // initTransactionalProducer().
+ final Properties myConfig = new Properties();
+ myConfig.putAll(producerConfig);
+ initTransactionalProducerConfig(myConfig, transactionalId);
try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
- initTransactionalProducer(transactionalId, false)) {
- // it suffice to call initTransactions - this will abort any lingering transactions
+ new FlinkKafkaInternalProducer<>(myConfig)) {
+ // it suffices to call initTransactions - this will abort any lingering transactions
kafkaProducer.initTransactions();
}
- }
+ });
}
int getTransactionCoordinatorId() {
@@ -1131,12 +1138,16 @@ public class FlinkKafkaProducer<IN>
}
private FlinkKafkaInternalProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
- producerConfig.put("transactional.id", transactionalId);
+ initTransactionalProducerConfig(producerConfig, transactionalId);
return initProducer(registerMetrics);
}
+ private static void initTransactionalProducerConfig(Properties producerConfig, String transactionalId) {
+ producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+ }
+
private FlinkKafkaInternalProducer<byte[], byte[]> initNonTransactionalProducer(boolean registerMetrics) {
- producerConfig.remove("transactional.id");
+ producerConfig.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
return initProducer(registerMetrics);
}