Posted to user@flink.apache.org by wangsan <wa...@163.com> on 2018/08/21 13:04:55 UTC
Side effect of DataStreamRel#translateToPlan
Hi all,
I noticed that DataStreamRel#translateToPlan is not idempotent, which may make the execution plan differ from what we expect. Every time we call DataStreamRel#translateToPlan (in TableEnvironment#explain, TableEnvironment#writeToSink, etc.), the same operators are added to the execution environment again.
Should we eliminate the side effect of DataStreamRel#translateToPlan?
Best, Wangsan
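Appendix:
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row
// tenv (a StreamTableEnvironment) and sourceTable are assumed to be defined earlier
tenv.registerTableSource("test_source", sourceTable)
val t = tenv.sqlQuery("SELECT * from test_source")
println(tenv.explain(t)) // 1st explain
println(tenv.explain(t)) // 2nd explain
implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)   // translates the query into the environment
println(tenv.explain(t)) // 3rd explain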
We call explain() three times, and the three physical execution plans are all different:
== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

Stage 4 : Data Source
content : collect elements with CollectionInputFormat

Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 6 : Operator
content : Map
ship_strategy : FORWARD

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

Stage 4 : Data Source
content : collect elements with CollectionInputFormat

Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 6 : Operator
content : Map
ship_strategy : FORWARD

Stage 7 : Data Source
content : collect elements with CollectionInputFormat

Stage 8 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 9 : Operator
content : Map
ship_strategy : FORWARD

Stage 10 : Operator
content : to: Row
ship_strategy : FORWARD

Stage 11 : Data Source
content : collect elements with CollectionInputFormat

Stage 12 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 13 : Operator
content : Map
ship_strategy : FORWARD
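The stage counts above (3, then 6, then 13, with "to: Row" at stage 10) can be reproduced by a small toy model. This is a sketch, not Flink code: ToyEnv, ToyScan and ToyTableEnv are hypothetical names, and the model assumes that explain() translates the query into the same shared environment and then lists every stage in it, which is consistent with the plans shown above.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model (not Flink's API): an "execution environment" that
// only records, in order, the operators added to it.
final class ToyEnv {
  val stages: ArrayBuffer[String] = ArrayBuffer.empty[String]
  // Returns the 1-based stage number of the operator just added.
  def add(name: String): Int = { stages += name; stages.size }
}

// Stand-in for the scan node: every call appends a fresh
// source -> CsvTableSource -> Map chain, i.e. translation is not idempotent.
final class ToyScan {
  def translateToPlan(env: ToyEnv): Int = {
    env.add("Data Source: collect elements with CollectionInputFormat")
    env.add("Operator: CsvTableSource(read fields: f1, f2)")
    env.add("Operator: Map")
  }
}

object ToyTableEnv {
  // Assumed behavior: explain() translates into the shared environment and then
  // lists every stage in it, including stages left over from earlier calls.
  def explain(env: ToyEnv, scan: ToyScan): List[String] = {
    scan.translateToPlan(env)
    env.stages.toList
  }

  // toAppendStream() translates as well and then adds the conversion to Row.
  def toAppendStream(env: ToyEnv, scan: ToyScan): Int = {
    scan.translateToPlan(env)
    env.add("Operator: to: Row")
  }
}
```

Calling explain, explain, toAppendStream, explain against one ToyEnv yields 3, 6 and 13 stages, with the "to: Row" operator at stage 10, the same pattern as the three plans above.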
Re: Side effect of DataStreamRel#translateToPlan
Posted by Hequn Cheng <ch...@gmail.com>.
Hi wangsan,
> why don't we make the DataStreamRel#translateToPlan always return the
> same DataStream as it is called the first time. What do you think?
You are definitely right. It would be nice if we supported multiple sinks.
The reason for this problem is that Flink currently uses Calcite to parse and
optimize the SQL, and Calcite does not support multiple sinks, so each time
insertInto is called, the whole SQL is treated as a new query. I think the
community is already working on this problem, and it will probably be solved
soon.
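A toy illustration of the alternative Hequn alludes to: optimizing all sinks of a job together as one DAG, so identical sub-plans are translated once instead of once per insertInto. The Plan, Scan, Project, Sink and MultiSinkPlanner types are hypothetical; structural equality of case classes stands in for whatever plan comparison a real optimizer would use, and this is not how Calcite or Flink implement it.

```scala
import scala.collection.mutable

// Hypothetical toy plan tree (not Calcite's RelNode). Case classes compare
// structurally, so two independently built copies of the same query are equal.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Project(fields: List[String], input: Plan) extends Plan
final case class Sink(name: String, input: Plan) extends Plan

object MultiSinkPlanner {
  private def children(p: Plan): List[Plan] = p match {
    case Scan(_)        => Nil
    case Project(_, in) => List(in)
    case Sink(_, in)    => List(in)
  }

  // All sinks are translated together into one DAG: each distinct sub-plan gets
  // a single operator id, and identical sub-plans across sinks are reused.
  def translateTogether(sinks: Seq[Sink]): Map[Plan, Int] = {
    val ids = mutable.LinkedHashMap.empty[Plan, Int]
    def visit(p: Plan): Int = ids.get(p) match {
      case Some(id) => id // already translated: reuse the existing operator
      case None =>
        children(p).foreach(visit)
        val id = ids.size
        ids(p) = id
        id
    }
    sinks.foreach(visit)
    ids.toMap
  }

  // One query per sink, translated independently: nothing is shared.
  def operatorsWhenSeparate(sinks: Seq[Sink]): Int = {
    def count(p: Plan): Int = 1 + children(p).map(count).sum
    sinks.map(count).sum
  }
}
```

For two sinks over the same `SELECT f1, f2 FROM test_source`, translating together gives 4 operators (one scan, one projection, two sinks), while translating separately gives 6, including two scans.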
Best, Hequn
On Wed, Aug 22, 2018 at 4:15 PM wangsan <wa...@163.com> wrote:
> Hi Timo,
>
> What confuses me is why we need to rebuild the pipeline each time we
> convert the RelNode into a DataStream.
>
> Suppose I have this code (I want to write the query result into two target
> sinks):
>
> val t = tenv.sqlQuery("SELECT * from test_source")
> t.insertInto("sink_1")
> t.insertInto("sink_2")
>
> And I got an execution plan like this.
> So we have two source operators, and they are exactly the same. But in some
> cases, when the backend connector does not support multiple consumers (e.g.,
> an MQ that delivers each message only once), one of the two target sinks may
> not receive all the records (and that's the problem I ran into :( ).
>
> I expected the above code to behave equivalently to this:
>
> val t = tenv.sqlQuery("SELECT * from test_source")
> val stream = tenv.toAppendStream[Row](t)
> stream.writeAsText("path_1")
> stream.writeAsText("path_2")
>
>
> So, IMO, instead of returning a totally new one, why don't we make
> DataStreamRel#translateToPlan always return the same DataStream it returned
> the first time it was called? What do you think?
>
> Best,
> wangsan
Re: Side effect of DataStreamRel#translateToPlan
Posted by wangsan <wa...@163.com>.
Hi Timo,
What confuses me is why we need to rebuild the pipeline each time we convert the RelNode into a DataStream.
Suppose I have this code (I want to write the query result into two target sinks):
val t = tenv.sqlQuery("SELECT * from test_source")
// each insertInto() call translates the whole query again
t.insertInto("sink_1")
t.insertInto("sink_2")
And I got an execution plan like this.
So we have two source operators, and they are exactly the same. But in some cases, when the backend connector does not support multiple consumers (e.g., an MQ that delivers each message only once), one of the two target sinks may not receive all the records (and that's the problem I ran into :( ).
I expected the above code to behave equivalently to this:
val t = tenv.sqlQuery("SELECT * from test_source")
// translate the query once, then write the same stream to both sinks
val stream = tenv.toAppendStream[Row](t)
stream.writeAsText("path_1")
stream.writeAsText("path_2")
So, IMO, instead of returning a totally new one, why don't we make DataStreamRel#translateToPlan always return the same DataStream it returned the first time it was called? What do you think?
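The failure described above can be reproduced with a toy single-delivery queue. This sketch assumes a hypothetical OnceQueue that hands each message to exactly one consumer, and it approximates two competing source operators by strict alternation (real consumers race, so the actual split is nondeterministic). With two sources, each sink sees only part of the data; with one shared source feeding both sinks, both see everything.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an MQ that delivers each message to only one consumer.
final class OnceQueue[A](items: Seq[A]) {
  private val pending = mutable.Queue(items: _*)
  def poll(): Option[A] = if (pending.isEmpty) None else Some(pending.dequeue())
}

object SinkFanOut {
  // Two independent source operators (one per insertInto) compete for the same
  // queue. Strict alternation is a simplification of a real race.
  def duplicatedSources[A](items: Seq[A]): (Vector[A], Vector[A]) = {
    val queue = new OnceQueue(items)
    val sink1 = Vector.newBuilder[A]
    val sink2 = Vector.newBuilder[A]
    var turn = 0
    var next = queue.poll()
    while (next.isDefined) {
      if (turn % 2 == 0) sink1 += next.get else sink2 += next.get
      turn += 1
      next = queue.poll()
    }
    (sink1.result(), sink2.result())
  }

  // One source operator whose single output stream feeds both sinks, as in the
  // toAppendStream + two writeAsText version.
  def sharedSource[A](items: Seq[A]): (Vector[A], Vector[A]) = {
    val queue = new OnceQueue(items)
    val sink1 = Vector.newBuilder[A]
    val sink2 = Vector.newBuilder[A]
    var next = queue.poll()
    while (next.isDefined) {
      sink1 += next.get
      sink2 += next.get
      next = queue.poll()
    }
    (sink1.result(), sink2.result())
  }
}
```

For records 1 to 5, `duplicatedSources` splits them into (1, 3, 5) and (2, 4), while `sharedSource` delivers all five records to both sinks.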
Best,
wangsan
> On Aug 22, 2018, at 12:03 AM, Timo Walther <tw...@apache.org> wrote:
>
> Hi Wangsan,
>
> the behavior of DataStreamRel#translateToPlan is more or less intended. That's why you call `toAppendStream` on the table environment: it adds your pipeline (from the source to the current operator) to the environment.
>
> However, the explain() method should not cause those side-effects.
>
> Regards,
> Timo
Re: Side effect of DataStreamRel#translateToPlan
Posted by Timo Walther <tw...@apache.org>.
Hi Wangsan,
the behavior of DataStreamRel#translateToPlan is more or less intended.
That's why you call `toAppendStream` on the table environment: it adds
your pipeline (from the source to the current operator) to the environment.
However, the explain() method should not cause those side-effects.
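One way explain() could avoid side effects is to translate into a throwaway copy of the environment. The sketch below is a toy model under that assumption (PlanEnv and SideEffectFreeExplain are hypothetical, and copying the environment is only one possible design): explain() still shows the existing pipeline plus the query, but the user's environment changes only through toAppendStream.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy environment that only records the operators added to it.
final class PlanEnv {
  val stages: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

object SideEffectFreeExplain {
  // Appends the query's pipeline (source to current operator) to env.
  def translateToPlan(env: PlanEnv): Unit = {
    env.stages += "Data Source"
    env.stages += "CsvTableSource(read fields: f1, f2)"
    env.stages += "Map"
  }

  // explain() works on a copy of the user's environment, so it can show the
  // existing pipeline plus this query without mutating the original.
  def explain(userEnv: PlanEnv): List[String] = {
    val scratch = new PlanEnv
    scratch.stages ++= userEnv.stages
    translateToPlan(scratch)
    scratch.stages.toList
  }

  // toAppendStream() is meant to change the user's environment.
  def toAppendStream(userEnv: PlanEnv): Unit = {
    translateToPlan(userEnv)
    userEnv.stages += "to: Row"
  }
}
```

Repeating the appendix sequence against this model gives 3, 3 and 7 stages instead of 3, 6 and 13, and the user's environment ends up with exactly the 4 operators added by toAppendStream.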
Regards,
Timo
Am 21.08.18 um 17:29 schrieb wangsan:
> Hi Timo,
>
> I think this may not only affect the explain() method. DataStreamRel#translateToPlan is called whenever we need to translate a FlinkRelNode into a DataStream or DataSet, and it adds the required operators to the execution environment. By side effect, I mean that if we call DataStreamRel#translateToPlan on the same RelNode several times, the same operators are added to the execution environment more than once, although we need them only once. Correct me if I have misunderstood.
>
> I will open an issue later today if this is indeed a problem.
>
> Best,
> wangsan
Re: Side effect of DataStreamRel#translateToPlan
Posted by wangsan <wa...@163.com>.
Hi Timo,
I think this may not only affect the explain() method. DataStreamRel#translateToPlan is called whenever we need to translate a FlinkRelNode into a DataStream or DataSet, and it adds the required operators to the execution environment. By side effect, I mean that if we call DataStreamRel#translateToPlan on the same RelNode several times, the same operators are added to the execution environment more than once, although we need them only once. Correct me if I have misunderstood.
I will open an issue later today if this is indeed a problem.
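A minimal sketch of the "add the operators only once" idea: cache each node's translation per environment, so repeated calls return the cached result and parents that share an input reuse it. MemoEnv, MemoScan and MemoSink are hypothetical toy types, not Flink classes. The cache is keyed by environment object identity and assumes the same node instance is reused; if each insertInto re-optimizes the query into fresh nodes, as Hequn's reply explains, such a cache would never be hit.

```scala
import scala.collection.mutable

// Hypothetical toy environment (not Flink's StreamExecutionEnvironment).
final class MemoEnv {
  val operators: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
  // Returns the 0-based index of the operator just added.
  def add(op: String): Int = { operators += op; operators.size - 1 }
}

// Toy node whose translation is cached per environment: the first call adds the
// operators and remembers the id of the last one; later calls return that id.
final class MemoScan(table: String) {
  private val translated = mutable.Map.empty[MemoEnv, Int]

  def translateToPlan(env: MemoEnv): Int =
    translated.getOrElseUpdate(env, {
      env.add(s"Source: $table")
      env.add("Map")
    })
}

// A parent node reuses its input's cached result, so a shared input is added
// once even if several parents (e.g. several sinks) translate it.
final class MemoSink(name: String, input: MemoScan) {
  def translateToPlan(env: MemoEnv): Int = {
    val upstream = input.translateToPlan(env)
    env.add(s"Sink: $name (input #$upstream)")
  }
}
```

Translating the scan twice and then two sinks over it yields four operators in total: one source, one map, and two sinks that both point at the same map.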
Best,
wangsan
> On Aug 21, 2018, at 10:16 PM, Timo Walther <tw...@apache.org> wrote:
>
> Hi,
>
> this sounds like a bug to me. Maybe the explain() method is not implemented correctly. Can you open an issue for it in Jira?
>
> Thanks,
> Timo
Re: Side effect of DataStreamRel#translateToPlan
Posted by Timo Walther <tw...@apache.org>.
Hi,
this sounds like a bug to me. Maybe the explain() method is not
implemented correctly. Can you open an issue for it in Jira?
Thanks,
Timo
Am 21.08.18 um 15:04 schrieb wangsan:
> Hi all,
>
> I noticed that DataStreamRel#translateToPlan is not idempotent, which may make the execution plan differ from what we expect. Every time we call DataStreamRel#translateToPlan (in TableEnvironment#explain, TableEnvironment#writeToSink, etc.), the same operators are added to the execution environment again.
>
> Should we eliminate the side effect of DataStreamRel#translateToPlan?
>
> Best, Wangsan