You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by wangsan <wa...@163.com> on 2018/08/21 13:04:55 UTC

Side effect of DataStreamRel#translateToPlan

Hi all,

I noticed that the DataStreamRel#translateToPlan is non-idempotent, and that may cause the execution plan not as what we expected. Every time we call DataStreamRel#translateToPlan (in TableEnvirnment#explain, TableEnvirnment#writeToSink, etc), we add same operators in execution environment repeatedly. 

Should we eliminate the side effect of DataStreamRel#translateToPlan ? 

Best,  Wangsan

appendix

    tenv.registerTableSource("test_source", sourceTable)

    val t = tenv.sqlQuery("SELECT * from test_source")
    println(tenv.explain(t))
    println(tenv.explain(t))

    implicit val typeInfo = TypeInformation.of(classOf[Row])
    tenv.toAppendStream(t)
    println(tenv.explain(t))
We call explain three times, and the Physical Execution Plan are all diffrent.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 2 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 3 : Operator
            content : Map
            ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 2 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 3 : Operator
            content : Map
            ship_strategy : FORWARD

Stage 4 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 5 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 6 : Operator
            content : Map
            ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 2 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 3 : Operator
            content : Map
            ship_strategy : FORWARD

Stage 4 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 5 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 6 : Operator
            content : Map
            ship_strategy : FORWARD

Stage 7 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 8 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 9 : Operator
            content : Map
            ship_strategy : FORWARD

            Stage 10 : Operator
                content : to: Row
                ship_strategy : FORWARD

Stage 11 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 12 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 13 : Operator
            content : Map
            ship_strategy : FORWARD


Re: Side effect of DataStreamRel#translateToPlan

Posted by Hequn Cheng <ch...@gmail.com>.
Hi wangsan,

> why don’t we make the DataStreamRel#translateToPlan always return the
same DataSteam as it is called the first time. What do you think?
You are definitely right. It would be nice if we support multi sink.
The reason for this problem is that flink now use calcite to parse and
optimize the sql while calcite don't support multi sink, so each time
insertInto be called, the whole sql is treated as a new one. I think the
community is already trying to solve the problem and probably be solved
soon.

Best, Hequn

On Wed, Aug 22, 2018 at 4:15 PM wangsan <wa...@163.com> wrote:

