Posted to user@flink.apache.org by Daniel Henneberger <he...@gmail.com> on 2023/02/21 21:35:02 UTC
Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API
Dear Apache Flink community,
I could use some help with a serialization issue I'm having while using the
Table API. Specifically, I'm trying to deserialize a serialized
CompiledPlan, but I'm running into trouble with the UNNEST_ROWS operation.
It seems that the CompiledPlan deserializer isn't looking up any functions
in the BuiltInFunctionDefinitions class, which is causing the
deserialization to fail.
Do any of you have experience with this issue or know of a workaround for
serializing a Table API plan?
Below is code to reproduce the issue.
Thanks,
Daniel Henneberger
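// Imports, the wrapper class and main() were added so the snippet compiles
// standalone; the class name UnnestPlanRepro is arbitrary.
import java.util.List;

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class UnnestPlanRepro {

    private void test() {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // Create a table of values
        Table table = tEnv.fromValues(createNestedDatatype(),
                Row.of(List.of(Row.of("nested")), "name"));
        tEnv.createTemporaryView("table1", table);
        // Invoke the unnest operation
        Table unnested = tEnv.sqlQuery("SELECT name, nested\n"
                + "FROM table1 CROSS JOIN UNNEST(arr) AS t (nested)");
        StatementSet statementSet = tEnv.createStatementSet();
        statementSet.addInsert(TableDescriptor.forConnector("print").build(),
                unnested);
        // Serialize the plan
        CompiledPlan plan = statementSet.compilePlan();
        String json = plan.asJsonString();
        // Attempt to load the plan
        // This fails with the error 'Could not resolve internal system
        // function '$UNNEST_ROWS$1'. This is a bug, please file an issue.'
        CompiledPlan plan2 = tEnv.loadPlan(PlanReference.fromJsonString(json));
        plan2.execute().print();
    }

    private DataType createNestedDatatype() {
        return DataTypes.ROW(
                DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.ROW(
                        DataTypes.FIELD("nested", DataTypes.STRING())))),
                DataTypes.FIELD("name", DataTypes.STRING()));
    }

    public static void main(String[] args) {
        new UnnestPlanRepro().test();
    }
}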
Re: Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API
Posted by Jane Chan <qi...@gmail.com>.
Hi Daniel,
Thanks for reporting this issue. According to the FLIP [1], built-in functions
should be resolvable by name and version, so this is a bug. I've created a
Jira ticket [2] to track it.
> We will introduce a declarative concept to `BuiltInFunctionDefinitions`
> and `FlinkSqlOperatorTable` that maintain a function name + version to
> instance mapping.
>
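To make the quoted idea concrete, here is a minimal plain-Java sketch (not Flink code) of a registry that maps a function name plus version to an instance. It assumes, based only on the identifier in the error message, that internal functions are serialized as `$NAME$VERSION`; the class `FunctionRegistrySketch`, its methods, and the string "instances" are all hypothetical stand-ins for real function definitions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of a "function name + version -> instance" registry.
// Not Flink code; instances are plain strings standing in for function definitions.
public class FunctionRegistrySketch {

    // Lookup key: a function name plus its version, e.g. ("UNNEST_ROWS", 1).
    public record FunctionKey(String name, int version) {}

    private final Map<FunctionKey, String> instances = new HashMap<>();

    public void register(String name, int version, String instance) {
        instances.put(new FunctionKey(name, version), instance);
    }

    // Parses an identifier of the ASSUMED form "$NAME$VERSION" (inferred from the error message).
    public static Optional<FunctionKey> parse(String serialized) {
        if (!serialized.startsWith("$")) {
            return Optional.empty();
        }
        int sep = serialized.lastIndexOf('$');
        if (sep <= 1) {
            return Optional.empty(); // no second '$', or an empty name
        }
        try {
            String name = serialized.substring(1, sep);
            int version = Integer.parseInt(serialized.substring(sep + 1));
            return Optional.of(new FunctionKey(name, version));
        } catch (NumberFormatException e) {
            return Optional.empty(); // version part is missing or not a number
        }
    }

    // Resolves a serialized identifier; fails if that name/version pair was never registered.
    public String resolve(String serialized) {
        FunctionKey key = parse(serialized).orElseThrow(
                () -> new IllegalArgumentException("Malformed identifier: " + serialized));
        String instance = instances.get(key);
        if (instance == null) {
            throw new IllegalStateException(
                    "Could not resolve internal system function '" + serialized + "'.");
        }
        return instance;
    }

    public static void main(String[] args) {
        FunctionRegistrySketch registry = new FunctionRegistrySketch();
        registry.register("UNNEST_ROWS", 1, "unnest rows, version 1");
        System.out.println(registry.resolve("$UNNEST_ROWS$1"));
        try {
            registry.resolve("$UNNEST_ROWS$2"); // version 2 was never registered
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In terms of this sketch, the behavior Daniel describes (the deserializer not consulting the built-in definitions) corresponds to `UNNEST_ROWS` never being registered, so `resolve` throws an error shaped like the one in his reproducer.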
[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-FunctionVersioning
[2] https://issues.apache.org/jira/browse/FLINK-31182
Best,
Jane
On Wed, Feb 22, 2023 at 9:55 AM yuxia <lu...@alumni.sjtu.edu.cn> wrote:
> Hi, Daniel Henneberger.
> Thanks for reporting. It seems like a bug to me. Could you please help
> create a Jira[1] for it?
> As a workaround, is it possible to avoid UNNEST? Maybe you can try the
> EXPLODE function instead, since the Flink planner rewrites UNNEST into the
> explode function in its implementation[2].
>
> [1] https://issues.apache.org/jira/projects/FLINK/issues/
> [2]
> https://github.com/apache/flink/blob/bf342d2f67a46e5266c3595734574db270f1b48c/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.scala
>
> Best regards,
> Yuxia
>
Re: Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API
Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Hi, Daniel Henneberger.
Thanks for reporting. It seems like a bug to me. Could you please help create a Jira[1] for it?
As a workaround, is it possible to avoid UNNEST? Maybe you can try the EXPLODE function instead, since the Flink planner rewrites UNNEST into the explode function
in its implementation[2].
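For reference, the result that `CROSS JOIN UNNEST(arr)` (and therefore any explode-style rewrite) is expected to produce is one output row per array element, paired with the outer row's columns. The plain-Java sketch below mimics only those semantics for rows shaped like Daniel's reproducer; it is not Flink's EXPLODE implementation, the names `UnnestSketch`, `InputRow`, `OutputRow` and `crossJoinUnnest` are hypothetical, and the inner `ROW<nested STRING>` is simplified to a plain `String`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of CROSS JOIN UNNEST semantics; not Flink code.
public class UnnestSketch {

    // Simplified input row: arr stands in for ARRAY<ROW<nested STRING>>.
    public record InputRow(List<String> arr, String name) {}

    // Output row of "SELECT name, nested FROM table1 CROSS JOIN UNNEST(arr) AS t (nested)".
    public record OutputRow(String name, String nested) {}

    // Emits one output row per array element. A row with an empty array
    // produces no output, matching inner (CROSS JOIN) semantics.
    public static List<OutputRow> crossJoinUnnest(List<InputRow> input) {
        List<OutputRow> out = new ArrayList<>();
        for (InputRow row : input) {
            for (String nested : row.arr()) {
                out.add(new OutputRow(row.name(), nested));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<InputRow> input = List.of(
                new InputRow(List.of("nested"), "name"), // the reproducer's single row
                new InputRow(List.of("a", "b"), "other"),
                new InputRow(List.of(), "empty"));
        crossJoinUnnest(input).forEach(System.out::println);
    }
}
```

Because the planner evaluates this through a function call rather than the UNNEST keyword itself, the compiled plan presumably has to reference that function by an internal identifier, which is where `$UNNEST_ROWS$1` in the error comes from.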
[1] https://issues.apache.org/jira/projects/FLINK/issues/
[2] https://github.com/apache/flink/blob/bf342d2f67a46e5266c3595734574db270f1b48c/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.scala
Best regards,
Yuxia