You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/03/28 13:23:29 UTC

[flink] branch master updated: [FLINK-16262][connectors/kafka] Set the context classloader for parallel stream in FlinkKafkaProducer

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ff0d0c9  [FLINK-16262][connectors/kafka] Set the context classloader for parallel stream in FlinkKafkaProducer
ff0d0c9 is described below

commit ff0d0c979d7cf67648ecf91850e782e99d557240
Author: guowei.mgw <gu...@gmail.com>
AuthorDate: Sat Mar 28 21:22:57 2020 +0800

    [FLINK-16262][connectors/kafka] Set the context classloader for parallel stream in FlinkKafkaProducer
    
    This closes #11247
---
 .../connectors/kafka/FlinkKafkaProducer.java       | 29 ++++++++++++++--------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 3a92c81..9b42b5f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -1095,18 +1096,24 @@ public class FlinkKafkaProducer<IN>
 	// ----------------------------------- Utilities --------------------------
 
 	private void abortTransactions(final Set<String> transactionalIds) {
+		final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
 		transactionalIds.parallelStream().forEach(transactionalId -> {
-			// don't mess with the original configuration or any other properties of the
-			// original object
-			// -> create an internal kafka producer on our own and do not rely on
-			//    initTransactionalProducer().
-			final Properties myConfig = new Properties();
-			myConfig.putAll(producerConfig);
-			initTransactionalProducerConfig(myConfig, transactionalId);
-			try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
-					new FlinkKafkaInternalProducer<>(myConfig)) {
-				// it suffices to call initTransactions - this will abort any lingering transactions
-				kafkaProducer.initTransactions();
+			// The parallelStream executes the consumer in a separate thread pool.
+			// Because the consumer (e.g. Kafka) uses the context classloader to construct some classes,
+			// we should set the correct classloader for it.
+			try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
+				// don't mess with the original configuration or any other properties of the
+				// original object
+				// -> create an internal kafka producer on our own and do not rely on
+				//    initTransactionalProducer().
+				final Properties myConfig = new Properties();
+				myConfig.putAll(producerConfig);
+				initTransactionalProducerConfig(myConfig, transactionalId);
+				try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
+						new FlinkKafkaInternalProducer<>(myConfig)) {
+					// it suffices to call initTransactions - this will abort any lingering transactions
+					kafkaProducer.initTransactions();
+				}
 			}
 		});
 	}