Posted to issues@flink.apache.org by "Benoît Paris (Jira)" <ji...@apache.org> on 2020/01/27 12:28:00 UTC

[jira] [Updated] (FLINK-15775) SourceFunctions are opened twice when pulled on from 2 Sinks

     [ https://issues.apache.org/jira/browse/FLINK-15775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Benoît Paris updated FLINK-15775:
---------------------------------
    Summary: SourceFunctions are opened twice when pulled on from 2 Sinks  (was: SourceFunctions are instanciated twice when pulled on from 2 Sinks)

> SourceFunctions are opened twice when pulled on from 2 Sinks
> ------------------------------------------------------------
>
>                 Key: FLINK-15775
>                 URL: https://issues.apache.org/jira/browse/FLINK-15775
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.1, 1.10.0
>            Reporter: Benoît Paris
>            Priority: Major
>         Attachments: flink-test-duplicated-sources.zip
>
>
> When pulled on by two sinks, the SourceFunctions of a TableSource get instantiated twice (and each instance is subsequently opened as many times as the parallelism, which is expected behavior).
> The following will instantiate the FooTableSource's SourceFunction once (OK behavior, but not the processing we want):
>  
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> out0.insertInto("syso_sink_0");
> {code}
>  
> This will instantiate the FooTableSource's SourceFunction twice (not OK, as we're missing half the inputs in each SysoSink):
>  
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> Table out1 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 1");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> tEnv.registerTableSink("syso_sink_1", new SysoSink());
> out0.insertInto("syso_sink_0");
> out1.insertInto("syso_sink_1"); 
> {code}
>  
> This might not be a problem for Kafka's SourceFunctions, as we can always re-read from the log; but it is a data-loss problem when the source data can't be reproduced.
> Actually, this might be me not understanding the API. Is there a way to make the runtime read from the same opened SourceFunctions?
> Attached are the Java code that logs the duplicate opening of the SourceFunctions, the pom.xml, and the logical execution plans for both the duplicated case and the workaround.
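> 
> For reference, here is a minimal sketch of the kind of logging source used in the attachment (the actual FooTableSource in the zip may differ; this is only an illustration, with hypothetical names, of a SourceFunction that logs its open() calls):
> 
> {code:java}
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.apache.flink.types.Row;
> 
> // Hypothetical stand-in for the source wrapped by FooTableSource: it logs
> // every open() call, so duplicate instantiation/opening shows up in the logs.
> public class LoggingSourceFunction extends RichParallelSourceFunction<Row> {
> 
>     private volatile boolean running = true;
> 
>     @Override
>     public void open(Configuration parameters) {
>         System.out.println("open() called, subtask "
>                 + getRuntimeContext().getIndexOfThisSubtask());
>     }
> 
>     @Override
>     public void run(SourceContext<Row> ctx) throws Exception {
>         long i = 0;
>         while (running) {
>             ctx.collect(Row.of(i++ % 2)); // field_1 alternates between 0 and 1
>             Thread.sleep(100);
>         }
>     }
> 
>     @Override
>     public void cancel() {
>         running = false;
>     }
> }
> {code}
> With the two-sink program above, this should print roughly twice as many open() lines as the single-sink version, which is how the duplication shows up.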
>  
> ----
> Workaround: convert the table to an append stream and back. Somehow this makes the planner put a materialization barrier after the source and read from that:
>  
> {code:java}
> tEnv.registerTableSource("foo_table_source", new FooTableSource());
> Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
> Table appendingSourceTable = tEnv.fromDataStream(
>  tEnv.toAppendStream(sourceTable, Types.ROW(new String[]{"field_1"}, new TypeInformation[]{Types.LONG()}))
> );
> tEnv.registerTable("foo_table", appendingSourceTable);
> {code}
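> 
> The attached plans could presumably be produced with something like the following (TableEnvironment#explain is available in 1.9/1.10; tEnv, out0 and out1 are the names from the snippets above):
> 
> {code:java}
> // After the insertInto() calls, print the multi-sink plan to check whether the
> // source scan appears once or twice; individual queries can also be inspected
> // with tEnv.explain(out0) / tEnv.explain(out1).
> System.out.println(tEnv.explain(false));
> {code}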
>  
>  
> Best Regards,
> Ben



--
This message was sent by Atlassian Jira
(v8.3.4#803005)