You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/27 20:34:16 UTC

[GitHub] [flink] zentol commented on a change in pull request #11247: [FLINK-16262][Connectors] Set the context classloader for parallel stream in FlinkKafkaProducer

zentol commented on a change in pull request #11247: [FLINK-16262][Connectors] Set the context classloader for parallel stream in FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/11247#discussion_r385356328
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
 ##########
 @@ -1095,18 +1096,21 @@ private void resetAvailableTransactionalIdsPool(Collection<String> transactional
 	// ----------------------------------- Utilities --------------------------
 
 	private void abortTransactions(final Set<String> transactionalIds) {
+		final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
 		transactionalIds.parallelStream().forEach(transactionalId -> {
-			// don't mess with the original configuration or any other properties of the
-			// original object
-			// -> create an internal kafka producer on our own and do not rely on
-			//    initTransactionalProducer().
-			final Properties myConfig = new Properties();
-			myConfig.putAll(producerConfig);
-			initTransactionalProducerConfig(myConfig, transactionalId);
-			try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
-					new FlinkKafkaInternalProducer<>(myConfig)) {
-				// it suffices to call initTransactions - this will abort any lingering transactions
-				kafkaProducer.initTransactions();
+			try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
 
 Review comment:
   Please add a comment that `parallelStream` executes the consumer in a separate thread pool.
   At a glance this context can easily appear as a no-op if you miss that `parallelStream` is used and what the semantics of it are.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services