You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by XiaChang <13...@163.com> on 2020/07/29 07:00:20 UTC

flink 1.11 CREATE VIEW + LATERAL TABLE 语法校验问题

您好,请教一个问题:
背景:
运行环境-->flink1.11.1
source_table-->kafka输入表,kafka中每一个行为一个json数组。([{},{}])
-- source
CREATE TABLE source_table(
    __message STRING
) WITH(
'connector.type' = 'kafka',
'connector.version' = 'universal-xx',
'connector.properties.bootstrap.servers' = 'xxx',
'connector.topic' = 'xxx',
'connector.startup-mode' = 'latest-offset',
'connector.properties.group.id' = 'xxx',
'format.type' = 'json-xx',
'format.derive-schema' = 'true'
);


XX_JSON_TUPLE-->自定义udtf
XX_ROW_VALUE-->自定义udf


可正常执行:

INSERT INTO sink_table
SELECT
cast(XX_ROW_VALUE(msg, 1) as bigint) AS bbb
,((cast(XX_ROW_VALUE(msg, 0) as bigint))/1000) AS ddd
,cast(XX_ROW_VALUE(msg, 0) as bigint) AS aaa
,XX_ROW_VALUE(msg, 8) AS ccc
FROM
source_table, LATERAL TABLE(XX_JSON_TUPLE(__message, 'aaa','bbb','ccc')) AS T(msg)


不可正常运行:
CREATE VIEW view_aaa AS
SELECT
cast(XX_ROW_VALUE(msg, 1) as bigint) AS bbb
,((cast(XX_ROW_VALUE(msg, 0) as bigint))/1000) AS ddd
,cast(XX_ROW_VALUE(msg, 0) as bigint) AS aaa
,XX_ROW_VALUE(msg, 8) AS ccc
FROM
source_table, LATERAL TABLE(XX_JSON_TUPLE(__message, 'aaa','bbb','ccc')) AS T(msg);
INSERT INTO sink_table
SELECT * FROM view_aaa;




相关错误堆栈信息:
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 3, column 23 to line 3, column 33: Column '__message' not found in any table
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204)
at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52)
at org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59)
at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:774)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:746)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:236)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at com.douyu.ocean.xuanwu.flink.siddhi.SiddhiJob.lambda$callFrom$0(SiddhiJob.java:145)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.douyu.ocean.xuanwu.flink.siddhi.SiddhiJob.callFrom(SiddhiJob.java:141)
... 4 common frames omitted
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3, column 23 to line 3, column 33: Column '__message' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at org.apache.calcite.sql.SqlCallBinding.getOperandType(SqlCallBinding.java:237)
at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.$anonfun$getOperandType$1(UserDefinedFunctionUtils.scala:788)
at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.$anonfun$getOperandType$1$adapted(UserDefinedFunctionUtils.scala:787)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Range.foreach(Range.scala:158)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getOperandType(UserDefinedFunctionUtils.scala:787)
at org.apache.flink.table.planner.functions.utils.TableSqlFunction$.inferOperandTypesInternal(TableSqlFunction.scala:119)
at org.apache.flink.table.planner.functions.utils.TableSqlFunction$$anon$1.inferOperandTypes(TableSqlFunction.scala:107)
at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980)
at org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:56)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 27 common frames omitted
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column '__message' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
... 66 common frames omitted






Re: flink 1.11 CREATE VIEW + LATERAL TABLE 语法校验问题

Posted by Jark Wu <im...@gmail.com>.
Hi XiaChang,

Dev mailing list is used for discussing technical designs and proposals.
Please ask user questions in user@flink.apache.org or
user-zh@flink.apache.org mailing list.

Thanks,
Jark

On Wed, 29 Jul 2020 at 15:30, Wei Zhong <we...@gmail.com> wrote:

