You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/03/28 13:23:29 UTC
[flink] branch master updated: [FLINK-16262][connectors/kafka] Set
the context classloader for parallel stream in FlinkKafkaProducer
This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ff0d0c9 [FLINK-16262][connectors/kafka] Set the context classloader for parallel stream in FlinkKafkaProducer
ff0d0c9 is described below
commit ff0d0c979d7cf67648ecf91850e782e99d557240
Author: guowei.mgw <gu...@gmail.com>
AuthorDate: Sat Mar 28 21:22:57 2020 +0800
[FLINK-16262][connectors/kafka] Set the context classloader for parallel stream in FlinkKafkaProducer
This closes #11247
---
.../connectors/kafka/FlinkKafkaProducer.java | 29 ++++++++++++++--------
1 file changed, 18 insertions(+), 11 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 3a92c81..9b42b5f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -1095,18 +1096,24 @@ public class FlinkKafkaProducer<IN>
// ----------------------------------- Utilities --------------------------
private void abortTransactions(final Set<String> transactionalIds) {
+ final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
transactionalIds.parallelStream().forEach(transactionalId -> {
- // don't mess with the original configuration or any other properties of the
- // original object
- // -> create an internal kafka producer on our own and do not rely on
- // initTransactionalProducer().
- final Properties myConfig = new Properties();
- myConfig.putAll(producerConfig);
- initTransactionalProducerConfig(myConfig, transactionalId);
- try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
- new FlinkKafkaInternalProducer<>(myConfig)) {
- // it suffices to call initTransactions - this will abort any lingering transactions
- kafkaProducer.initTransactions();
+ // The parallelStream executes the consumer in a separated thread pool.
+ // Because the consumer(e.g. Kafka) uses the context classloader to construct some class
+ // we should set the correct classloader for it.
+ try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
+ // don't mess with the original configuration or any other properties of the
+ // original object
+ // -> create an internal kafka producer on our own and do not rely on
+ // initTransactionalProducer().
+ final Properties myConfig = new Properties();
+ myConfig.putAll(producerConfig);
+ initTransactionalProducerConfig(myConfig, transactionalId);
+ try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
+ new FlinkKafkaInternalProducer<>(myConfig)) {
+ // it suffices to call initTransactions - this will abort any lingering transactions
+ kafkaProducer.initTransactions();
+ }
}
});
}