Posted to issues@flink.apache.org by "Benoît Paris (Jira)" <ji...@apache.org> on 2020/01/27 12:28:00 UTC
[jira] [Updated] (FLINK-15775) SourceFunctions are opened twice when pulled on from 2 Sinks
[ https://issues.apache.org/jira/browse/FLINK-15775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Benoît Paris updated FLINK-15775:
---------------------------------
Summary: SourceFunctions are opened twice when pulled on from 2 Sinks (was: SourceFunctions are instanciated twice when pulled on from 2 Sinks)
> SourceFunctions are opened twice when pulled on from 2 Sinks
> ------------------------------------------------------------
>
> Key: FLINK-15775
> URL: https://issues.apache.org/jira/browse/FLINK-15775
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.9.1, 1.10.0
> Reporter: Benoît Paris
> Priority: Major
> Attachments: flink-test-duplicated-sources.zip
>
>
> When two sinks pull from the same TableSource, its SourceFunction is instantiated twice (each instance is then opened once per parallel subtask, which is expected behavior).
> The following instantiates the FooTableSource's SourceFunction once (OK behavior, but not the processing we want):
>
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> out0.insertInto("syso_sink_0");
> {code}
>
> The following instantiates the FooTableSource's SourceFunction twice (not OK, as each SysoSink misses half of the inputs):
>
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> Table out1 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 1");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> tEnv.registerTableSink("syso_sink_1", new SysoSink());
> out0.insertInto("syso_sink_0");
> out1.insertInto("syso_sink_1");
> {code}
>
> This might not be a problem for Kafka's SourceFunctions, as we can always reread from a log; but it is a data loss problem when the source data can't be reproduced.
> Actually, this might be me not understanding the API. Is there a way to make the runtime read from the same opened SourceFunctions?
> Attached are Java code that logs the faulty opening of the SourceFunctions, a pom.xml, and the logical execution plans for both the duplicated case and the workaround.
>
> ----
> Workaround: convert the source table to an append stream and back into a table. Somehow this makes the planner think it has to put a materialization barrier after the Source and read from that:
>
> {code:java}
> tEnv.registerTableSource("foo_table_source", new FooTableSource());
> Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
> Table appendingSourceTable = tEnv.fromDataStream(
>     tEnv.toAppendStream(sourceTable, Types.ROW(new String[]{"field_1"}, new TypeInformation[]{Types.LONG()}))
> );
> tEnv.registerTable("foo_table", appendingSourceTable);
> {code}
>
>
> Best Regards,
> Ben
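
Editorial illustration (not part of the original report): the data-loss concern described above can be sketched with a small plain-Java simulation. It does not use any Flink classes, and every name in it (SharedSourceSketch, newOneShotInput, readWithTwoReaders, readWithOneReader) is hypothetical. It assumes a source whose records can be consumed only once, and compares two independent readers that each apply one filter against a single reader whose records are offered to both filters. The round-robin turn-taking between the two readers is an assumption made for determinism; a real runtime gives no such ordering guarantee.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Plain-Java simulation only; no Flink classes are used and all names are hypothetical.
class SharedSourceSketch {

    // A one-shot input: once a record is polled it cannot be read again.
    // The single field takes the values 0, 0, 1, 1, 0, 0, 1, 1.
    static Queue<Long> newOneShotInput() {
        Queue<Long> input = new ArrayDeque<>();
        for (long i = 0; i < 8; i++) {
            input.add((i / 2) % 2);
        }
        return input;
    }

    // Duplicated case: two readers take turns polling the same input, and each
    // applies only its own filter to the records it happened to receive.
    static List<List<Long>> readWithTwoReaders(Queue<Long> input) {
        List<Long> sink0 = new ArrayList<>();
        List<Long> sink1 = new ArrayList<>();
        boolean reader0Turn = true;
        while (!input.isEmpty()) {
            long field1 = input.poll();
            if (reader0Turn && field1 == 0) {
                sink0.add(field1); // reader 0 keeps field_1 = 0
            } else if (!reader0Turn && field1 == 1) {
                sink1.add(field1); // reader 1 keeps field_1 = 1
            }
            // Any other record is dropped, although the other sink wanted it.
            reader0Turn = !reader0Turn;
        }
        List<List<Long>> sinks = new ArrayList<>();
        sinks.add(sink0);
        sinks.add(sink1);
        return sinks;
    }

    // Shared case: one reader polls every record and offers it to both filters.
    static List<List<Long>> readWithOneReader(Queue<Long> input) {
        List<Long> sink0 = new ArrayList<>();
        List<Long> sink1 = new ArrayList<>();
        while (!input.isEmpty()) {
            long field1 = input.poll();
            if (field1 == 0) {
                sink0.add(field1);
            }
            if (field1 == 1) {
                sink1.add(field1);
            }
        }
        List<List<Long>> sinks = new ArrayList<>();
        sinks.add(sink0);
        sinks.add(sink1);
        return sinks;
    }

    public static void main(String[] args) {
        List<List<Long>> duplicated = readWithTwoReaders(newOneShotInput());
        List<List<Long>> shared = readWithOneReader(newOneShotInput());
        System.out.println("two readers: " + duplicated.get(0).size() + " and " + duplicated.get(1).size());
        System.out.println("one reader:  " + shared.get(0).size() + " and " + shared.get(1).size());
    }
}
```

With this input, the two-reader variant delivers 2 of the 4 matching records to each sink, while the single-reader variant delivers all 4 to each. This mirrors the "missing half the inputs" symptom in the report under the stated assumptions; it says nothing about how the Flink planner actually decides whether to share a source.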