You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by wangshuai <tu...@163.com> on 2022/12/13 08:02:11 UTC

Flink 中的类加载KafkaSerializerWrapper

在使用kafka自定义序列化时会导致对应的class无法加载的问题。通过分析,现有代码在使用AppClassLoader类加载器先加载了KafkaSerializerWrapper时,用户提交任务自己编写的类是无法通过AppClassLoader加载的,通过FlinkUserCodeClassLoader加载的话需要KafkaSerializerWrapper也是通过FlinkUserCodeClassLoader加载。
public void open(InitializationContext context) throws Exception {
final ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader();
    try (TemporaryClassLoaderContext ignored =
            TemporaryClassLoaderContext.of(userCodeClassLoader)) {
serializer =
                InstantiationUtil.instantiate(
serializerClass.getName(),
Serializer.class,
getClass().getClassLoader()); // ?? 似乎应该如此 Thread.currentThread().getContextClassLoader()

if (serializer instanceof Configurable) {
            ((Configurable) serializer).configure(config);
} else {
serializer.configure(config, isKey);
}
    } catch (Exception e) {
throw new IOException("Failed to instantiate the serializer of class " + serializer, e);
}
}

Re: Flink 中的类加载KafkaSerializerWrapper

Posted by Martijn Visser <ma...@apache.org>.
Hi,

Please post to the Dev mailing list in English.

Best regards,

Martijn

On Tue, Dec 13, 2022 at 9:03 AM wangshuai <tu...@163.com> wrote:

>
> 在使用kafka自定义序列化时会导致对应的class无法加载的问题。通过分析,现有代码在使用AppClassLoader类加载器先加载了KafkaSerializerWrapper时,用户提交任务自己编写的类是无法通过AppClassLoader加载的,通过FlinkUserCodeClassLoader加载的话需要KafkaSerializerWrapper也是通过FlinkUserCodeClassLoader加载。
> public void open(InitializationContext context) throws Exception {
> final ClassLoader userCodeClassLoader =
> context.getUserCodeClassLoader().asClassLoader();
>     try (TemporaryClassLoaderContext ignored =
>             TemporaryClassLoaderContext.of(userCodeClassLoader)) {
> serializer =
>                 InstantiationUtil.instantiate(
> serializerClass.getName(),
> Serializer.class,
> getClass().getClassLoader()); // ?? 似乎应该如此
> Thread.currentThread().getContextClassLoader()
>
> if (serializer instanceof Configurable) {
>             ((Configurable) serializer).configure(config);
> } else {
> serializer.configure(config, isKey);
> }
>     } catch (Exception e) {
> throw new IOException("Failed to instantiate the serializer of class " +
> serializer, e);
> }
> }