> Hi Timo,
>
> What confused me is why we need to rebuild the pipeline each time we try
> to convert the RelNode into datastream.
>
> In case I have this code (I want to write the query result into two target
> sinks) :
>
> val t = tenv.sqlQuery("SELECT * from test_source")
> t.insertInto("sink_1")
> t.insertInto("sink_2")
>
> And I got execution plan link this,
> So we have two source operators, and they are just the same. But in some
> cases, when the backend connector does not support multiple consumers (eg,
> MQ that only deliver message once), one of the two target sink may not
> receive all the records (and that's the problem I meet :( ).
>
> I thought the behavior of the above code should be equivalent to this,
>
> val t = tenv.sqlQuery("SELECT * from test_source")
> val stream = tenv.toAppendStream[Row](t)
> stream.writeAsText("path_1")
> stream.writeAsText("path_2”)
>
>
> So, IMO, Instead of return a totally new one, why don’t we make the
> DataStreamRel#translateToPlan always return the same DataSteam as it is
> called the first time. What do you think?
>
> Best,
> wangsan
>
>
>
> On Aug 22, 2018, at 12:03 AM, Timo Walther <tw...@apache.org> wrote:
>
> Hi Wangsan,
>
> the bahavior of DataStreamRel#translateToPlan is more or less intended.
> That's why you call `toAppendStream` on the table environment. Because you
> add your pipeline to the environment (from source to current operator).
>
> However, the explain() method should not cause those side-effects.
>
> Regards,
> Timo
>
> Am 21.08.18 um 17:29 schrieb wangsan:
>
> Hi Timo,
>
> I think this may not only affect  explain() method. Method
> DataStreamRel#translateToPlan is called when we need translate a
> FlinkRelNode into DataStream or DataSet, we add desired operators in
> execution environment. By side effect, I mean that if we call
> DataStreamRel#translateToPlan on same RelNode  several times, the same
> operators are added in execution environment more than once, but actually
> we need that for only one time. Correct me if I misunderstood that.
>
> I will open an issue late this day, if this is indeed a problem.
>
> Best,
> wangsan
>
>
>
> On Aug 21, 2018, at 10:16 PM, Timo Walther <tw...@apache.org> wrote:
>
> Hi,
>
> this sounds like a bug to me. Maybe the explain() method is not
> implemented correctly. Can you open an issue for it in Jira?
>
> Thanks,
> Timo
>
>
> Am 21.08.18 um 15:04 schrieb wangsan:
>
> Hi all,
>
> I noticed that the DataStreamRel#translateToPlan is non-idempotent, and
> that may cause the execution plan not as what we expected. Every time we
> call DataStreamRel#translateToPlan (in TableEnvirnment#explain,
> TableEnvirnment#writeToSink, etc), we add same operators in execution
> environment repeatedly.
>
> Should we eliminate the side effect of DataStreamRel#translateToPlan ?
>
> Best,  Wangsan
>
> appendix
>
>     tenv.registerTableSource("test_source", sourceTable)
>
>     val t = tenv.sqlQuery("SELECT * from test_source")
>     println(tenv.explain(t))
>     println(tenv.explain(t))
>
>     implicit val typeInfo = TypeInformation.of(classOf[Row])
>     tenv.toAppendStream(t)
>     println(tenv.explain(t))
> We call explain three times, and the Physical Execution Plan are all
> diffrent.
>
> == Abstract Syntax Tree ==
> LogicalProject(f1=[$0], f2=[$1])
>   LogicalTableScan(table=[[test_source]])
>
> == Optimized Logical Plan ==
> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2],
> source=[CsvTableSource(read fields: f1, f2)])
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
>     content : collect elements with CollectionInputFormat
>
>     Stage 2 : Operator
>         content : CsvTableSource(read fields: f1, f2)
>         ship_strategy : FORWARD
>
>         Stage 3 : Operator
>             content : Map
>             ship_strategy : FORWARD
>
>
> == Abstract Syntax Tree ==
> LogicalProject(f1=[$0], f2=[$1])
>   LogicalTableScan(table=[[test_source]])
>
> == Optimized Logical Plan ==
> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2],
> source=[CsvTableSource(read fields: f1, f2)])
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
>     content : collect elements with CollectionInputFormat
>
>     Stage 2 : Operator
>         content : CsvTableSource(read fields: f1, f2)
>         ship_strategy : FORWARD
>
>         Stage 3 : Operator
>             content : Map
>             ship_strategy : FORWARD
>
> Stage 4 : Data Source
>     content : collect elements with CollectionInputFormat
>
>     Stage 5 : Operator
>         content : CsvTableSource(read fields: f1, f2)
>         ship_strategy : FORWARD
>
>         Stage 6 : Operator
>             content : Map
>             ship_strategy : FORWARD
>
>
> == Abstract Syntax Tree ==
> LogicalProject(f1=[$0], f2=[$1])
>   LogicalTableScan(table=[[test_source]])
>
> == Optimized Logical Plan ==
> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2],
> source=[CsvTableSource(read fields: f1, f2)])
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
>     content : collect elements with CollectionInputFormat
>
>     Stage 2 : Operator
>         content : CsvTableSource(read fields: f1, f2)
>         ship_strategy : FORWARD
>
>         Stage 3 : Operator
>             content : Map
>             ship_strategy : FORWARD
>
> Stage 4 : Data Source
>     content : collect elements with CollectionInputFormat
>
>     Stage 5 : Operator
>         content : CsvTableSource(read fields: f1, f2)
>         ship_strategy : FORWARD
>
>         Stage 6 : Operator
>             content : Map
>             ship_strategy : FORWARD
>
> Stage 7 : Data Source
>     content : collect elements with CollectionInputFormat
>
>     Stage 8 : Operator
>         content : CsvTableSource(read fields: f1, f2)
>         ship_strategy : FORWARD
>
>         Stage 9 : Operator
>             content : Map
>             ship_strategy : FORWARD
>
>             Stage 10 : Operator
>                 content : to: Row
>                 ship_strategy : FORWARD
>
> Stage 11 : Data Source
>     content : collect elements with CollectionInputFormat
>
>     Stage 12 : Operator
>         content : CsvTableSource(read fields: f1, f2)
>         ship_strategy : FORWARD
>
>         Stage 13 : Operator
>             content : Map
>             ship_strategy : FORWARD
>
>
>
>
>

