You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Hongbo (Jira)" <ji...@apache.org> on 2022/07/26 12:55:00 UTC

[jira] [Created] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression

Hongbo created FLINK-28693:
------------------------------

             Summary: Codegen failed if the watermark is defined on a columnByExpression
                 Key: FLINK-28693
                 URL: https://issues.apache.org/jira/browse/FLINK-28693
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.15.1
            Reporter: Hongbo


The code further below fails during code generation with the following exception:

```
Table program cannot be compiled. This is a bug. Please file an issue.
...
Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 54: Cannot determine simple type name "org"
```
Code:
```
public class TestUdf extends ScalarFunction {
    @DataTypeHint("TIMESTAMP(3)")
    public LocalDateTime eval(String strDate) {
        return LocalDateTime.now();
    }
}
public class FlinkTest {
    @Test
    void testUdf() throws Exception {
        //var env = StreamExecutionEnvironment.createLocalEnvironment();
        // run `gradlew shadowJar` first to generate the uber jar.
        // It contains the kafka connector and a dummy UDF function.

        var env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081,
                "build/libs/flink-test-all.jar");
        env.setParallelism(1);
        var tableEnv = StreamTableEnvironment.create(env);
        tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class);

        var testTable = tableEnv.from(TableDescriptor.forConnector("kafka")
                .schema(Schema.newBuilder()
                        .column("time_stamp", DataTypes.STRING())
                        .columnByExpression("udf_ts", "TEST_UDF(time_stamp)")
                        .watermark("udf_ts", "udf_ts - INTERVAL '1' second")
                        .build())
                // the kafka server doesn't need to exist. It fails in the compile stage before fetching data.
                .option("properties.bootstrap.servers", "localhost:9092")
                .option("topic", "test_topic")
                .option("format", "json")
                .option("scan.startup.mode", "latest-offset")
                .build());
        testTable.printSchema();
        tableEnv.createTemporaryView("test", testTable );

        var query = tableEnv.sqlQuery("select * from test");
        var tableResult = query.executeInsert(TableDescriptor.forConnector("print").build());
        tableResult.await();
    }
}
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)