Posted to user-zh@flink.apache.org by "799590989@qq.com.INVALID" <79...@qq.com.INVALID> on 2022/04/21 02:50:22 UTC

Calling a user-defined function with a column name as its argument fails with "Cannot load user class: com.example.udf.SubStr"; passing a string literal works fine

Environment

flink-1.13.6_scala_2.11
java 1.8
The cluster runs in standalone-session mode: node01 is the JobManager; node02 and node03 are the TaskManagers.

The code of the user-defined function:
package com.example.udf;

import org.apache.flink.table.functions.ScalarFunction;

/** Scalar UDF returning the substring of s from index start (inclusive) to end (exclusive). */
public class SubStr extends ScalarFunction {
  public String eval(String s, Integer start, Integer end) {
    return s.substring(start, end); // auto-unboxing; intValue() is unnecessary
  }
}
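
As a side note, the CREATE FUNCTION DDL used below registers the function by class name; the Table API offers a counterpart, createTemporarySystemFunction, that can be used to check on the client side that the class loads and evaluates. A minimal sketch (class name hypothetical; this creates a temporary system function rather than the catalog function the DDL creates):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UdfSmokeTest {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // register com.example.udf.SubStr under the name "mysubstr" via the Table API
        tEnv.createTemporarySystemFunction("mysubstr", com.example.udf.SubStr.class);
        // substring from index 1 (inclusive) to 4 (exclusive)
        tEnv.executeSql("SELECT mysubstr('ABCDEFGHIJK', 1, 4)").print();
    }
}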
The SQL submitted to the cluster:
[
"DROP TABLE IF EXISTS source_datagen",
"CREATE TABLE source_datagen(f_sequence INT,f_random INT,f_random_str STRING,ts AS localtimestamp,WATERMARK FOR ts AS ts) WITH ('connector' = 'datagen','rows-per-second'='5','fields.f_sequence.kind'='sequence','fields.f_sequence.start'='1','fields.f_sequence.end'='1000','fields.f_random.min'='1','fields.f_random.max'='1000','fields.f_random_str.length'='10')",
"DROP TABLE IF EXISTS print_sink",
"CREATE TABLE print_sink(id STRING,user_id STRING,`status` STRING,`str` STRING) WITH ('connector' = 'print')","INSERT INTO print_sink SELECT CAST(f_sequence AS STRING) AS id, CAST(f_random AS STRING) AS user_id, CAST(ts AS STRING) AS status,mysubstr(f_random_str,1,4) AS str FROM source_datagen"
]

The controller's business logic:

public String executeDefaultSql(String sql) throws Exception {
    log.info(sql);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port);
    env.setStateBackend(new HashMapStateBackend());
    env.enableCheckpointing(1000);
    env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
    env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
    List<String> jars = new ArrayList<>();
    jars.add("hdfs:///xxx/file/function/flink/d78721345f45422da269fa0411127eda0453523812.jar");
    try {
        for (String jar : jars) {
            log.info(jar);
            // load the UDF jar into the client JVM
            EnvUtil.loadJar(URLUtil.url(jar));
        }
        // hand the jar locations to the cluster as pipeline classpaths
        tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS, jars);
        tableEnv.executeSql("CREATE FUNCTION mysubstr AS 'com.example.udf.SubStr' LANGUAGE JAVA").print();
        log.info("finished loading the UDF from HDFS");
    } catch (Exception e) {
        e.printStackTrace();
    }
    List<String> list = JSON.parseArray(sql, String.class);
    TableResult result = null;
    for (String s : list) {
        result = tableEnv.executeSql(s);
    }
    String jobId = "";
    log.info(result.getResultKind().name());
    if (result.getJobClient().isPresent()) {
        log.info(JSON.toJSONString(result.getJobClient().get().getJobStatus()));
        jobId = result.getJobClient().get().getJobID().toString();
        log.info("jobId:" + jobId);
    } else {
        result.print();
    }
    return jobId;
}
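
EnvUtil.loadJar is not shown above. On Java 8 a helper like this is commonly implemented by reflectively calling URLClassLoader#addURL on the application classloader; note that this only makes the class visible inside the client JVM, not on the TaskManagers. A hypothetical sketch of such a helper (the actual EnvUtil may differ):

import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

public final class EnvUtil {
    /** Adds a jar URL to the system classloader (Java 8, where it is a URLClassLoader). */
    public static void loadJar(URL jarUrl) throws Exception {
        URLClassLoader systemLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
        Method addURL = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
        addURL.setAccessible(true); // addURL is protected on URLClassLoader
        addURL.invoke(systemLoader, jarUrl);
    }
}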

The error message is:

2022-04-21 10:12:37
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.udf.SubStr
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.example.udf.SubStr
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    ... 10 more


If the first argument to the user-defined function in the final INSERT INTO statement is changed from a column name to a string literal, the job runs and prints normally:
[
"DROP TABLE IF EXISTS source_datagen",
"CREATE TABLE source_datagen(f_sequence INT,f_random INT,f_random_str STRING,ts AS localtimestamp,WATERMARK FOR ts AS ts) WITH ('connector' = 'datagen','rows-per-second'='5','fields.f_sequence.kind'='sequence','fields.f_sequence.start'='1','fields.f_sequence.end'='1000','fields.f_random.min'='1','fields.f_random.max'='1000','fields.f_random_str.length'='10')",
"DROP TABLE IF EXISTS print_sink",
"CREATE TABLE print_sink(id STRING,user_id STRING,`status` STRING,`str` STRING) WITH ('connector' = 'print')","INSERT INTO print_sink SELECT CAST(f_sequence AS STRING) AS id, CAST(f_random AS STRING) AS user_id, CAST(ts AS STRING) AS status,mysubstr('ABCDEFGHIJK',1,4) AS str FROM source_datagen"
]

The printed output is:

+I[1, 248, 2022-04-21 10:24:46.748, EDF]
+I[2, 718, 2022-04-21 10:24:46.749, EDF]
+I[3, 841, 2022-04-21 10:24:46.750, EDF]
+I[4, 788, 2022-04-21 10:24:46.750, EDF]
+I[5, 633, 2022-04-21 10:24:46.750, EDF]
+I[6, 524, 2022-04-21 10:24:47.742, EDF]
+I[7, 343, 2022-04-21 10:24:47.742, EDF]
+I[8, 391, 2022-04-21 10:24:47.742, EDF]
+I[9, 442, 2022-04-21 10:24:47.742, EDF]
+I[10, 992, 2022-04-21 10:24:47.742, EDF]
+I[11, 311, 2022-04-21 10:24:48.743, EDF]
+I[12, 567, 2022-04-21 10:24:48.743, EDF]
+I[13, 474, 2022-04-21 10:24:48.743, EDF]
+I[14, 796, 2022-04-21 10:24:48.743, EDF]
+I[15, 465, 2022-04-21 10:24:48.743, EDF]
+I[16, 945, 2022-04-21 10:24:49.742, EDF]
+I[17, 671, 2022-04-21 10:24:49.742, EDF]
+I[18, 154, 2022-04-21 10:24:49.743, EDF]
+I[19, 951, 2022-04-21 10:24:49.743, EDF]
+I[20, 619, 2022-04-21 10:24:49.743, EDF]
+I[21, 45, 2022-04-21 10:24:50.742, EDF]
+I[22, 577, 2022-04-21 10:24:50.742, EDF]
+I[23, 203, 2022-04-21 10:24:50.742, EDF]
+I[24, 472, 2022-04-21 10:24:50.742, EDF]
+I[25, 464, 2022-04-21 10:24:50.742, EDF]
+I[26, 36, 2022-04-21 10:24:51.743, EDF]
...

In the Apache Flink Dashboard, the TaskManager that handles the job is always node02.


799590989@qq.com