You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2019/08/06 12:41:54 UTC

[flink] 02/02: [FLINK-13498][kafka] abort transactions in parallel

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d774feaafb6bbfdbcee653230515fe93d5295926
Author: Nico Kruber <ni...@ververica.com>
AuthorDate: Tue Jul 30 16:46:54 2019 +0200

    [FLINK-13498][kafka] abort transactions in parallel
    
    This makes FlinkKafkaProducer abort transactions in parallel (e.g. during a
    first startup), making use of lingering CPU resources (each sink instance uses
    at most kafkaProducersPoolSize producers at once, just like during runtime).
    
    Especially during that first startup (and thus also in tests), a lot of
    producers (5*poolSize) are created by each sink instance to abort potentially
    existing previous transactions (in most cases, none exist).
---
 .../connectors/kafka/FlinkKafkaProducer011.java    | 23 ++++++++++++++------
 .../connectors/kafka/FlinkKafkaProducer.java       | 25 ++++++++++++++++------
 2 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 7bc847f..d5718b7 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -912,13 +912,20 @@ public class FlinkKafkaProducer011<IN>
 	// ----------------------------------- Utilities --------------------------
 
 	private void abortTransactions(Set<String> transactionalIds) {
-		for (String transactionalId : transactionalIds) {
+		transactionalIds.parallelStream().forEach(transactionalId -> {
+			// don't mess with the original configuration or any other properties of the
+			// original object
+			// -> create an internal kafka producer on our own and do not rely on
+			//    initTransactionalProducer().
+			final Properties myConfig = new Properties();
+			myConfig.putAll(producerConfig);
+			initTransactionalProducerConfig(myConfig, transactionalId);
 			try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer =
-					initTransactionalProducer(transactionalId, false)) {
-				// it suffice to call initTransactions - this will abort any lingering transactions
+					new FlinkKafkaProducer<>(myConfig)) {
+				// it suffices to call initTransactions - this will abort any lingering transactions
 				kafkaProducer.initTransactions();
 			}
-		}
+		});
 	}
 
 	int getTransactionCoordinatorId() {
@@ -952,12 +959,16 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
-		producerConfig.put("transactional.id", transactionalId);
+		initTransactionalProducerConfig(producerConfig, transactionalId);
 		return initProducer(registerMetrics);
 	}
 
+	private static void initTransactionalProducerConfig(Properties producerConfig, String transactionalId) {
+		producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+	}
+
 	private FlinkKafkaProducer<byte[], byte[]> initNonTransactionalProducer(boolean registerMetrics) {
-		producerConfig.remove("transactional.id");
+		producerConfig.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
 		return initProducer(registerMetrics);
 	}
 
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 605ee3f..7bf1913 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -1090,14 +1090,21 @@ public class FlinkKafkaProducer<IN>
 
 	// ----------------------------------- Utilities --------------------------
 
-	private void abortTransactions(Set<String> transactionalIds) {
-		for (String transactionalId : transactionalIds) {
+	private void abortTransactions(final Set<String> transactionalIds) {
+		transactionalIds.parallelStream().forEach(transactionalId -> {
+			// don't mess with the original configuration or any other properties of the
+			// original object
+			// -> create an internal kafka producer on our own and do not rely on
+			//    initTransactionalProducer().
+			final Properties myConfig = new Properties();
+			myConfig.putAll(producerConfig);
+			initTransactionalProducerConfig(myConfig, transactionalId);
 			try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
-				initTransactionalProducer(transactionalId, false)) {
-				// it suffice to call initTransactions - this will abort any lingering transactions
+					new FlinkKafkaInternalProducer<>(myConfig)) {
+				// it suffices to call initTransactions - this will abort any lingering transactions
 				kafkaProducer.initTransactions();
 			}
-		}
+		});
 	}
 
 	int getTransactionCoordinatorId() {
@@ -1131,12 +1138,16 @@ public class FlinkKafkaProducer<IN>
 	}
 
 	private FlinkKafkaInternalProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
-		producerConfig.put("transactional.id", transactionalId);
+		initTransactionalProducerConfig(producerConfig, transactionalId);
 		return initProducer(registerMetrics);
 	}
 
+	private static void initTransactionalProducerConfig(Properties producerConfig, String transactionalId) {
+		producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+	}
+
 	private FlinkKafkaInternalProducer<byte[], byte[]> initNonTransactionalProducer(boolean registerMetrics) {
-		producerConfig.remove("transactional.id");
+		producerConfig.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
 		return initProducer(registerMetrics);
 	}