Posted to user@flink.apache.org by Daniel Henneberger <he...@gmail.com> on 2023/02/21 21:35:02 UTC

Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API

Dear Apache Flink community,

I could use some help with a serialization issue I'm having while using the
Table API. Specifically, I'm trying to deserialize a serialized
CompiledPlan, but I'm running into trouble with the UNNEST_ROWS operation.
It seems that the CompiledPlan deserializer isn't looking up any functions
in the BuiltInFunctionDefinitions class, which is causing the
deserialization to fail.

Do any of you have experience with this issue or know of a workaround for
serializing a Table API plan?

Below is code to replicate.

Thanks,
Daniel Henneberger

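import java.util.List;

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

// Minimal wrapper so the repro compiles and runs standalone
public class UnnestPlanRepro {

  public static void main(String[] args) {
    new UnnestPlanRepro().test();
  }

  private void test() {
    EnvironmentSettings settings =
        EnvironmentSettings.newInstance().inStreamingMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    // Create a table of values with an ARRAY<ROW<nested STRING>> column
    Table table = tEnv.fromValues(createNestedDatatype(),
        Row.of(List.of(Row.of("nested")), "name"));
    tEnv.createTemporaryView("table1", table);

    // Invoke the unnest operation
    Table unnested = tEnv.sqlQuery("SELECT name, nested\n"
        + "FROM table1 CROSS JOIN UNNEST(arr) AS t (nested)");

    StatementSet statementSet = tEnv.createStatementSet();
    statementSet.addInsert(
        TableDescriptor.forConnector("print").build(), unnested);

    // Serialize the plan
    CompiledPlan plan = statementSet.compilePlan();
    String json = plan.asJsonString();

    // Attempt to load the plan.
    // This fails with: 'Could not resolve internal system function
    // '$UNNEST_ROWS$1'. This is a bug, please file an issue.'
    CompiledPlan plan2 = tEnv.loadPlan(PlanReference.fromJsonString(json));
    plan2.execute().print();
  }

  private DataType createNestedDatatype() {
    return DataTypes.ROW(
        DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.ROW(
            DataTypes.FIELD("nested", DataTypes.STRING())
        ))),
        DataTypes.FIELD("name", DataTypes.STRING()));
  }
}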

Re: Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API

Posted by Jane Chan <qi...@gmail.com>.
Hi Daniel,

Thanks for reporting this issue. According to the FLIP [1], this is a
bug, and I've created a Jira ticket [2] to track it. The FLIP states:

> We will introduce a declarative concept to `BuiltInFunctionDefinitions`
> and `FlinkSqlOperatorTable` that maintain a function name + version to
> instance mapping.
>
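The versioned lookup the FLIP describes can be pictured with a small, stdlib-only Java sketch. Everything in it is an assumption for illustration: `VersionedFunctionRegistry`, `FunctionKey`, `register` and `resolve` are hypothetical names, not Flink's actual classes, and the `$NAME$VERSION` parsing is inferred from the `'$UNNEST_ROWS$1'` string in the reported error. It shows why a plan that references an internal function fails to load when that name/version pair was never registered with the lookup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a "function name + version to instance mapping".
// Not Flink's API; function instances are represented by plain strings here.
final class VersionedFunctionRegistry {

    /** Lookup key: function name plus the version recorded in the plan. */
    record FunctionKey(String name, int version) {}

    private final Map<FunctionKey, String> functions = new HashMap<>();

    /** Registers a function instance under (name, version). */
    void register(String name, int version, String instance) {
        functions.put(new FunctionKey(name, version), instance);
    }

    /** Parses an internal name of the assumed form "$NAME$VERSION", e.g. "$UNNEST_ROWS$1". */
    static FunctionKey parseInternalName(String internalName) {
        int last = internalName.lastIndexOf('$');
        if (!internalName.startsWith("$") || last <= 0) {
            throw new IllegalArgumentException("Not an internal name: " + internalName);
        }
        String name = internalName.substring(1, last);
        int version = Integer.parseInt(internalName.substring(last + 1));
        return new FunctionKey(name, version);
    }

    /** Resolves a serialized internal name; fails if the (name, version) pair is unknown. */
    String resolve(String internalName) {
        FunctionKey key = parseInternalName(internalName);
        String instance = functions.get(key);
        if (instance == null) {
            throw new IllegalStateException(
                "Could not resolve internal system function '" + internalName + "'.");
        }
        return instance;
    }
}
```

Keying on the version as well as the name is what would let a plan compiled against version 1 keep resolving to version-1 semantics after a newer version is added; a pair that was never registered fails in the same way the reported `loadPlan` call does.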

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-FunctionVersioning
[2] https://issues.apache.org/jira/browse/FLINK-31182

Best,
Jane

On Wed, Feb 22, 2023 at 9:55 AM yuxia <lu...@alumni.sjtu.edu.cn> wrote:

> Hi, Daniel Henneberger.
> Thanks for reporting. It seems like a bug to me. Could you please help
> create a Jira [1] for it?
> As a workaround, is it possible not to use UNNEST? Maybe you can try to
> use the EXPLODE function, since the Flink planner rewrites UNNEST into an
> explode function internally [2].
>
> [1] https://issues.apache.org/jira/projects/FLINK/issues/
> [2]
> https://github.com/apache/flink/blob/bf342d2f67a46e5266c3595734574db270f1b48c/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.scala
>
> Best regards,
> Yuxia
>
> ------------------------------
> *From: *"Daniel Henneberger" <he...@gmail.com>
> *To: *"User" <us...@flink.apache.org>
> *Sent: *Wednesday, February 22, 2023, 5:35:02 AM
> *Subject: *Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API

Re: Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Hi, Daniel Henneberger.
Thanks for reporting. It seems like a bug to me. Could you please help create a Jira [1] for it?
As a workaround, is it possible not to use UNNEST? Maybe you can try to use the EXPLODE function, since the Flink planner rewrites UNNEST into an explode function internally [2].
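To check that an EXPLODE-based or hand-written replacement produces the same rows, it helps to pin down what `CROSS JOIN UNNEST(arr)` computes. The following plain-Java sketch is not Flink code and does not reproduce the rewrite in `LogicalUnnestRule`; `UnnestSketch`, `InputRow` and `OutputRow` are hypothetical names, and the `ROW(nested STRING)` array elements from the repro are simplified to plain strings. Each input row is emitted once per array element, and a row with a null or empty array produces nothing, as in an inner join.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java illustration of CROSS JOIN UNNEST semantics (not Flink code).
final class UnnestSketch {

    /** Input row mirroring (arr, name); array elements simplified to strings. */
    record InputRow(List<String> arr, String name) {}

    /** Output row mirroring "SELECT name, nested". */
    record OutputRow(String name, String nested) {}

    static List<OutputRow> crossJoinUnnest(List<InputRow> rows) {
        List<OutputRow> out = new ArrayList<>();
        for (InputRow row : rows) {
            // Null arrays contribute no rows (inner-join semantics).
            if (row.arr() == null) {
                continue;
            }
            // Repeat the outer row once per array element; empty arrays emit nothing.
            for (String element : row.arr()) {
                out.add(new OutputRow(row.name(), element));
            }
        }
        return out;
    }
}
```

For the single row in the repro, with `arr = [nested]` and `name = name`, this yields exactly one output row, `(name, nested)`.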

[1] https://issues.apache.org/jira/projects/FLINK/issues/ 
[2] https://github.com/apache/flink/blob/bf342d2f67a46e5266c3595734574db270f1b48c/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.scala 

Best regards, 
Yuxia 