> Hi XiaChang,
>
> I think this is a bug. Others have encountered similar problems before.
> I have create a JIRA for it:
> https://issues.apache.org/jira/browse/FLINK-18750 <
> https://issues.apache.org/jira/browse/FLINK-18750>
>
> Best,
> Wei
>
> > 在 2020年7月29日,15:00,XiaChang <13...@163.com> 写道:
> >
> > 您好,请教一个问题:
> > 背景:
> > 运行环境-->flink1.11.1
> > source_table-->kafka输入表,kafka中每一个行为一个json数组。([{},{}])
> > -- source
> > CREATE TABLE source_table(
> >    __message STRING
> > ) WITH(
> > 'connector.type' = 'kafka',
> > 'connector.version' = 'universal-xx',
> > 'connector.properties.bootstrap.servers' = 'xxx',
> > 'connector.topic' = 'xxx',
> > 'connector.startup-mode' = 'latest-offset',
> > 'connector.properties.group.id' = 'xxx',
> > 'format.type' = 'json-xx',
> > 'format.derive-schema' = 'true'
> > );
> >
> >
> > XX_JSON_TUPLE-->自定义udtf
> > XX_ROW_VALUE-->自定义udf
> >
> >
> > 可正常执行:
> >
> > INSERT INTO sink_table
> > SELECT
> > cast(XX_ROW_VALUE(msg, 1) as bigint) AS bbb
> > ,((cast(XX_ROW_VALUE(msg, 0) as bigint))/1000) AS ddd
> > ,cast(XX_ROW_VALUE(msg, 0) as bigint) AS aaa
> > ,XX_ROW_VALUE(msg, 8) AS ccc
> > FROM
> > source_table, LATERAL TABLE(XX_JSON_TUPLE(__message, 'aaa','bbb','ccc'))
> AS T(msg)
> >
> >
> > 不可正常运行:
> > CREATE VIEW view_aaa AS
> > SELECT
> > cast(XX_ROW_VALUE(msg, 1) as bigint) AS bbb
> > ,((cast(XX_ROW_VALUE(msg, 0) as bigint))/1000) AS ddd
> > ,cast(XX_ROW_VALUE(msg, 0) as bigint) AS aaa
> > ,XX_ROW_VALUE(msg, 8) AS ccc
> > FROM
> > source_table, LATERAL TABLE(XX_JSON_TUPLE(__message, 'aaa','bbb','ccc'))
> AS T(msg);
> > INSERT INTO sink_table
> > SELECT * FROM view_aaa;
> >
> >
> >
> >
> > 相关错误堆栈信息:
> > Caused by: org.apache.flink.table.api.ValidationException: SQL
> validation failed. From line 3, column 23 to line 3, column 33: Column
> '__message' not found in any table
> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> > at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204)
> > at
> org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52)
> > at
> org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
> > at
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59)
> > at
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55)
> > at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> > at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> > at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> > at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> > at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> > at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> > at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> > at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> > at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> > at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:774)
> > at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:746)
> > at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:236)
> > at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> > at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
> > at
> com.douyu.ocean.xuanwu.flink.siddhi.SiddhiJob.lambda$callFrom$0(SiddhiJob.java:145)
> > at java.util.ArrayList.forEach(ArrayList.java:1257)
> > at
> com.douyu.ocean.xuanwu.flink.siddhi.SiddhiJob.callFrom(SiddhiJob.java:141)
> > ... 4 common frames omitted
> > Caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 3, column 23 to line 3, column 33: Column '__message' not found in any table
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> > at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> > at
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
> > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
> > at
> org.apache.calcite.sql.SqlCallBinding.getOperandType(SqlCallBinding.java:237)
> > at
> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.$anonfun$getOperandType$1(UserDefinedFunctionUtils.scala:788)
> > at
> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.$anonfun$getOperandType$1$adapted(UserDefinedFunctionUtils.scala:787)
> > at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> > at scala.collection.immutable.Range.foreach(Range.scala:158)
> > at scala.collection.TraversableLike.map(TraversableLike.scala:238)
> > at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
> > at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> > at
> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getOperandType(UserDefinedFunctionUtils.scala:787)
> > at
> org.apache.flink.table.planner.functions.utils.TableSqlFunction$.inferOperandTypesInternal(TableSqlFunction.scala:119)
> > at
> org.apache.flink.table.planner.functions.utils.TableSqlFunction$$anon$1.inferOperandTypes(TableSqlFunction.scala:107)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980)
> > at
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:56)
> > at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
> > at
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> > at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> > at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> > at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> > at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> > ... 27 common frames omitted
> > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column
> '__message' not found in any table
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> > at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> > at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
> > ... 66 common frames omitted
> >
> >
> >
> >
> >
>
>

