You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2021/04/01 05:31:00 UTC

[jira] [Created] (FLINK-22082) Nested projection push down doesn't work for data: row(array(row))

Dian Fu created FLINK-22082:
-------------------------------

             Summary: Nested projection push down doesn't work for data: row(array(row))
                 Key: FLINK-22082
                 URL: https://issues.apache.org/jira/browse/FLINK-22082
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.12.0, 1.13.0
            Reporter: Dian Fu


For the following job:

{code}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, StreamTableEnvironment

config = TableConfig()
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env, config)

source_ddl = """
    CREATE TABLE InTable (
        `ID` STRING,
        `Timestamp` TIMESTAMP(3),
        `Result` ROW(
            `data` ROW(`value` BIGINT) ARRAY),
        WATERMARK FOR `Timestamp` AS `Timestamp`
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'json',
        'path' = '/tmp/1.txt'
    )
"""

sink_ddl = """
    CREATE TABLE OutTable (
        `ID` STRING,
        `value` BIGINT
    ) WITH (
        'connector' = 'print'
    )
"""

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

table = t_env.from_path('InTable')
table \
    .select(
        table.ID,
        table.Result.data.at(1).value) \
    .execute_insert('OutTable') \
    .wait()
{code}

It will thrown the following exception:
{code}
: scala.MatchError: ITEM($2.data, 1) (of class org.apache.calcite.rex.RexCall)
	at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
	at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
	at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
	at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
	at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155)
	at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65)
{code}

See https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array for more details



--
This message was sent by Atlassian Jira
(v8.3.4#803005)