Posted to issues@flink.apache.org by "hailong wang (Jira)" <ji...@apache.org> on 2020/05/23 05:35:00 UTC

[jira] [Comment Edited] (FLINK-17892) Dynamic option may not be a part of the table digest

    [ https://issues.apache.org/jira/browse/FLINK-17892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17114547#comment-17114547 ] 

hailong wang edited comment on FLINK-17892 at 5/23/20, 5:34 AM:
----------------------------------------------------------------

Hi [~lzljs3620320], no, the Kafka source will not be reused.

Consider the following SQL:
{code:scala}
// Register a test table backed by TestTableSource
val sql1 = "CREATE TABLE SS (" +
  " a int," +
  " b bigint," +
  " c varchar" +
  ") WITH (" +
  "'connector.type' = 'TestTableSource'" +
  ")"
util.tableEnv.sqlUpdate(sql1)
util.tableEnv.registerFunction("random_udf", new NonDeterministicUdf())

// Two scans of SS that differ only in their OPTIONS hints
val sqlQuery =
  """
    |(SELECT a, random_udf() FROM SS /*+ OPTIONS('k1' = 'v1') */ WHERE a > 10)
    |UNION ALL
    |(SELECT a, random_udf() FROM SS /*+ OPTIONS('k2' = 'v2') */ WHERE a > 10)
  """.stripMargin
util.verifyPlan(sqlQuery)
{code}
The resulting plan is:
{code:java}
Union(all=[true], union=[a, EXPR$1])
:- Calc(select=[a, random_udf() AS EXPR$1], where=[>(a, 10)])
:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, SS, source: [TestTableSource(a, b, c)], dynamic options: {k1=v1}]], fields=[a, b, c])
+- Calc(select=[a, random_udf() AS EXPR$1], where=[>(a, 10)])
   +- LegacyTableSourceScan(table=[[default_catalog, default_database, SS, source: [TestTableSource(a, b, c)], dynamic options: {k2=v2}]], fields=[a, b, c])
{code}
Because the dynamic options are part of the table digest, the two scans have different digests, so the source is not reused.

However, dynamic options are only used to override table properties, and table properties are not part of the table digest, so I think dynamic options should not be part of the table digest either.

What I expect is that the source can be reused even when the table hints differ.
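To make the digest argument concrete, here is a toy Scala model of digest-based source reuse. It is a minimal sketch under stated assumptions, not Flink's planner code: `ScanNode`, `digest` and `reuseSources` are hypothetical names, and the digest string only imitates the plan output shown above. Scans with equal digests are grouped into one shared source; including the hint options in the digest keeps the two scans of `SS` apart, while leaving them out lets them be reused.

{code:scala}
// Toy model of digest-based source reuse; NOT Flink's planner code.
// ScanNode, digest and reuseSources are hypothetical names.
final case class ScanNode(
    table: String,                      // fully qualified table path
    dynamicOptions: Map[String, String] // options from an OPTIONS(...) hint
) {
  // Reuse key for this scan. When includeOptions is true, the hint options
  // become part of the key, mirroring the behaviour described in this issue.
  def digest(includeOptions: Boolean): String =
    if (includeOptions && dynamicOptions.nonEmpty) {
      val opts = dynamicOptions.toSeq.sorted
        .map { case (k, v) => s"$k=$v" }
        .mkString("{", ", ", "}")
      s"$table, dynamic options: $opts"
    } else table
}

object DigestReuseSketch {
  // Groups scans by digest: each distinct digest stands for one shared source.
  def reuseSources(
      scans: Seq[ScanNode],
      includeOptions: Boolean): Map[String, Seq[ScanNode]] =
    scans.groupBy(_.digest(includeOptions))

  def main(args: Array[String]): Unit = {
    val scans = Seq(
      ScanNode("default_catalog.default_database.SS", Map("k1" -> "v1")),
      ScanNode("default_catalog.default_database.SS", Map("k2" -> "v2"))
    )
    println(reuseSources(scans, includeOptions = true).size)  // 2: not reused
    println(reuseSources(scans, includeOptions = false).size) // 1: reused
  }
}
{code}

Once the options are left out of the digest, the grouped scans still carry different option maps; the sketch does not decide which options the shared source would use, which a real planner would still have to handle.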

As another example, if the source is Kafka, whether or not the source is reused produces different results.

If the Kafka source is reused, tableSink and tableSink1 each receive the full set of data from the source.

But if the Kafka source is not reused, tableSink and tableSink1 only receive the full set of data between them.

I think the first behavior is the correct one.


> Dynamic option may not be a part of the table digest
> ----------------------------------------------------
>
>                 Key: FLINK-17892
>                 URL: https://issues.apache.org/jira/browse/FLINK-17892
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.0
>            Reporter: hailong wang
>            Priority: Critical
>             Fix For: 1.11.0
>
>
> Currently, table properties are not part of the table digest, but dynamic options are.
> This leads to an error when the plan is reused.
> For example, if I define a Kafka table:
> {code:java}
> CREATE TABLE KAFKA (
> ……
> ) with (
> topic = 'xx',
> groupid = 'xxx'
> ……
> )
> Insert into sinktable select * from KAFKA;
> Insert into sinktable1 select * from KAFKA;{code}
> With the SQL above, the KAFKA source is reused.
> But if I add different table hints to the DML statements, like:
> {code:java}
> Insert into sinktable select * from KAFKA /*+ OPTIONS('k1' = 'v1')*/;
> Insert into sinktable1 select * from KAFKA /*+ OPTIONS('k2' = 'v2')*/;
> {code}
> there will be two Kafka table sources that use the same group id to consume the same topic.
> So I think dynamic options should not be part of the table digest.