Re: flink 1.11 CREATE VIEW + LATERAL TABLE 语法校验问题

Posted by Wei Zhong <we...@gmail.com>.
Hi XiaChang,

I think this is a bug. Others have encountered similar problems before.
I have create a JIRA for it: https://issues.apache.org/jira/browse/FLINK-18750 <https://issues.apache.org/jira/browse/FLINK-18750>

Best,
Wei

> 在 2020年7月29日,15:00,XiaChang <13...@163.com> 写道:
> 
> 您好,请教一个问题:
> 背景:
> 运行环境-->flink1.11.1
> source_table-->kafka输入表,kafka中每一个行为一个json数组。([{},{}])
> -- source
> CREATE TABLE source_table(
>    __message STRING
> ) WITH(
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal-xx',
> 'connector.properties.bootstrap.servers' = 'xxx',
> 'connector.topic' = 'xxx',
> 'connector.startup-mode' = 'latest-offset',
> 'connector.properties.group.id' = 'xxx',
> 'format.type' = 'json-xx',
> 'format.derive-schema' = 'true'
> );
> 
> 
> XX_JSON_TUPLE-->自定义udtf
> XX_ROW_VALUE-->自定义udf
> 
> 
> 可正常执行:
> 
> INSERT INTO sink_table
> SELECT
> cast(XX_ROW_VALUE(msg, 1) as bigint) AS bbb
> ,((cast(XX_ROW_VALUE(msg, 0) as bigint))/1000) AS ddd
> ,cast(XX_ROW_VALUE(msg, 0) as bigint) AS aaa
> ,XX_ROW_VALUE(msg, 8) AS ccc
> FROM
> source_table, LATERAL TABLE(XX_JSON_TUPLE(__message, 'aaa','bbb','ccc')) AS T(msg)
> 
> 
> 不可正常运行:
> CREATE VIEW view_aaa AS
> SELECT
> cast(XX_ROW_VALUE(msg, 1) as bigint) AS bbb
> ,((cast(XX_ROW_VALUE(msg, 0) as bigint))/1000) AS ddd
> ,cast(XX_ROW_VALUE(msg, 0) as bigint) AS aaa
> ,XX_ROW_VALUE(msg, 8) AS ccc
> FROM
> source_table, LATERAL TABLE(XX_JSON_TUPLE(__message, 'aaa','bbb','ccc')) AS T(msg);
> INSERT INTO sink_table
> SELECT * FROM view_aaa;
> 
> 
> 
> 
> 相关错误堆栈信息:
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 3, column 23 to line 3, column 33: Column '__message' not found in any table
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204)
> at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52)
> at org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
> at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59)
> at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55)
> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:774)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:746)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:236)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
> at com.douyu.ocean.xuanwu.flink.siddhi.SiddhiJob.lambda$callFrom$0(SiddhiJob.java:145)
> at java.util.ArrayList.forEach(ArrayList.java:1257)
> at com.douyu.ocean.xuanwu.flink.siddhi.SiddhiJob.callFrom(SiddhiJob.java:141)
> ... 4 common frames omitted
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3, column 23 to line 3, column 33: Column '__message' not found in any table
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
> at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
> at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
> at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
> at org.apache.calcite.sql.SqlCallBinding.getOperandType(SqlCallBinding.java:237)
> at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.$anonfun$getOperandType$1(UserDefinedFunctionUtils.scala:788)
> at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.$anonfun$getOperandType$1$adapted(UserDefinedFunctionUtils.scala:787)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at scala.collection.immutable.Range.foreach(Range.scala:158)
> at scala.collection.TraversableLike.map(TraversableLike.scala:238)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getOperandType(UserDefinedFunctionUtils.scala:787)
> at org.apache.flink.table.planner.functions.utils.TableSqlFunction$.inferOperandTypesInternal(TableSqlFunction.scala:119)
> at org.apache.flink.table.planner.functions.utils.TableSqlFunction$$anon$1.inferOperandTypes(TableSqlFunction.scala:107)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980)
> at org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:56)
> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
> at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> ... 27 common frames omitted
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column '__message' not found in any table
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
> ... 66 common frames omitted
> 
> 
> 
> 
>