Re: Side effect of DataStreamRel#translateToPlan

Posted by wangsan <wa...@163.com>.
Hi Timo,

What confused me is why we need to rebuild the pipeline each time we try to convert the RelNode into datastream.

In case I have this code (I want to write the query result into two target sinks) :

val t = tenv.sqlQuery("SELECT * from test_source")
t.insertInto("sink_1")
t.insertInto("sink_2")

And I got execution plan link this,

So we have two source operators, and they are just the same. But in some cases, when the backend connector does not support multiple consumers (eg, MQ that only deliver message once), one of the two target sink may not receive all the records (and that's the problem I meet :( ).

I thought the behavior of the above code should be equivalent to this,

val t = tenv.sqlQuery("SELECT * from test_source")
val stream = tenv.toAppendStream[Row](t)
stream.writeAsText("path_1")
stream.writeAsText("path_2”)



So, IMO, Instead of return a totally new one, why don’t we make the DataStreamRel#translateToPlan always return the same DataSteam as it is called the first time. What do you think?

Best,
wangsan



> On Aug 22, 2018, at 12:03 AM, Timo Walther <tw...@apache.org> wrote:
> 
> Hi Wangsan,
> 
> the bahavior of DataStreamRel#translateToPlan is more or less intended. That's why you call `toAppendStream` on the table environment. Because you add your pipeline to the environment (from source to current operator).
> 
> However, the explain() method should not cause those side-effects.
> 
> Regards,
> Timo
> 
> Am 21.08.18 um 17:29 schrieb wangsan:
>> Hi Timo,
>> 
>> I think this may not only affect  explain() method. Method DataStreamRel#translateToPlan is called when we need translate a FlinkRelNode into DataStream or DataSet, we add desired operators in execution environment. By side effect, I mean that if we call DataStreamRel#translateToPlan on same RelNode  several times, the same operators are added in execution environment more than once, but actually we need that for only one time. Correct me if I misunderstood that.
>> 
>> I will open an issue late this day, if this is indeed a problem.
>> 
>> Best,
>> wangsan
>> 
>> 
>> 
>>> On Aug 21, 2018, at 10:16 PM, Timo Walther <tw...@apache.org> wrote:
>>> 
>>> Hi,
>>> 
>>> this sounds like a bug to me. Maybe the explain() method is not implemented correctly. Can you open an issue for it in Jira?
>>> 
>>> Thanks,
>>> Timo
>>> 
>>> 
>>> Am 21.08.18 um 15:04 schrieb wangsan:
>>>> Hi all,
>>>> 
>>>> I noticed that the DataStreamRel#translateToPlan is non-idempotent, and that may cause the execution plan not as what we expected. Every time we call DataStreamRel#translateToPlan (in TableEnvirnment#explain, TableEnvirnment#writeToSink, etc), we add same operators in execution environment repeatedly.
>>>> 
>>>> Should we eliminate the side effect of DataStreamRel#translateToPlan ?
>>>> 
>>>> Best,  Wangsan
>>>> 
>>>> appendix
>>>> 
>>>>     tenv.registerTableSource("test_source", sourceTable)
>>>> 
>>>>     val t = tenv.sqlQuery("SELECT * from test_source")
>>>>     println(tenv.explain(t))
>>>>     println(tenv.explain(t))
>>>> 
>>>>     implicit val typeInfo = TypeInformation.of(classOf[Row])
>>>>     tenv.toAppendStream(t)
>>>>     println(tenv.explain(t))
>>>> We call explain three times, and the Physical Execution Plan are all diffrent.
>>>> 
>>>> == Abstract Syntax Tree ==
>>>> LogicalProject(f1=[$0], f2=[$1])
>>>>   LogicalTableScan(table=[[test_source]])
>>>> 
>>>> == Optimized Logical Plan ==
>>>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
>>>> 
>>>> == Physical Execution Plan ==
>>>> Stage 1 : Data Source
>>>>     content : collect elements with CollectionInputFormat
>>>> 
>>>>     Stage 2 : Operator
>>>>         content : CsvTableSource(read fields: f1, f2)
>>>>         ship_strategy : FORWARD
>>>> 
>>>>         Stage 3 : Operator
>>>>             content : Map
>>>>             ship_strategy : FORWARD
>>>> 
>>>> 
>>>> == Abstract Syntax Tree ==
>>>> LogicalProject(f1=[$0], f2=[$1])
>>>>   LogicalTableScan(table=[[test_source]])
>>>> 
>>>> == Optimized Logical Plan ==
>>>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
>>>> 
>>>> == Physical Execution Plan ==
>>>> Stage 1 : Data Source
>>>>     content : collect elements with CollectionInputFormat
>>>> 
>>>>     Stage 2 : Operator
>>>>         content : CsvTableSource(read fields: f1, f2)
>>>>         ship_strategy : FORWARD
>>>> 
>>>>         Stage 3 : Operator
>>>>             content : Map
>>>>             ship_strategy : FORWARD
>>>> 
>>>> Stage 4 : Data Source
>>>>     content : collect elements with CollectionInputFormat
>>>> 
>>>>     Stage 5 : Operator
>>>>         content : CsvTableSource(read fields: f1, f2)
>>>>         ship_strategy : FORWARD
>>>> 
>>>>         Stage 6 : Operator
>>>>             content : Map
>>>>             ship_strategy : FORWARD
>>>> 
>>>> 
>>>> == Abstract Syntax Tree ==
>>>> LogicalProject(f1=[$0], f2=[$1])
>>>>   LogicalTableScan(table=[[test_source]])
>>>> 
>>>> == Optimized Logical Plan ==
>>>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
>>>> 
>>>> == Physical Execution Plan ==
>>>> Stage 1 : Data Source
>>>>     content : collect elements with CollectionInputFormat
>>>> 
>>>>     Stage 2 : Operator
>>>>         content : CsvTableSource(read fields: f1, f2)
>>>>         ship_strategy : FORWARD
>>>> 
>>>>         Stage 3 : Operator
>>>>             content : Map
>>>>             ship_strategy : FORWARD
>>>> 
>>>> Stage 4 : Data Source
>>>>     content : collect elements with CollectionInputFormat
>>>> 
>>>>     Stage 5 : Operator
>>>>         content : CsvTableSource(read fields: f1, f2)
>>>>         ship_strategy : FORWARD
>>>> 
>>>>         Stage 6 : Operator
>>>>             content : Map
>>>>             ship_strategy : FORWARD
>>>> 
>>>> Stage 7 : Data Source
>>>>     content : collect elements with CollectionInputFormat
>>>> 
>>>>     Stage 8 : Operator
>>>>         content : CsvTableSource(read fields: f1, f2)
>>>>         ship_strategy : FORWARD
>>>> 
>>>>         Stage 9 : Operator
>>>>             content : Map
>>>>             ship_strategy : FORWARD
>>>> 
>>>>             Stage 10 : Operator
>>>>                 content : to: Row
>>>>                 ship_strategy : FORWARD
>>>> 
>>>> Stage 11 : Data Source
>>>>     content : collect elements with CollectionInputFormat
>>>> 
>>>>     Stage 12 : Operator
>>>>         content : CsvTableSource(read fields: f1, f2)
>>>>         ship_strategy : FORWARD
>>>> 
>>>>         Stage 13 : Operator
>>>>             content : Map
>>>>             ship_strategy : FORWARD
>>>> 
>>>> 
>> 


Re: Side effect of DataStreamRel#translateToPlan

Posted by Timo Walther <tw...@apache.org>.
Hi Wangsan,

the bahavior of DataStreamRel#translateToPlan is more or less intended. 
That's why you call `toAppendStream` on the table environment. Because 
you add your pipeline to the environment (from source to current operator).

However, the explain() method should not cause those side-effects.

Regards,
Timo

Am 21.08.18 um 17:29 schrieb wangsan:
> Hi Timo,
>
> I think this may not only affect  explain() method. Method DataStreamRel#translateToPlan is called when we need translate a FlinkRelNode into DataStream or DataSet, we add desired operators in execution environment. By side effect, I mean that if we call DataStreamRel#translateToPlan on same RelNode  several times, the same operators are added in execution environment more than once, but actually we need that for only one time. Correct me if I misunderstood that.
>
> I will open an issue late this day, if this is indeed a problem.
>
> Best,
> wangsan
>
>
>
>> On Aug 21, 2018, at 10:16 PM, Timo Walther <tw...@apache.org> wrote:
>>
>> Hi,
>>
>> this sounds like a bug to me. Maybe the explain() method is not implemented correctly. Can you open an issue for it in Jira?
>>
>> Thanks,
>> Timo
>>
>>
>> Am 21.08.18 um 15:04 schrieb wangsan:
>>> Hi all,
>>>
>>> I noticed that the DataStreamRel#translateToPlan is non-idempotent, and that may cause the execution plan not as what we expected. Every time we call DataStreamRel#translateToPlan (in TableEnvirnment#explain, TableEnvirnment#writeToSink, etc), we add same operators in execution environment repeatedly.
>>>
>>> Should we eliminate the side effect of DataStreamRel#translateToPlan ?
>>>
>>> Best,  Wangsan
>>>
>>> appendix
>>>
>>>      tenv.registerTableSource("test_source", sourceTable)
>>>
>>>      val t = tenv.sqlQuery("SELECT * from test_source")
>>>      println(tenv.explain(t))
>>>      println(tenv.explain(t))
>>>
>>>      implicit val typeInfo = TypeInformation.of(classOf[Row])
>>>      tenv.toAppendStream(t)
>>>      println(tenv.explain(t))
>>> We call explain three times, and the Physical Execution Plan are all diffrent.
>>>
>>> == Abstract Syntax Tree ==
>>> LogicalProject(f1=[$0], f2=[$1])
>>>    LogicalTableScan(table=[[test_source]])
>>>
>>> == Optimized Logical Plan ==
>>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
>>>
>>> == Physical Execution Plan ==
>>> Stage 1 : Data Source
>>>      content : collect elements with CollectionInputFormat
>>>
>>>      Stage 2 : Operator
>>>          content : CsvTableSource(read fields: f1, f2)
>>>          ship_strategy : FORWARD
>>>
>>>          Stage 3 : Operator
>>>              content : Map
>>>              ship_strategy : FORWARD
>>>
>>>
>>> == Abstract Syntax Tree ==
>>> LogicalProject(f1=[$0], f2=[$1])
>>>    LogicalTableScan(table=[[test_source]])
>>>
>>> == Optimized Logical Plan ==
>>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
>>>
>>> == Physical Execution Plan ==
>>> Stage 1 : Data Source
>>>      content : collect elements with CollectionInputFormat
>>>
>>>      Stage 2 : Operator
>>>          content : CsvTableSource(read fields: f1, f2)
>>>          ship_strategy : FORWARD
>>>
>>>          Stage 3 : Operator
>>>              content : Map
>>>              ship_strategy : FORWARD
>>>
>>> Stage 4 : Data Source
>>>      content : collect elements with CollectionInputFormat
>>>
>>>      Stage 5 : Operator
>>>          content : CsvTableSource(read fields: f1, f2)
>>>          ship_strategy : FORWARD
>>>
>>>          Stage 6 : Operator
>>>              content : Map
>>>              ship_strategy : FORWARD
>>>
>>>
>>> == Abstract Syntax Tree ==
>>> LogicalProject(f1=[$0], f2=[$1])
>>>    LogicalTableScan(table=[[test_source]])
>>>
>>> == Optimized Logical Plan ==
>>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
>>>
>>> == Physical Execution Plan ==
>>> Stage 1 : Data Source
>>>      content : collect elements with CollectionInputFormat
>>>
>>>      Stage 2 : Operator
>>>          content : CsvTableSource(read fields: f1, f2)
>>>          ship_strategy : FORWARD
>>>
>>>          Stage 3 : Operator
>>>              content : Map
>>>              ship_strategy : FORWARD
>>>
>>> Stage 4 : Data Source
>>>      content : collect elements with CollectionInputFormat
>>>
>>>      Stage 5 : Operator
>>>          content : CsvTableSource(read fields: f1, f2)
>>>          ship_strategy : FORWARD
>>>
>>>          Stage 6 : Operator
>>>              content : Map
>>>              ship_strategy : FORWARD
>>>
>>> Stage 7 : Data Source
>>>      content : collect elements with CollectionInputFormat
>>>
>>>      Stage 8 : Operator
>>>          content : CsvTableSource(read fields: f1, f2)
>>>          ship_strategy : FORWARD
>>>
>>>          Stage 9 : Operator
>>>              content : Map
>>>              ship_strategy : FORWARD
>>>
>>>              Stage 10 : Operator
>>>                  content : to: Row
>>>                  ship_strategy : FORWARD
>>>
>>> Stage 11 : Data Source
>>>      content : collect elements with CollectionInputFormat
>>>
>>>      Stage 12 : Operator
>>>          content : CsvTableSource(read fields: f1, f2)
>>>          ship_strategy : FORWARD
>>>
>>>          Stage 13 : Operator
>>>              content : Map
>>>              ship_strategy : FORWARD
>>>
>>>
>


Re: Side effect of DataStreamRel#translateToPlan

Posted by wangsan <wa...@163.com>.
Hi Timo, 

I think this may not only affect  explain() method. Method DataStreamRel#translateToPlan is called when we need translate a FlinkRelNode into DataStream or DataSet, we add desired operators in execution environment. By side effect, I mean that if we call DataStreamRel#translateToPlan on same RelNode  several times, the same operators are added in execution environment more than once, but actually we need that for only one time. Correct me if I misunderstood that.

I will open an issue late this day, if this is indeed a problem.

Best,
wangsan



> On Aug 21, 2018, at 10:16 PM, Timo Walther <tw...@apache.org> wrote:
> 
> Hi,
> 
> this sounds like a bug to me. Maybe the explain() method is not implemented correctly. Can you open an issue for it in Jira?
> 
> Thanks,
> Timo
> 
> 
> Am 21.08.18 um 15:04 schrieb wangsan:
>> Hi all,
>> 
>> I noticed that the DataStreamRel#translateToPlan is non-idempotent, and that may cause the execution plan not as what we expected. Every time we call DataStreamRel#translateToPlan (in TableEnvirnment#explain, TableEnvirnment#writeToSink, etc), we add same operators in execution environment repeatedly.
>> 
>> Should we eliminate the side effect of DataStreamRel#translateToPlan ?
>> 
>> Best,  Wangsan
>> 
>> appendix
>> 
>>     tenv.registerTableSource("test_source", sourceTable)
>> 
>>     val t = tenv.sqlQuery("SELECT * from test_source")
>>     println(tenv.explain(t))
>>     println(tenv.explain(t))
>> 
>>     implicit val typeInfo = TypeInformation.of(classOf[Row])
>>     tenv.toAppendStream(t)
>>     println(tenv.explain(t))
>> We call explain three times, and the Physical Execution Plan are all diffrent.
>> 
>> == Abstract Syntax Tree ==
>> LogicalProject(f1=[$0], f2=[$1])
>>   LogicalTableScan(table=[[test_source]])
>> 
>> == Optimized Logical Plan ==
>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
>> 
>> == Physical Execution Plan ==
>> Stage 1 : Data Source
>>     content : collect elements with CollectionInputFormat
>> 
>>     Stage 2 : Operator
>>         content : CsvTableSource(read fields: f1, f2)
>>         ship_strategy : FORWARD
>> 
>>         Stage 3 : Operator
>>             content : Map
>>             ship_strategy : FORWARD
>> 
>> 
>> == Abstract Syntax Tree ==
>> LogicalProject(f1=[$0], f2=[$1])
>>   LogicalTableScan(table=[[test_source]])
>> 
>> == Optimized Logical Plan ==
>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
>> 
>> == Physical Execution Plan ==
>> Stage 1 : Data Source
>>     content : collect elements with CollectionInputFormat
>> 
>>     Stage 2 : Operator
>>         content : CsvTableSource(read fields: f1, f2)
>>         ship_strategy : FORWARD
>> 
>>         Stage 3 : Operator
>>             content : Map
>>             ship_strategy : FORWARD
>> 
>> Stage 4 : Data Source
>>     content : collect elements with CollectionInputFormat
>> 
>>     Stage 5 : Operator
>>         content : CsvTableSource(read fields: f1, f2)
>>         ship_strategy : FORWARD
>> 
>>         Stage 6 : Operator
>>             content : Map
>>             ship_strategy : FORWARD
>> 
>> 
>> == Abstract Syntax Tree ==
>> LogicalProject(f1=[$0], f2=[$1])
>>   LogicalTableScan(table=[[test_source]])
>> 
>> == Optimized Logical Plan ==
>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
>> 
>> == Physical Execution Plan ==
>> Stage 1 : Data Source
>>     content : collect elements with CollectionInputFormat
>> 
>>     Stage 2 : Operator
>>         content : CsvTableSource(read fields: f1, f2)
>>         ship_strategy : FORWARD
>> 
>>         Stage 3 : Operator
>>             content : Map
>>             ship_strategy : FORWARD
>> 
>> Stage 4 : Data Source
>>     content : collect elements with CollectionInputFormat
>> 
>>     Stage 5 : Operator
>>         content : CsvTableSource(read fields: f1, f2)
>>         ship_strategy : FORWARD
>> 
>>         Stage 6 : Operator
>>             content : Map
>>             ship_strategy : FORWARD
>> 
>> Stage 7 : Data Source
>>     content : collect elements with CollectionInputFormat
>> 
>>     Stage 8 : Operator
>>         content : CsvTableSource(read fields: f1, f2)
>>         ship_strategy : FORWARD
>> 
>>         Stage 9 : Operator
>>             content : Map
>>             ship_strategy : FORWARD
>> 
>>             Stage 10 : Operator
>>                 content : to: Row
>>                 ship_strategy : FORWARD
>> 
>> Stage 11 : Data Source
>>     content : collect elements with CollectionInputFormat
>> 
>>     Stage 12 : Operator
>>         content : CsvTableSource(read fields: f1, f2)
>>         ship_strategy : FORWARD
>> 
>>         Stage 13 : Operator
>>             content : Map
>>             ship_strategy : FORWARD
>> 
>> 


Re: Side effect of DataStreamRel#translateToPlan

Posted by Timo Walther <tw...@apache.org>.
Hi,

this sounds like a bug to me. Maybe the explain() method is not 
implemented correctly. Can you open an issue for it in Jira?

Thanks,
Timo


Am 21.08.18 um 15:04 schrieb wangsan:
> Hi all,
>
> I noticed that the DataStreamRel#translateToPlan is non-idempotent, and that may cause the execution plan not as what we expected. Every time we call DataStreamRel#translateToPlan (in TableEnvirnment#explain, TableEnvirnment#writeToSink, etc), we add same operators in execution environment repeatedly.
>
> Should we eliminate the side effect of DataStreamRel#translateToPlan ?
>
> Best,  Wangsan
>
> appendix
>
>      tenv.registerTableSource("test_source", sourceTable)
>
>      val t = tenv.sqlQuery("SELECT * from test_source")
>      println(tenv.explain(t))
>      println(tenv.explain(t))
>
>      implicit val typeInfo = TypeInformation.of(classOf[Row])
>      tenv.toAppendStream(t)
>      println(tenv.explain(t))
> We call explain three times, and the Physical Execution Plan are all diffrent.
>
> == Abstract Syntax Tree ==
> LogicalProject(f1=[$0], f2=[$1])
>    LogicalTableScan(table=[[test_source]])
>
> == Optimized Logical Plan ==
> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
>      content : collect elements with CollectionInputFormat
>
>      Stage 2 : Operator
>          content : CsvTableSource(read fields: f1, f2)
>          ship_strategy : FORWARD
>
>          Stage 3 : Operator
>              content : Map
>              ship_strategy : FORWARD
>
>
> == Abstract Syntax Tree ==
> LogicalProject(f1=[$0], f2=[$1])
>    LogicalTableScan(table=[[test_source]])
>
> == Optimized Logical Plan ==
> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
>      content : collect elements with CollectionInputFormat
>
>      Stage 2 : Operator
>          content : CsvTableSource(read fields: f1, f2)
>          ship_strategy : FORWARD
>
>          Stage 3 : Operator
>              content : Map
>              ship_strategy : FORWARD
>
> Stage 4 : Data Source
>      content : collect elements with CollectionInputFormat
>
>      Stage 5 : Operator
>          content : CsvTableSource(read fields: f1, f2)
>          ship_strategy : FORWARD
>
>          Stage 6 : Operator
>              content : Map
>              ship_strategy : FORWARD
>
>
> == Abstract Syntax Tree ==
> LogicalProject(f1=[$0], f2=[$1])
>    LogicalTableScan(table=[[test_source]])
>
> == Optimized Logical Plan ==
> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
>      content : collect elements with CollectionInputFormat
>
>      Stage 2 : Operator
>          content : CsvTableSource(read fields: f1, f2)
>          ship_strategy : FORWARD
>
>          Stage 3 : Operator
>              content : Map
>              ship_strategy : FORWARD
>
> Stage 4 : Data Source
>      content : collect elements with CollectionInputFormat
>
>      Stage 5 : Operator
>          content : CsvTableSource(read fields: f1, f2)
>          ship_strategy : FORWARD
>
>          Stage 6 : Operator
>              content : Map
>              ship_strategy : FORWARD
>
> Stage 7 : Data Source
>      content : collect elements with CollectionInputFormat
>
>      Stage 8 : Operator
>          content : CsvTableSource(read fields: f1, f2)
>          ship_strategy : FORWARD
>
>          Stage 9 : Operator
>              content : Map
>              ship_strategy : FORWARD
>
>              Stage 10 : Operator
>                  content : to: Row
>                  ship_strategy : FORWARD
>
> Stage 11 : Data Source
>      content : collect elements with CollectionInputFormat
>
>      Stage 12 : Operator
>          content : CsvTableSource(read fields: f1, f2)
>          ship_strategy : FORWARD
>
>          Stage 13 : Operator
>              content : Map
>              ship_strategy : FORWARD
>
>