You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/03/28 13:26:48 UTC

[flink] branch release-1.10 updated: [FLINK-16262][connectors/kafka] Set the context classloader for parallel stream in FlinkKafkaProducer (#11497)

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new e39cfe7  [FLINK-16262][connectors/kafka] Set the context classloader for parallel stream in FlinkKafkaProducer (#11497)
e39cfe7 is described below

commit e39cfe7660daaeed4213f04ccbce6de1e8d90fe5
Author: guowei.mgw <gu...@gmail.com>
AuthorDate: Sat Mar 28 21:26:31 2020 +0800

    [FLINK-16262][connectors/kafka] Set the context classloader for parallel stream in FlinkKafkaProducer (#11497)
    
    This closes #11497
---
 .../connectors/kafka/FlinkKafkaProducer.java       | 29 ++++++++++++++--------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 821a8f1..40d8644 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.plugin.TemporaryClassLoaderContext;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -1096,18 +1097,24 @@ public class FlinkKafkaProducer<IN>
 	// ----------------------------------- Utilities --------------------------
 
 	private void abortTransactions(final Set<String> transactionalIds) {
+		final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
 		transactionalIds.parallelStream().forEach(transactionalId -> {
-			// don't mess with the original configuration or any other properties of the
-			// original object
-			// -> create an internal kafka producer on our own and do not rely on
-			//    initTransactionalProducer().
-			final Properties myConfig = new Properties();
-			myConfig.putAll(producerConfig);
-			initTransactionalProducerConfig(myConfig, transactionalId);
-			try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
-					new FlinkKafkaInternalProducer<>(myConfig)) {
-				// it suffices to call initTransactions - this will abort any lingering transactions
-				kafkaProducer.initTransactions();
+			// The parallelStream executes the consumer in a separated thread pool.
+			// Because the consumer(e.g. Kafka) uses the context classloader to construct some class
+			// we should set the correct classloader for it.
+			try (TemporaryClassLoaderContext ignored = new TemporaryClassLoaderContext(classLoader)) {
+				// don't mess with the original configuration or any other properties of the
+				// original object
+				// -> create an internal kafka producer on our own and do not rely on
+				//    initTransactionalProducer().
+				final Properties myConfig = new Properties();
+				myConfig.putAll(producerConfig);
+				initTransactionalProducerConfig(myConfig, transactionalId);
+				try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
+						new FlinkKafkaInternalProducer<>(myConfig)) {
+					// it suffices to call initTransactions - this will abort any lingering transactions
+					kafkaProducer.initTransactions();
+				}
 			}
 		});
 	}