Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/07/31 07:27:49 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #9287: [FLINK-13498][kafka] abort transactions in parallel

pnowojski commented on a change in pull request #9287: [FLINK-13498][kafka] abort transactions in parallel
URL: https://github.com/apache/flink/pull/9287#discussion_r309069745
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ##########
 @@ -906,11 +907,33 @@ private void resetAvailableTransactionalIdsPool(Collection<String> transactional
 	// ----------------------------------- Utilities --------------------------
 
 	private void abortTransactions(Set<String> transactionalIds) {
-		for (String transactionalId : transactionalIds) {
-			try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer =
-					initTransactionalProducer(transactionalId, false)) {
-				// it suffice to call initTransactions - this will abort any lingering transactions
-				kafkaProducer.initTransactions();
+		// shortcut for non-exactly-once producers
+		if (transactionalIds.isEmpty()) {
+			return;
+		}
+
+		ForkJoinPool forkJoinPool = null;
+		try {
+			// limit the number of connections to the number that is used during runtime
+			forkJoinPool = new ForkJoinPool(kafkaProducersPoolSize);
 
 Review comment:
   Hmm, when we discussed this offline, I understood that this would be a global pool with a static size. As it is now, each call creates its own pool, so we could end up creating a substantial number (hundreds) of threads, albeit short-lived ones. I'm not sure about this...
   
   Crazy idea... what about making this pool `static` and having it shut down idle threads?
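
A minimal sketch of that idea, not the PR's implementation. It assumes a plain `ThreadPoolExecutor` instead of the `ForkJoinPool` from the diff, because `allowCoreThreadTimeOut` gives idle-thread shutdown directly on Java 8. The names `SharedAbortPool`, `POOL_SIZE`, `IDLE_TIMEOUT_SECONDS` and `forEachInParallel` are hypothetical, and the `action` callback stands in for the per-id "init producer, call `initTransactions()`" step.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class SharedAbortPool {
    // Hypothetical values: one fixed, JVM-wide limit shared by all producer instances.
    static final int POOL_SIZE = 5;
    static final long IDLE_TIMEOUT_SECONDS = 10;

    // Static, so every caller in the JVM shares the same bounded set of threads.
    private static final ThreadPoolExecutor POOL = createPool();

    private static ThreadPoolExecutor createPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            POOL_SIZE, POOL_SIZE,
            IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            runnable -> {
                Thread thread = new Thread(runnable, "abort-transactions");
                thread.setDaemon(true); // do not keep the JVM alive for this pool
                return thread;
            });
        // Let core threads time out too, so an idle pool shrinks to zero threads.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    // Runs the action for every id on the shared pool and waits for all of them.
    static void forEachInParallel(Collection<String> ids, Consumer<String> action) {
        List<Callable<Void>> tasks = new ArrayList<>();
        for (String id : ids) {
            tasks.add(() -> {
                action.accept(id);
                return null;
            });
        }
        try {
            for (Future<Void> future : POOL.invokeAll(tasks)) {
                future.get(); // surfaces the first failure, if any
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        }
    }

    // Number of threads currently alive in the pool (never above POOL_SIZE).
    static int currentThreads() {
        return POOL.getPoolSize();
    }
}
```

With something like this, `abortTransactions` would pass its `transactionalIds` and a per-id callback to `forEachInParallel`. The thread count then stays bounded by `POOL_SIZE` no matter how many producer instances run in the same JVM, and the threads go away once they have been idle for the timeout.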
