Posted to issues@flink.apache.org by "zhangyue (Jira)" <ji...@apache.org> on 2022/11/25 02:07:00 UTC

[jira] [Created] (FLINK-30201) Function "unnest" can't process nesting JSON properly

zhangyue created FLINK-30201:
--------------------------------

             Summary: Function "unnest" can't process nesting JSON properly
                 Key: FLINK-30201
                 URL: https://issues.apache.org/jira/browse/FLINK-30201
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.16.0
            Reporter: zhangyue


Here is the CREATE TABLE DDL:
{code:java}
riskRuleEngineResultLevel2_3 = CREATE TABLE `riskRuleEngineResultLevel2_3`(\
        `data` ROW<\
            `flow_id` STRING, \
            `flow_name` STRING, \
            `flow_version` STRING, \
            `risk_id` BIGINT, \
            `uid` BIGINT, \
            `is_pass` INT, \
            `result` INT, \
            `country_id` INT, \
            `business` STRING, \
            `engine_scene_id` STRING, \
            `flow_type` STRING, \
            `source` STRING, \
            `rule_results` ARRAY<ROW<`rule_id` STRING, \
                                `rule_name` STRING, \
                                `rule_type` STRING, \
                                `rule_type_name` STRING, \
                                `node_id` STRING, \
                                `result` INT, \
                                `policy_name` STRING, \
                                `in_path` BOOLEAN>>\
            >,\
        proctime as proctime()\
        ) WITH (\
            'connector' = 'kafka',\
            'topic' = 'riskRuleEngineResultLevel2_3',\
            'scan.startup.mode' = '%s',\
            'properties.bootstrap.servers' = '%s',\
            'properties.group.id' = '%s',\
            'format' = 'json'\
        ) {code}
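The three '%s' placeholders in the WITH clause suggest that the DDL is a template that gets filled in before it is executed. The report does not show how that happens, so the following is only a minimal sketch assuming plain String.format; the class name DdlTemplate, the method fill, and the sample values are hypothetical, and the column list is omitted for brevity.

```java
/** Hypothetical helper (not from the report): fills the three %s placeholders of the WITH clause. */
final class DdlTemplate {

    // Only the WITH clause from the report; the column list is omitted for brevity.
    static final String WITH_CLAUSE =
            "WITH ("
                    + "'connector' = 'kafka',"
                    + "'topic' = 'riskRuleEngineResultLevel2_3',"
                    + "'scan.startup.mode' = '%s',"
                    + "'properties.bootstrap.servers' = '%s',"
                    + "'properties.group.id' = '%s',"
                    + "'format' = 'json'"
                    + ")";

    /** Substitutes the startup mode, bootstrap servers and group id, in that order. */
    static String fill(String startupMode, String bootstrapServers, String groupId) {
        return String.format(WITH_CLAUSE, startupMode, bootstrapServers, groupId);
    }

    public static void main(String[] args) {
        // Sample values chosen for illustration only; they are not from the report.
        System.out.println(fill("earliest-offset", "localhost:9092", "example-group"));
    }
}
```

The validation errors in this report are raised while the query is parsed and validated, so the concrete connector values should not matter for reproducing them.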
Flink SQL:
{code:java}
String executeSql = "select data.flow_id as flow_id,t.rule_id,t.rule_name,t.rule_type,t.rule_type_name,t.node_id,t.`result` from riskRuleEngineResultLevel2_3, unnest(data.rule_results) as t (rule_id,rule_name,rule_type,rule_type_name,node_id,`result`,policy_name,in_path)"; {code}
When the argument to "unnest" is "data.rule_results", which matches the actual structure of the table, the error below occurs. When I use "rule_results" instead of "data.rule_results" in "unnest", the query runs fine. I think this is weird.
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 0, column 0 to line 1, column 149: Column 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
    at com.akulaku.flink_tasks_project.tasks.FlowsRuleResultRiskCalc.main(FlowsRuleResultRiskCalc.java:42)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 0, column 0 to line 1, column 149: Column 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
    at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5839)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5823)
    at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5431)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3101)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3082)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
    at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
    ... 5 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
    ... 28 more {code}
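To compare the two cases side by side, the two query variants can be generated from one template. This is only a sketch: the class name UnnestQueryVariants and the method buildQuery are hypothetical and not part of the report. It only builds the SQL strings, which would then be passed to executeSql as in the stack trace above.

```java
/** Hypothetical helper (not from the report): builds the query with a given UNNEST argument. */
final class UnnestQueryVariants {

    // Table name and column lists are copied from the report.
    static final String TABLE = "riskRuleEngineResultLevel2_3";

    /** Returns the reporter's query with the given expression as the UNNEST argument. */
    static String buildQuery(String unnestArgument) {
        return "select data.flow_id as flow_id,"
                + "t.rule_id,t.rule_name,t.rule_type,t.rule_type_name,t.node_id,t.`result` "
                + "from " + TABLE + ", "
                + "unnest(" + unnestArgument + ") as t "
                + "(rule_id,rule_name,rule_type,rule_type_name,node_id,`result`,policy_name,in_path)";
    }

    public static void main(String[] args) {
        // "data.rule_results" is the variant reported to fail; "rule_results" is the one reported to work.
        String[] variants = {"data.rule_results", "rule_results"};
        for (String variant : variants) {
            System.out.println(buildQuery(variant));
        }
    }
}
```

With "data.rule_results" the generated string is identical to the query in the report, so the only difference between the two runs is the argument to "unnest".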
If I change the DDL by adding "data2", which has the same structure as "data" and sits at the same level, an error occurs regardless of whether I use "rule_results" or "data.rule_results" in "unnest":
{code:java}
riskRuleEngineResultLevel2_3 = CREATE TABLE `riskRuleEngineResultLevel2_3`(\
        `data` ROW<\
            `flow_id` STRING, \
            `flow_name` STRING, \
            `flow_version` STRING, \
            `risk_id` BIGINT, \
            `uid` BIGINT, \
            `is_pass` INT, \
            `result` INT, \
            `country_id` INT, \
            `business` STRING, \
            `engine_scene_id` STRING, \
            `flow_type` STRING, \
            `source` STRING, \
            `rule_results` ARRAY<ROW<`rule_id` STRING, \
                                `rule_name` STRING, \
                                `rule_type` STRING, \
                                `rule_type_name` STRING, \
                                `node_id` STRING, \
                                `result` INT, \
                                `policy_name` STRING, \
                                `in_path` BOOLEAN>>\
            >,\
        `data2` ROW<\
            `flow_id` STRING, \
            `flow_name` STRING, \
            `flow_version` STRING, \
            `risk_id` BIGINT, \
            `uid` BIGINT, \
            `is_pass` INT, \
            `result` INT, \
            `country_id` INT, \
            `business` STRING, \
            `engine_scene_id` STRING, \
            `flow_type` STRING, \
            `source` STRING, \
            `rule_results` ARRAY<ROW<`rule_id` STRING, \
                                `rule_name` STRING, \
                                `rule_type` STRING, \
                                `rule_type_name` STRING, \
                                `node_id` STRING, \
                                `result` INT, \
                                `policy_name` STRING, \
                                `in_path` BOOLEAN>>\
            >,\
        proctime as proctime()\
        ) WITH (\
            'connector' = 'kafka',\
            'topic' = 'riskRuleEngineResultLevel2_3',\
            'scan.startup.mode' = '%s',\
            'properties.bootstrap.servers' = '%s',\
            'properties.group.id' = '%s',\
            'format' = 'json'\
        ) {code}
Error when "rule_results" is used in "unnest":
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 146 to line 1, column 157: Column 'rule_results' is ambiguous
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
    at com.akulaku.flink_tasks_project.tasks.FlowsRuleResultRiskCalc.main(FlowsRuleResultRiskCalc.java:42)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 146 to line 1, column 157: Column 'rule_results' is ambiguous
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
    at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:467)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:2921)
    at org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:300)
    at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:419)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5404)
    at org.apache.calcite.sql.validate.UnnestNamespace.validateImpl(UnnestNamespace.java:64)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    at org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
    at org.apache.calcite.sql.validate.AbstractNamespace.getRowTypeSansSystemColumns(AbstractNamespace.java:122)
    at org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:69)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    at org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
    at org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:43)
    at org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
    at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:190)
    at org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:155)
    at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5839)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5823)
    at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5431)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3101)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3082)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
    at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
    ... 5 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'rule_results' is ambiguous
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
    ... 46 more{code}
Error when "data.rule_results" is used in "unnest":
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 0, column 0 to line 1, column 149: Column 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
    at com.akulaku.flink_tasks_project.tasks.FlowsRuleResultRiskCalc.main(FlowsRuleResultRiskCalc.java:42)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 0, column 0 to line 1, column 149: Column 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
    at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5839)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5823)
    at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5431)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3101)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3082)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
    at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
    ... 5 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
    ... 28 more {code}


