You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Xintong Song (Jira)" <ji...@apache.org> on 2023/03/23 08:52:15 UTC

[jira] [Updated] (FLINK-28897) Fail to use udf in added jar when enabling checkpoint

     [ https://issues.apache.org/jira/browse/FLINK-28897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xintong Song updated FLINK-28897:
---------------------------------
    Fix Version/s: 1.18.0
                       (was: 1.17.0)

> Fail to use udf in added jar when enabling checkpoint
> -----------------------------------------------------
>
>                 Key: FLINK-28897
>                 URL: https://issues.apache.org/jira/browse/FLINK-28897
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.16.0
>            Reporter: Liu
>            Assignee: dalongliu
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.16.2, 1.18.0
>
>
> The problem can be reproduced when enabling checkpoint for that StreamingJobGraphGenerator.preValidate is called actually in this case. Maybe this is a classloader problem.
> The reproduced steps are as following:
> {code:java}
> // Enable checkpoint first and execute the command in sql client.
> ADD JAR  '~/flink/flink-end-to-end-tests/flink-sql-client-test/target/SqlToolbox.jar';
> create function func1 as 'org.apache.flink.table.toolbox.StringRegexReplaceFunction' LANGUAGE JAVA;
> SELECT id, func1(str, 'World', 'Flink') FROM (VALUES (1, 'Hello World')) AS T(id, str); {code}
> The output is as following:
> {code:java}
> /* 1 */
> /* 2 */      public class StreamExecCalc$11 extends org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */        private final Object[] references;
> /* 6 */        private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$4;
> /* 7 */
> /* 8 */        private final org.apache.flink.table.data.binary.BinaryStringData str$6 = org.apache.flink.table.data.binary.BinaryStringData.fromString("World");
> /* 9 */
> /* 10 */
> /* 11 */        private final org.apache.flink.table.data.binary.BinaryStringData str$7 = org.apache.flink.table.data.binary.BinaryStringData.fromString("Flink");
> /* 12 */
> /* 13 */        private transient org.apache.flink.table.toolbox.StringRegexReplaceFunction function_org$apache$flink$table$toolbox$StringRegexReplaceFunction;
> /* 14 */        private transient org.apache.flink.table.data.conversion.StringStringConverter converter$8;
> /* 15 */        org.apache.flink.table.data.BoxedWrapperRowData out = new org.apache.flink.table.data.BoxedWrapperRowData(2);
> /* 16 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 17 */
> /* 18 */        public StreamExecCalc$11(
> /* 19 */            Object[] references,
> /* 20 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 21 */            org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 22 */            org.apache.flink.streaming.api.operators.Output output,
> /* 23 */            org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
> /* 24 */          this.references = references;
> /* 25 */          typeSerializer$4 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
> /* 26 */          function_org$apache$flink$table$toolbox$StringRegexReplaceFunction = (((org.apache.flink.table.toolbox.StringRegexReplaceFunction) references[1]));
> /* 27 */          converter$8 = (((org.apache.flink.table.data.conversion.StringStringConverter) references[2]));
> /* 28 */          this.setup(task, config, output);
> /* 29 */          if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 30 */            ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 31 */              .setProcessingTimeService(processingTimeService);
> /* 32 */          }
> /* 33 */        }
> /* 34 */
> /* 35 */        @Override
> /* 36 */        public void open() throws Exception {
> /* 37 */          super.open();
> /* 38 */
> /* 39 */          function_org$apache$flink$table$toolbox$StringRegexReplaceFunction.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
> /* 40 */
> /* 41 */
> /* 42 */          converter$8.open(getRuntimeContext().getUserCodeClassLoader());
> /* 43 */
> /* 44 */        }
> /* 45 */
> /* 46 */        @Override
> /* 47 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> /* 48 */          org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
> /* 49 */
> /* 50 */          int field$2;
> /* 51 */          boolean isNull$2;
> /* 52 */          org.apache.flink.table.data.binary.BinaryStringData field$3;
> /* 53 */          boolean isNull$3;
> /* 54 */          org.apache.flink.table.data.binary.BinaryStringData field$5;
> /* 55 */          java.lang.String externalResult$9;
> /* 56 */          org.apache.flink.table.data.binary.BinaryStringData result$10;
> /* 57 */          boolean isNull$10;
> /* 58 */
> /* 59 */
> /* 60 */          isNull$2 = in1.isNullAt(0);
> /* 61 */          field$2 = -1;
> /* 62 */          if (!isNull$2) {
> /* 63 */            field$2 = in1.getInt(0);
> /* 64 */          }
> /* 65 */
> /* 66 */          isNull$3 = in1.isNullAt(1);
> /* 67 */          field$3 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> /* 68 */          if (!isNull$3) {
> /* 69 */            field$3 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1));
> /* 70 */          }
> /* 71 */          field$5 = field$3;
> /* 72 */          if (!isNull$3) {
> /* 73 */            field$5 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$4.copy(field$5));
> /* 74 */          }
> /* 75 */
> /* 76 */
> /* 77 */          out.setRowKind(in1.getRowKind());
> /* 78 */
> /* 79 */
> /* 80 */
> /* 81 */
> /* 82 */          out.setInt(0, field$2);
> /* 83 */
> /* 84 */
> /* 85 */
> /* 86 */
> /* 87 */
> /* 88 */
> /* 89 */
> /* 90 */          externalResult$9 = (java.lang.String) function_org$apache$flink$table$toolbox$StringRegexReplaceFunction
> /* 91 */            .eval(isNull$3 ? null : ((java.lang.String) converter$8.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$5)), false ? null : ((java.lang.String) converter$8.toExternal((org.apache.flink.table.data.binary.BinaryStringData) ((org.apache.flink.table.data.binary.BinaryStringData) str$6))), false ? null : ((java.lang.String) converter$8.toExternal((org.apache.flink.table.data.binary.BinaryStringData) ((org.apache.flink.table.data.binary.BinaryStringData) str$7))));
> /* 92 */
> /* 93 */          isNull$10 = externalResult$9 == null;
> /* 94 */          result$10 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> /* 95 */          if (!isNull$10) {
> /* 96 */            result$10 = (org.apache.flink.table.data.binary.BinaryStringData) converter$8.toInternalOrNull((java.lang.String) externalResult$9);
> /* 97 */          }
> /* 98 */
> /* 99 */          if (isNull$10) {
> /* 100 */            out.setNullAt(1);
> /* 101 */          } else {
> /* 102 */            out.setNonPrimitiveValue(1, result$10);
> /* 103 */          }
> /* 104 */
> /* 105 */
> /* 106 */          output.collect(outElement.replace(out));
> /* 107 */
> /* 108 */
> /* 109 */        }
> /* 110 */
> /* 111 */
> /* 112 */
> /* 113 */        @Override
> /* 114 */        public void close() throws Exception {
> /* 115 */           super.close();
> /* 116 */
> /* 117 */          function_org$apache$flink$table$toolbox$StringRegexReplaceFunction.close();
> /* 118 */
> /* 119 */        }
> /* 120 */
> /* 121 */
> /* 122 */      }
> /* 123 *//* 1 */
> /* 2 */      public class StreamExecCalc$11 extends org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */        private final Object[] references;
> /* 6 */        private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$4;
> /* 7 */
> /* 8 */        private final org.apache.flink.table.data.binary.BinaryStringData str$6 = org.apache.flink.table.data.binary.BinaryStringData.fromString("World");
> /* 9 */
> /* 10 */
> /* 11 */        private final org.apache.flink.table.data.binary.BinaryStringData str$7 = org.apache.flink.table.data.binary.BinaryStringData.fromString("Flink");
> /* 12 */
> /* 13 */        private transient org.apache.flink.table.toolbox.StringRegexReplaceFunction function_org$apache$flink$table$toolbox$StringRegexReplaceFunction;
> /* 14 */        private transient org.apache.flink.table.data.conversion.StringStringConverter converter$8;
> /* 15 */        org.apache.flink.table.data.BoxedWrapperRowData out = new org.apache.flink.table.data.BoxedWrapperRowData(2);
> /* 16 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 17 */
> /* 18 */        public StreamExecCalc$11(
> /* 19 */            Object[] references,
> /* 20 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 21 */            org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 22 */            org.apache.flink.streaming.api.operators.Output output,
> /* 23 */            org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
> /* 24 */          this.references = references;
> /* 25 */          typeSerializer$4 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
> /* 26 */          function_org$apache$flink$table$toolbox$StringRegexReplaceFunction = (((org.apache.flink.table.toolbox.StringRegexReplaceFunction) references[1]));
> /* 27 */          converter$8 = (((org.apache.flink.table.data.conversion.StringStringConverter) references[2]));
> /* 28 */          this.setup(task, config, output);
> /* 29 */          if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 30 */            ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 31 */              .setProcessingTimeService(processingTimeService);
> /* 32 */          }
> /* 33 */        }
> /* 34 */
> /* 35 */        @Override
> /* 36 */        public void open() throws Exception {
> /* 37 */          super.open();
> /* 38 */
> /* 39 */          function_org$apache$flink$table$toolbox$StringRegexReplaceFunction.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
> /* 40 */
> /* 41 */
> /* 42 */          converter$8.open(getRuntimeContext().getUserCodeClassLoader());
> /* 43 */
> /* 44 */        }
> /* 45 */
> /* 46 */        @Override
> /* 47 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> /* 48 */          org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
> /* 49 */
> /* 50 */          int field$2;
> /* 51 */          boolean isNull$2;
> /* 52 */          org.apache.flink.table.data.binary.BinaryStringData field$3;
> /* 53 */          boolean isNull$3;
> /* 54 */          org.apache.flink.table.data.binary.BinaryStringData field$5;
> /* 55 */          java.lang.String externalResult$9;
> /* 56 */          org.apache.flink.table.data.binary.BinaryStringData result$10;
> /* 57 */          boolean isNull$10;
> /* 58 */
> /* 59 */
> /* 60 */          isNull$2 = in1.isNullAt(0);
> /* 61 */          field$2 = -1;
> /* 62 */          if (!isNull$2) {
> /* 63 */            field$2 = in1.getInt(0);
> /* 64 */          }
> /* 65 */
> /* 66 */          isNull$3 = in1.isNullAt(1);
> /* 67 */          field$3 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> /* 68 */          if (!isNull$3) {
> /* 69 */            field$3 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1));
> /* 70 */          }
> /* 71 */          field$5 = field$3;
> /* 72 */          if (!isNull$3) {
> /* 73 */            field$5 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$4.copy(field$5));
> /* 74 */          }
> /* 75 */
> /* 76 */
> /* 77 */          out.setRowKind(in1.getRowKind());
> /* 78 */
> /* 79 */
> /* 80 */
> /* 81 */
> /* 82 */          out.setInt(0, field$2);
> /* 83 */
> /* 84 */
> /* 85 */
> /* 86 */
> /* 87 */
> /* 88 */
> /* 89 */
> /* 90 */          externalResult$9 = (java.lang.String) function_org$apache$flink$table$toolbox$StringRegexReplaceFunction
> /* 91 */            .eval(isNull$3 ? null : ((java.lang.String) converter$8.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$5)), false ? null : ((java.lang.String) converter$8.toExternal((org.apache.flink.table.data.binary.BinaryStringData) ((org.apache.flink.table.data.binary.BinaryStringData) str$6))), false ? null : ((java.lang.String) converter$8.toExternal((org.apache.flink.table.data.binary.BinaryStringData) ((org.apache.flink.table.data.binary.BinaryStringData) str$7))));
> /* 92 */
> /* 93 */          isNull$10 = externalResult$9 == null;
> /* 94 */          result$10 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
> /* 95 */          if (!isNull$10) {
> /* 96 */            result$10 = (org.apache.flink.table.data.binary.BinaryStringData) converter$8.toInternalOrNull((java.lang.String) externalResult$9);
> /* 97 */          }
> /* 98 */
> /* 99 */          if (isNull$10) {
> /* 100 */            out.setNullAt(1);
> /* 101 */          } else {
> /* 102 */            out.setNonPrimitiveValue(1, result$10);
> /* 103 */          }
> /* 104 */
> /* 105 */
> /* 106 */          output.collect(outElement.replace(out));
> /* 107 */
> /* 108 */
> /* 109 */        }
> /* 110 */
> /* 111 */
> /* 112 */
> /* 113 */        @Override
> /* 114 */        public void close() throws Exception {
> /* 115 */           super.close();
> /* 116 */
> /* 117 */          function_org$apache$flink$table$toolbox$StringRegexReplaceFunction.close();
> /* 118 */
> /* 119 */        }
> /* 120 */
> /* 121 */
> /* 122 */      }
> /* 123 */[ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 13, Column 30: Cannot determine simple type name "org" {code}
> The log stack is as following:
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not execute SQL statement.
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:208) ~[flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:228) ~[flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:537) ~[flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:444) [flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.cli.CliClient.executeOperation(CliClient.java:371) [flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:328) [flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:279) [flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:227) [flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) [flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) [flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) [flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) [flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:896) ~[flink-table-api-java-uber-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1375) ~[flink-table-api-java-uber-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:206) ~[flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     ... 11 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:120) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:51) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:462) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:205) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:145) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1016) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:71) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2199) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:877) ~[flink-table-api-java-uber-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1375) ~[flink-table-api-java-uber-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:206) ~[flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     ... 11 more
> Caused by: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:120) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:51) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:462) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:205) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:145) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1016) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:71) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2199) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:877) ~[flink-table-api-java-uber-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1375) ~[flink-table-api-java-uber-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:206) ~[flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     ... 11 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:120) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:51) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:462) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:205) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:145) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1016) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:71) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2199) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:877) ~[flink-table-api-java-uber-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1375) ~[flink-table-api-java-uber-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:206) ~[flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     ... 11 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 13, Column 30: Cannot determine simple type name "org"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6573) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.access$13900(UnitCompiler.java:215) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6481) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6476) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3928) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6476) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6469) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3927) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$25.getType(UnitCompiler.java:8271) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6873) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.access$14400(UnitCompiler.java:215) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22$2$1.visitFieldAccess(UnitCompiler.java:6499) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22$2$1.visitFieldAccess(UnitCompiler.java:6494) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4310) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6494) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6490) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6490) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6469) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.Java$Rvalue.accept(Java.java:4116) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6855) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.access$14200(UnitCompiler.java:215) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22$2$1.visitAmbiguousName(UnitCompiler.java:6497) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22$2$1.visitAmbiguousName(UnitCompiler.java:6494) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4224) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6494) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6490) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6490) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6469) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.Java$Rvalue.accept(Java.java:4116) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9026) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5062) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:120) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:51) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:462) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:205) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:145) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1016) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:71) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2199) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:877) ~[flink-table-api-java-uber-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1375) ~[flink-table-api-java-uber-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:206) ~[flink-sql-client-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>     ... 11 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)