Posted to user-zh@flink.apache.org by "799590989@qq.com.INVALID" <79...@qq.com.INVALID> on 2022/04/12 02:46:24 UTC

Custom UDF cannot be loaded when executing Flink SQL

Environment

flink-1.13.6_scala_2.11
java 1.8

Deployment: standalone session cluster. node01 runs the JobManager; node02 and node03 run TaskManagers.

UDF code:
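package com.example.udf;

import org.apache.flink.table.functions.ScalarFunction;

public class SubStr extends ScalarFunction {

    // Returns s.substring(start, end); returns null if any argument is null
    public String eval(String s, Integer start, Integer end) {
        if (s == null || start == null || end == null) {
            return null;
        }
        return s.substring(start, end);
    }
}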

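The UDF jars are stored on HDFS. Each time the client submits SQL, it loads the list of UDF jars from HDFS through a class loader and sets pipeline.jars to the list of HDFS paths of those jars. Executing the following SQL fails: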

insert into output_2455_5070_model_1649729386269
select tablekeymd5(user_id) as mm, proctime(), MD5(CONCAT_WS(CAST(user_id AS STRING)))
from (
    select distinct id as id, user_id as user_id, status as status
    from (select id, user_id, status from data_2455_5068_model)
    where status < '4'
)

2022-04-12 10:26:36
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.udf.TableKeyMd5
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.example.udf.TableKeyMd5
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
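The `URL ClassLoader:` line in the trace above appears to list no jar at all, which would suggest the TaskManager's user-code class loader was built without the UDF jar. To compare with what the client side sees, a JDK-only helper like the following could print each loader in a parent chain together with the URLs it searches, and check whether a class resolves through it. `ClassLoaderDump` is a hypothetical name, not a Flink API; this is a debugging sketch only.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical diagnostic helper: describes every loader in a parent chain and its URLs.
final class ClassLoaderDump {

    private ClassLoaderDump() {}

    static String describe(ClassLoader loader) {
        StringBuilder sb = new StringBuilder();
        for (ClassLoader cl = loader; cl != null; cl = cl.getParent()) {
            sb.append(cl.getClass().getName()).append('\n');
            // Only URLClassLoader exposes its search path; other loaders are listed by type only
            if (cl instanceof URLClassLoader) {
                for (URL url : ((URLClassLoader) cl).getURLs()) {
                    sb.append("    ").append(url).append('\n');
                }
            }
        }
        return sb.toString();
    }

    // Returns true if the named class can be resolved through the given loader (without initializing it).
    static boolean canLoad(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

For example, calling `ClassLoaderDump.canLoad("com.example.udf.TableKeyMd5", loader)` on the client right after loading the jars shows whether the class is visible there at all, before the job ever reaches a TaskManager.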

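Class loader code:

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.net.URLStreamHandlerFactory;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;

    // Appends a jar URL to the system class loader by reflectively calling the protected
    // URLClassLoader.addURL. Works on Java 8 and earlier only: from Java 9 the system
    // class loader is no longer a URLClassLoader and the cast below fails.
    public static void loadJar(URL jarUrl) {
        final Method method;
        try {
            method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
        } catch (NoSuchMethodException | SecurityException e) {
            throw new IllegalStateException("Cannot access URLClassLoader.addURL", e);
        }
        // Remember the method's current accessibility
        boolean accessible = method.isAccessible();
        try {
            // Make the protected method callable
            if (!accessible) {
                method.setAccessible(true);
            }
            // Get the system class loader
            URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
            // Add the jar path to the system class loader's URL search path
            method.invoke(classLoader, jarUrl);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to add " + jarUrl + " to the system class loader", e);
        } finally {
            // Restore the original accessibility
            method.setAccessible(accessible);
        }
    }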
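As an alternative to calling the protected `addURL` on the system class loader through reflection (which only works while the system class loader is a `URLClassLoader`, true on Java 8 but not on Java 9 and later), the client could create a dedicated `URLClassLoader` per submission through the public constructor. Either way, this only changes class visibility in the client JVM; TaskManagers resolve UDF classes through Flink's user-code class loader (the `ChildFirstClassLoader` in the trace), which is built from the jars and classpaths attached to the job. A minimal sketch; `UdfClassLoaders` and its methods are hypothetical names:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

// Hypothetical alternative to reflectively calling URLClassLoader.addURL on the system loader.
final class UdfClassLoaders {

    private UdfClassLoaders() {}

    // Creates a loader that delegates to the parent first, then searches the given jar URLs.
    // hdfs:// URLs only resolve once an hdfs URLStreamHandler has been registered with java.net.URL.
    static URLClassLoader forJars(List<URL> jarUrls, ClassLoader parent) {
        return new URLClassLoader(jarUrls.toArray(new URL[0]), parent);
    }

    // Loads a UDF class by name through the supplied loader, failing with a descriptive message.
    static Class<?> loadUdf(String className, ClassLoader loader) {
        try {
            return Class.forName(className, true, loader);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("UDF class not found: " + className, e);
        }
    }
}
```

In practice the parent would typically be the client's application class loader rather than `null`. `URLClassLoader` implements `Closeable`, so the loader can be closed (for example with try-with-resources) once nothing needs its classes any more.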
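    /**
     * Registers a URL stream handler for the hdfs:// protocol. If a factory is already
     * installed, it is wrapped in a decorator so that hdfs URLs go to the Hadoop factory
     * and every other protocol still goes to the original factory.
     *
     * @param fsUrlStreamHandlerFactory Hadoop factory that handles hdfs:// URLs
     * @throws Exception if the private fields of java.net.URL cannot be accessed
     */
    public static void registerFactory(final FsUrlStreamHandlerFactory fsUrlStreamHandlerFactory)
            throws Exception {
        log.info("registerFactory : " + fsUrlStreamHandlerFactory.getClass().getName());
        // Java 8 internals: URL.factory holds the installed factory, URL.streamHandlerLock guards it
        final Field factoryField = URL.class.getDeclaredField("factory");
        factoryField.setAccessible(true);
        final Field lockField = URL.class.getDeclaredField("streamHandlerLock");
        lockField.setAccessible(true);
        synchronized (lockField.get(null)) {
            final URLStreamHandlerFactory originalUrlStreamHandlerFactory = (URLStreamHandlerFactory) factoryField.get(null);
            // Clear the field so setURLStreamHandlerFactory does not throw "factory already defined"
            factoryField.set(null, null);
            URL.setURLStreamHandlerFactory(protocol -> {
                if ("hdfs".equals(protocol)) {
                    return fsUrlStreamHandlerFactory.createURLStreamHandler(protocol);
                }
                // Returning null makes URL fall back to its built-in handlers when no factory existed before
                return originalUrlStreamHandlerFactory != null
                        ? originalUrlStreamHandlerFactory.createURLStreamHandler(protocol)
                        : null;
            });
        }
    }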
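The routing logic inside `registerFactory` could be pulled into a small named class so it can be tested without touching the JVM-wide `URL` factory, which the public API allows to be set only once per JVM. In this sketch `RoutingUrlStreamHandlerFactory` is a hypothetical name; `hdfsFactory` stands for Hadoop's `FsUrlStreamHandlerFactory` (any `URLStreamHandlerFactory` works), and returning `null` lets `java.net.URL` fall back to its built-in handlers when no previous factory was installed.

```java
import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;

// Hypothetical decorator: sends "hdfs" to one factory and every other protocol to the previous one.
final class RoutingUrlStreamHandlerFactory implements URLStreamHandlerFactory {

    private final URLStreamHandlerFactory hdfsFactory;
    private final URLStreamHandlerFactory original; // may be null if no factory was installed

    RoutingUrlStreamHandlerFactory(URLStreamHandlerFactory hdfsFactory, URLStreamHandlerFactory original) {
        this.hdfsFactory = hdfsFactory;
        this.original = original;
    }

    @Override
    public URLStreamHandler createURLStreamHandler(String protocol) {
        if ("hdfs".equals(protocol)) {
            return hdfsFactory.createURLStreamHandler(protocol);
        }
        // null tells java.net.URL to use its built-in handler (http, file, jar, ...)
        return original != null ? original.createURLStreamHandler(protocol) : null;
    }
}
```

Inside `registerFactory`, the inline lambda would then become `URL.setURLStreamHandlerFactory(new RoutingUrlStreamHandlerFactory(fsUrlStreamHandlerFactory, originalUrlStreamHandlerFactory));`.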



799590989@qq.com

Re: Custom UDF cannot be loaded when executing Flink SQL

Posted by Zhanghao Chen <zh...@outlook.com>.
Hi, could you share the exact command the client uses to submit the job?

Best,
Zhanghao Chen