Posted to dev@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2021/04/01 05:31:00 UTC
[jira] [Created] (FLINK-22082) Nested projection push down doesn't
work for data: row(array(row))
Dian Fu created FLINK-22082:
-------------------------------
Summary: Nested projection push down doesn't work for data: row(array(row))
Key: FLINK-22082
URL: https://issues.apache.org/jira/browse/FLINK-22082
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.12.0, 1.13.0
Reporter: Dian Fu
For the following job:
{code}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, StreamTableEnvironment
config = TableConfig()
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env, config)
source_ddl = """
    CREATE TABLE InTable (
        `ID` STRING,
        `Timestamp` TIMESTAMP(3),
        `Result` ROW(
            `data` ROW(`value` BIGINT) ARRAY),
        WATERMARK FOR `Timestamp` AS `Timestamp`
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'json',
        'path' = '/tmp/1.txt'
    )
"""
sink_ddl = """
    CREATE TABLE OutTable (
        `ID` STRING,
        `value` BIGINT
    ) WITH (
        'connector' = 'print'
    )
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
table = t_env.from_path('InTable')
table \
    .select(
        table.ID,
        table.Result.data.at(1).value) \
    .execute_insert('OutTable') \
    .wait()
{code}
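To reproduce this, the job needs an input file at '/tmp/1.txt'. The original report does not include one, so the following is only a minimal sketch of how such a file could be generated. The record values, the helper names (write_sample_input, expected_row), and the timestamp string layout are assumptions made for illustration, not taken from the report. It also spells out in plain Python what the select is intended to return, given that at(1) uses 1-based indexing and so refers to the first array element.

```python
import json

# Hypothetical sample record shaped like the InTable DDL; all values are made up.
# The timestamp string layout is an assumption about what the 'json' format accepts.
SAMPLE_RECORD = {
    "ID": "a",
    "Timestamp": "2021-04-01 05:31:00.000",
    "Result": {"data": [{"value": 1}, {"value": 2}]},
}


def write_sample_input(path, records):
    """Write one JSON object per line to the given path."""
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")


def expected_row(record):
    """Plain-Python equivalent of select(ID, Result.data.at(1).value).

    at(1) is 1-based, so it maps to Python index 0.
    """
    return record["ID"], record["Result"]["data"][0]["value"]
```

Calling write_sample_input('/tmp/1.txt', [SAMPLE_RECORD]) before running the job should give it something to read. For this record, expected_row(SAMPLE_RECORD) yields ('a', 1), which is the row the job would presumably emit to OutTable if the projection were planned successfully.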
It throws the following exception:
{code}
: scala.MatchError: ITEM($2.data, 1) (of class org.apache.calcite.rex.RexCall)
at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155)
at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65)
{code}
See https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array for more details