Posted to user@flink.apache.org by simone <si...@gmail.com> on 2018/03/16 14:47:41 UTC

Strange behavior on filter, group and reduce DataSets

Hi all,
I am using Flink 1.3.1 and I have found some strange behavior when running
the following logic:

 1. Read data from file and store into DataSet<POJO>
 2. Split dataset in two, by checking if "field1" of POJOs is empty or
    not, so that the first dataset contains only elements with non empty
    "field1", and the second dataset will contain the other elements.
 3. Each dataset is then grouped, one by "field1" and the other by
    another field, and subsequently reduced.
 4. The 2 datasets are merged together by union.
 5. The final dataset is written as json.

What I expected from the output was to find only one element for each
specific value of "field1" because:

 1. Reducing the first dataset grouped by "field1" should generate only
    one element with a specific value of "field1".
 2. The second dataset should contain only elements with empty "field1".
 3. Taking the union of them should not duplicate any record.

This does not happen. When I read the generated JSON output I see some
duplicate (non-empty) values of "field1".
Strangely, this does not happen when the union of the two datasets
is not computed. In that case the first dataset produces only elements
with distinct values of "field1", while the second dataset produces only
records with an empty "field1".

Debugging the code, it seems that the map function used to convert the
final merged dataset into JSON strings starts before the reduce
functions terminate. This seems to produce the duplicates.
Here is my pseudocode:

DataSet<POJO> subjects = read from csv...

// Elements with a non-empty "field1", reduced to one element per "field1" value.
DataSet<POJO> subjectsWithCondition = subjects.filter(new FilterFunction<POJO>() {
    @Override
    public boolean filter(POJO subject) throws Exception {
        // As posted; the real code uses equals() (see the replies).
        return subject.getField("field1") != "";
    }
}).groupBy("field1").reduce(new ReduceFunction<POJO>() {
    @Override
    public POJO reduce(POJO subject1, POJO subject2) {
        // Keep the first element of each group.
        return subject1;
    }
});

// Elements with an empty "field1", reduced to one element per "field2" value.
DataSet<POJO> subjectsWithoutCondition = subjects.filter(new FilterFunction<POJO>() {
    @Override
    public boolean filter(POJO subject) throws Exception {
        // As posted; the real code uses equals() (see the replies).
        return subject.getField("field1") == "";
    }
}).groupBy("field2").reduce(new ReduceFunction<POJO>() {
    @Override
    public POJO reduce(POJO subject1, POJO subject2) {
        // Keep the first element of each group.
        return subject1;
    }
});

DataSet<POJO> allSubjects = subjectsWithCondition.union(subjectsWithoutCondition);

// Serialize each element of the merged dataset to a JSON string.
DataSet<String> jsonSubjects = allSubjects.map(new RichMapFunction<POJO, String>() {
    private static final long serialVersionUID = 1L;
    ObjectMapper mapper = new ObjectMapper();

    @Override
    public String map(POJO subject) throws Exception {
        return mapper.writeValueAsString(subject);
    }
});

jsonSubjects.writeAsText("/tmp/subjects/", WriteMode.OVERWRITE);
env.execute("JSON generation");

What is the problem? Did I make a mistake in the filtering, grouping, or
reducing logic?
Thanks in advance,
Simone.
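
A side note on the emptiness checks in the filters above: in Java, == and != on strings compare object references, not contents, so a check against "" is only reliable with equals() or isEmpty(). The following minimal sketch shows the difference. The class and method names are hypothetical and not taken from the original program.

```java
class EmptyFieldCheck {
    // Null-safe emptiness test that compares by value, not by reference.
    // (Hypothetical helper, not part of the original job.)
    public static boolean isEmptyField(String value) {
        return value == null || value.isEmpty();
    }

    public static void main(String[] args) {
        // A string created at runtime (for example by a CSV parser) is a
        // different object than the literal "", even if both are empty.
        String parsed = new String("");

        System.out.println(parsed == "");         // false: different objects
        System.out.println(parsed.equals(""));    // true: same contents
        System.out.println(isEmptyField(parsed)); // true
    }
}
```

With a helper like this, both filters can share one predicate and its negation, which avoids the two branches drifting apart.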

Re: Strange behavior on filter, group and reduce DataSets

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,
Yes, I've updated the PR.
It needs a review and should be included in Flink 1.5.

Cheers, Fabian

simone <si...@gmail.com> wrote on Mon, 26 March 2018, 12:01:

> Hi Fabian,
>
> any update on this? Did you fix it?
>
> Best, Simone.
>
> On 22/03/2018 00:24, Fabian Hueske wrote:
>
> Hi,
>
> That was a bit too early.
> I found an issue with my approach. I will come back once I have solved that.
>
> Best, Fabian
>
> 2018-03-21 23:45 GMT+01:00 Fabian Hueske <fh...@gmail.com>:
>
>> Hi,
>>
>> I've opened a pull request [1] that should fix the problem.
>> It would be great if you could try the change and report back whether it
>> fixes the problem.
>>
>> Thank you,
>> Fabian
>>
>> [1] https://github.com/apache/flink/pull/5742
>>
>> 2018-03-21 9:49 GMT+01:00 simone <si...@gmail.com>:
>>
>>> Hi all,
>>>
>>> an update: following Stephan's suggestions on how to diagnose the issue,
>>> after making Person immutable the problem no longer occurs.
>>>
>>> Simone.
>>>
>>> On 20/03/2018 20:20, Stephan Ewen wrote:
>>>
>>> To diagnose that, can you please check the following:
>>>
>>>   - Change the Person data type to be immutable (final fields, no
>>> setters, set fields in constructor instead). Does that make the problem go
>>> away?
>>>
>>>   - Change the Person data type to not be a POJO by adding a dummy
>>> field that is never used and does not have a getter/setter. Does that
>>> make the problem go away?
>>>
>>> If either of those is the case, it must be a mutability bug somewhere in
>>> either accidental object reuse or accidental serializer sharing.
>>>
>>>
>>> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fh...@gmail.com>
>>> wrote:
>>>
>>>> Hi Simone and Flavio,
>>>>
>>>> I created FLINK-9031 [1] for this issue.
>>>> Please have a look and add any detail that you think could help to
>>>> resolve the problem.
>>>>
>>>> Thanks,
>>>> Fabian
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-9031
>>>>
>>>> 2018-03-19 16:35 GMT+01:00 simone <si...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> This simple code reproduces the behavior ->
>>>>> https://github.com/xseris/Flink-test-union
>>>>>
>>>>> Thanks, Simone.
>>>>>
>>>>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>>>>
>>>>> Hmmm, I still don't see the problem.
>>>>> IMO, the result should be correct for both plans. The data is
>>>>> replicated, filtered, reduced, and unioned.
>>>>> There is nothing in between the filter and reduce, that could cause
>>>>> incorrect behavior.
>>>>>
>>>>> The good thing is, the optimizer seems to be fine. The bad thing is,
>>>>> it is either the Flink runtime code or your functions.
>>>>> Given that one plan produces good results, it might be the Flink
>>>>> runtime code.
>>>>>
>>>>> Coming back to my previous question.
>>>>> Can you provide a minimal program to reproduce the issue?
>>>>>
>>>>> Thanks, Fabian
>>>>>
>>>>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske <fh...@gmail.com>:
>>>>>
>>>>>> Ah, thanks for the update!
>>>>>> I'll have a look at that.
>>>>>>
>>>>>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske <fh...@gmail.com>:
>>>>>>
>>>>>>> Hi Simone,
>>>>>>>
>>>>>>> Looking at the plan, I don't see why this should be happening. The
>>>>>>> pseudo code looks fine as well.
>>>>>>> Any chance that you can create a minimal program to reproduce the
>>>>>>> problem?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Fabian
>>>>>>>
>>>>>>> 2018-03-19 12:04 GMT+01:00 simone <si...@gmail.com>:
>>>>>>>
>>>>>>>> Hi Fabian,
>>>>>>>>
>>>>>>>> reuse is not enabled. I attach the plan of the execution.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Simone
>>>>>>>>
>>>>>>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Union is actually a very simple operator (not even an operator in
>>>>>>>> Flink terms). It just merges two inputs. There is no additional logic
>>>>>>>> involved.
>>>>>>>> Therefore, it should also not emit records before either of the
>>>>>>>> ReduceFunctions has sorted its data.
>>>>>>>> Once the data has been sorted for the ReduceFunction, the data is
>>>>>>>> reduced and emitted in a pipelined fashion, i.e., once the first record is
>>>>>>>> reduced, it is forwarded into the MapFunction (passing the unioned inputs).
>>>>>>>> So it is not unexpected that Map starts processing before the
>>>>>>>> ReduceFunction has terminated.
>>>>>>>>
>>>>>>>> Did you enable object reuse [1]?
>>>>>>>> If yes, try to disable it. If you want to reuse objects, you have
>>>>>>>> to be careful in how you implement your functions.
>>>>>>>> If no, can you share the plan
>>>>>>>> (ExecutionEnvironment.getExecutionPlan()) that was generated for the
>>>>>>>> program?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Fabian
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>>>>>>>>
>>>>>>>>> Any help on this? This thing is very strange..the "manual" union
>>>>>>>>> of the output of the 2 datasets is different than the flink-union of them..
>>>>>>>>> Could it be a problem of the flink optimizer?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>> On Fri, Mar 16, 2018 at 4:01 PM, simone <
>>>>>>>>> simone.povoscania@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Sorry, I translated the code into pseudocode too fast. That is
>>>>>>>>>> indeed an equals.
>>>>>>>>>>
>>>>>>>>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Just a guess, but string comparison in Java should use the equals
>>>>>>>>>> method, not the == operator.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Kien
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 3/16/2018 9:47 PM, simone wrote:
>>>>>>>>>>
>>>>>>>>>> *subject.getField("field1") == "";*
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
>
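
To make the "object reuse" and "mutability bug" remarks quoted above more concrete, here is a minimal plain-Java sketch of the aliasing hazard. It does not use Flink and is not a reproduction of the actual bug tracked in FLINK-9031; the Person class and the emit helper are hypothetical stand-ins. It only shows how handing out one mutable instance repeatedly can make distinct records look like duplicates downstream.

```java
import java.util.ArrayList;
import java.util.List;

class ObjectReuseDemo {
    // Hypothetical mutable record standing in for the POJO from the thread.
    static class Person {
        String field1;
        Person(String field1) { this.field1 = field1; }
    }

    // Produces one Person per input value. With reuse == true, a single
    // instance is mutated and handed out again for every value.
    static List<Person> emit(String[] values, boolean reuse) {
        List<Person> out = new ArrayList<>();
        Person shared = new Person(null);
        for (String v : values) {
            Person p = reuse ? shared : new Person(null);
            p.field1 = v;
            // Keeping a reference to p is only safe if p is never mutated later.
            out.add(p);
        }
        return out;
    }

    // Collects the field1 values that a downstream consumer would observe.
    static List<String> field1Values(List<Person> people) {
        List<String> values = new ArrayList<>();
        for (Person p : people) {
            values.add(p.field1);
        }
        return values;
    }

    public static void main(String[] args) {
        String[] input = {"a", "b", "c"};
        System.out.println(field1Values(emit(input, false))); // [a, b, c]
        System.out.println(field1Values(emit(input, true)));  // [c, c, c]
    }
}
```

In the reuse case all three list entries point to the same object, so every entry shows the last value written. An immutable record type rules this out by construction, which is consistent with the diagnosis step suggested in the thread.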

Re: Strange behavior on filter, group and reduce DataSets

Posted by simone <si...@gmail.com>.
Hi Fabian,

any update on this? Did you fix it?

Best, Simone.




Re: Strange behavior on filter, group and reduce DataSets

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

That was a bit too early.
I found an issue with my approach. I will come back once I have solved that.

Best, Fabian


Re: Strange behavior on filter, group and reduce DataSets

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I've opened a pull request [1] that should fix the problem.
It would be great if you could try the change and report back whether it fixes
the problem.

Thank you,
Fabian

[1] https://github.com/apache/flink/pull/5742


Re: Strange behavior on filter, group and reduce DataSets

Posted by simone <si...@gmail.com>.
Hi all,

an update: following Stephan's suggestions on how to diagnose the issue,
after making Person immutable the problem no longer occurs.

Simone.
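
Stephan's first suggestion (final fields, no setters, fields set in the constructor) could look roughly like the sketch below. The field names are borrowed from the pseudocode in the original post; the real Person class from the reproducer is not shown in this thread, so treat this as an assumed shape rather than the actual class.

```java
// Assumed shape of an immutable Person; the real class may have other fields.
final class Person {
    private final String field1;
    private final String field2;

    // All state is set once, in the constructor.
    public Person(String field1, String field2) {
        this.field1 = field1;
        this.field2 = field2;
    }

    public String getField1() { return field1; }

    public String getField2() { return field2; }

    // Instead of a setter, return a modified copy and leave this instance untouched.
    public Person withField1(String newField1) {
        return new Person(newField1, field2);
    }
}
```

A class like this has no public no-argument constructor and no setters, so it presumably no longer qualifies as a POJO under Flink's type rules and would be handled as a generic type instead. That would mean this change also affects which serializer is used, not only whether instances can be mutated.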


On 20/03/2018 20:20, Stephan Ewen wrote:
> To diagnose that, can you please check the following:
>
>   - Change the Person data type to be immutable (final fields, no 
> setters, set fields in constructor instead). Does that make the 
> problem go away?
>
>   - Change the Person data type to not be a POJO by adding a dummy
> field that is never used and does not have a getter/setter. Does
> that make the problem go away?
>
> If either of those is the case, it must be a mutability bug somewhere
> in either accidental object reuse or accidental serializer sharing.
>
>
> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>     Hi Simone and Flavio,
>
>     I created FLINK-9031 [1] for this issue.
>     Please have a look and add any detail that you think could help to
>     resolve the problem.
>
>     Thanks,
>     Fabian
>
>     [1] https://issues.apache.org/jira/browse/FLINK-9031
>     <https://issues.apache.org/jira/browse/FLINK-9031>


Re: Strange behavior on filter, group and reduce DataSets

Posted by Stephan Ewen <se...@apache.org>.
To diagnose that, can you please check the following:

  - Change the Person data type to be immutable (final fields, no setters,
set fields in the constructor instead). Does that make the problem go away?

  - Change the Person data type to not be a POJO by adding a dummy field
that is never used and has no getter/setter. Does that make the
problem go away?

If either of those is the case, it must be a mutability bug, caused by
either accidental object reuse or accidental serializer sharing.
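
The two checks above could be sketched roughly as follows. This is only an
illustration: the actual Person class of the repro project is not shown in
this thread, so the class names (ImmutablePerson, NonPojoPerson) and the
field names (name, surname, dummy) are assumptions. As far as I know, Flink
only treats a class as a POJO if it is public, has a public no-argument
constructor, and every field is either public or reachable through a getter
and a setter, so neither variant below should be handled as a POJO.

```java
// Hypothetical stand-ins for the Person type of the repro project.

/** Variant 1: immutable type (final fields, no setters, values set in the constructor). */
final class ImmutablePerson {
    private final String name;
    private final String surname;

    public ImmutablePerson(String name, String surname) {
        this.name = name;
        this.surname = surname;
    }

    public String getName() { return name; }
    public String getSurname() { return surname; }

    /** "Changing" a value creates a new object, so a shared instance is never modified. */
    public ImmutablePerson withSurname(String newSurname) {
        return new ImmutablePerson(name, newSurname);
    }
}

/** Variant 2: still mutable, but with a dummy field that has no getter/setter. */
class NonPojoPerson {
    private String name;
    private String surname;

    // Never used; intentionally has no getter/setter so the class
    // no longer satisfies the POJO rules described above.
    private int dummy;

    public NonPojoPerson() { }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getSurname() { return surname; }
    public void setSurname(String surname) { this.surname = surname; }
}

class PersonVariantsDemo {
    public static void main(String[] args) {
        ImmutablePerson original = new ImmutablePerson("Anna", "Rossi");
        ImmutablePerson changed = original.withSurname("Bianchi");

        // The original instance is untouched by the "modification".
        System.out.println(original.getSurname()); // prints: Rossi
        System.out.println(changed.getSurname());  // prints: Bianchi
    }
}
```

If the duplicates disappear with either variant, that points to some code
path mutating a Person instance that is still referenced elsewhere.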


On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Simone and Flavio,
>
> I created FLINK-9031 [1] for this issue.
> Please have a look and add any detail that you think could help to resolve
> the problem.
>
> Thanks,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9031

Re: Strange behavior on filter, group and reduce DataSets

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Simone and Flavio,

I created FLINK-9031 [1] for this issue.
Please have a look and add any detail that you think could help to resolve
the problem.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9031

2018-03-19 16:35 GMT+01:00 simone <si...@gmail.com>:

> Hi Fabian,
>
> This simple code reproduces the behavior ->
> https://github.com/xseris/Flink-test-union
>
> Thanks, Simone.

Re: Strange behavior on filter, group and reduce DataSets

Posted by simone <si...@gmail.com>.
Hi Fabian,

This simple code reproduces the behavior -> 
https://github.com/xseris/Flink-test-union

Thanks, Simone.


On 19/03/2018 15:44, Fabian Hueske wrote:
> Hmmm, I still don't see the problem.
> IMO, the result should be correct for both plans. The data is 
> replicated, filtered, reduced, and unioned.
> There is nothing in between the filter and reduce, that could cause 
> incorrect behavior.
>
> The good thing is, the optimizer seems to be fine. The bad thing is, 
> it is either the Flink runtime code or your functions.
> Given that one plan produces good results, it might be the Flink 
> runtime code.
>
> Coming back to my previous question.
> Can you provide a minimal program to reproduce the issue?
>
> Thanks, Fabian


Re: Strange behavior on filter, group and reduce DataSets

Posted by Fabian Hueske <fh...@gmail.com>.
Hmmm, I still don't see the problem.
IMO, the result should be correct for both plans. The data is replicated,
filtered, reduced, and unioned.
There is nothing in between the filter and the reduce that could cause
incorrect behavior.

The good thing is, the optimizer seems to be fine. The bad thing is, it is
either the Flink runtime code or your functions.
Given that one plan produces good results, it might be the Flink runtime
code.

Coming back to my previous question.
Can you provide a minimal program to reproduce the issue?

Thanks, Fabian

2018-03-19 15:15 GMT+01:00 Fabian Hueske <fh...@gmail.com>:

> Ah, thanks for the update!
> I'll have a look at that.

Re: Strange behavior on filter, group and reduce DataSets

Posted by Fabian Hueske <fh...@gmail.com>.
Ah, thanks for the update!
I'll have a look at that.

2018-03-19 15:13 GMT+01:00 Fabian Hueske <fh...@gmail.com>:

> Hi Simone,
>
> Looking at the plan, I don't see why this should be happening. The pseudo
> code looks fine as well.
> Any chance that you can create a minimal program to reproduce the problem?
>
> Thanks,
> Fabian

Re: Strange behavior on filter, group and reduce DataSets

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Simone,

Looking at the plan, I don't see why this should be happening. The pseudo
code looks fine as well.
Any chance that you can create a minimal program to reproduce the problem?

Thanks,
Fabian

2018-03-19 12:04 GMT+01:00 simone <si...@gmail.com>:

> Hi Fabian,
>
> reuse is not enabled. I attach the plan of the execution.
>
> Thanks,
> Simone

Re: Strange behavior on filter, group and reduce DataSets

Posted by simone <si...@gmail.com>.
Hi Fabian,

I have an update. If I force a rebalance after the union (id 5,86 of the
previous plan), the output meets the expectations:

DataSet<POJO> ds3 = ds1.union(ds2);
ds3 = ds3.rebalance();

The newly produced plan (and the old one) is attached to this mail. I
still don't understand why, without rebalancing, the execution has that
strange behavior. Any idea?

Thanks,
Simone.


On 19/03/2018 12:04, simone wrote:
>
> Hi Fabian,
>
> reuse is not enabled. I attach the plan of the execution.
>
> Thanks,
> Simone


Re: Strange behavior on filter, group and reduce DataSets

Posted by simone <si...@gmail.com>.
Hi Fabian,

reuse is not enabled. I attach the plan of the execution.

Thanks,
Simone


On 19/03/2018 11:36, Fabian Hueske wrote:
> Hi,
>
> Union is actually a very simple operator (not even an operator in
> Flink terms). It just merges two inputs. There is no additional logic
> involved.
> Therefore, it should also not emit records before either of the two
> ReduceFunctions has sorted its data.
> Once the data has been sorted for the ReduceFunction, the data is 
> reduced and emitted in a pipelined fashion, i.e., once the first 
> record is reduced, it is forwarded into the MapFunction (passing the 
> unioned inputs).
> So it is not unexpected that Map starts processing before the 
> ReduceFunction terminated.
>
> Did you enable object reuse [1]?
> If yes, try to disable it. If you want to reuse objects, you have to 
> be careful in how you implement your functions.
> If no, can you share the plan 
> (ExecutionEnvironment.getExecutionPlan()) that was generated for the 
> program?
>
> Thanks,
> Fabian
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions>


Re: Strange behavior on filter, group and reduce DataSets

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Union is actually a very simple operator (not even an operator in Flink
terms). It just merges two inputs. There is no additional logic involved.
Therefore, it should also not emit records before either of the two
ReduceFunctions has sorted its data.
Once the data has been sorted for the ReduceFunction, the data is reduced
and emitted in a pipelined fashion, i.e., once the first record is reduced,
it is forwarded into the MapFunction (passing the unioned inputs).
So it is not unexpected that Map starts processing before the
ReduceFunction has terminated.

Did you enable object reuse [1]?
If yes, try to disable it. If you want to reuse objects, you have to be
careful in how you implement your functions.
If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
that was generated for the program?
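
To illustrate why object reuse needs care, here is a minimal plain-Java
sketch. It does not use any Flink API: the loop only simulates a runtime
that hands the same mutable instance to user code for every record, and the
names (ObjectReuseDemo, Rec, collectWithReuse, collectWithCopies) are made
up for this example. It shows the general class of bug, not a diagnosis of
the program discussed here.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (no Flink API) of the object-reuse pitfall. */
class ObjectReuseDemo {

    /** Hypothetical mutable record type. */
    static class Rec {
        String field1;

        Rec(String field1) {
            this.field1 = field1;
        }
    }

    /** Simulated runtime reuses ONE instance; the user code keeps references to it. */
    static List<Rec> collectWithReuse(String[] inputs) {
        Rec reused = new Rec(null);
        List<Rec> out = new ArrayList<>();
        for (String in : inputs) {
            reused.field1 = in; // the "runtime" overwrites the same object
            out.add(reused);    // bug: every list entry is the same object
        }
        return out;
    }

    /** Same loop, but the user code stores a defensive copy of each record. */
    static List<Rec> collectWithCopies(String[] inputs) {
        Rec reused = new Rec(null);
        List<Rec> out = new ArrayList<>();
        for (String in : inputs) {
            reused.field1 = in;
            out.add(new Rec(reused.field1)); // copy before keeping the value
        }
        return out;
    }

    public static void main(String[] args) {
        String[] inputs = {"a", "b", "c"};

        StringBuilder withReuse = new StringBuilder();
        for (Rec r : collectWithReuse(inputs)) {
            withReuse.append(r.field1);
        }
        System.out.println(withReuse); // prints: ccc

        StringBuilder withCopies = new StringBuilder();
        for (Rec r : collectWithCopies(inputs)) {
            withCopies.append(r.field1);
        }
        System.out.println(withCopies); // prints: abc
    }
}
```

The first variant ends up with the last value repeated, which is how
remembered references to reused objects can show up as duplicate records.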

Thanks,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions



2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:

> Any help on this? This thing is very strange..the "manual" union of the
> output of the 2 datasets is different than the flink-union of them..
> Could it be a problem of the flink optimizer?
>
> Best,
> Flavio

Re: Strange behavior on filter, group and reduce DataSets

Posted by Flavio Pompermaier <po...@okkam.it>.
Any help on this? This thing is very strange: the "manual" union of the
output of the 2 datasets is different from the Flink union of them.
Could it be a problem with the Flink optimizer?

Best,
Flavio

On Fri, Mar 16, 2018 at 4:01 PM, simone <si...@gmail.com> wrote:

> Sorry, I translated the code into pseudocode too fast. That is indeed an
> equals.

Re: Strange behavior on filter, group and reduce DataSets

Posted by simone <si...@gmail.com>.
Sorry, I translated the code into pseudocode too fast. That is indeed an 
equals.


On 16/03/2018 15:58, Kien Truong wrote:
>
> Hi,
>
> Just a guess, but string comparison in Java should use the equals
> method, not the == operator.
>
> Regards,
>
> Kien
>
>
> On 3/16/2018 9:47 PM, simone wrote:
>> /subject.getField("field1") == "";//
>> /


Re: Strange behavior on filter, group and reduce DataSets

Posted by Kien Truong <du...@gmail.com>.
Hi,

Just a guess, but string comparison in Java should use the equals method,
not the == operator.
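
A small sketch of the difference, in case it helps. The class name
StringCompareDemo and the helper isEmptyField are made up for this example;
the point is only that == compares object references, while equals (or
isEmpty) compares the contents.

```java
/** Shows why == is unreliable for comparing String contents in Java. */
class StringCompareDemo {

    /** Hypothetical helper: true if the field value is null or has no characters. */
    static boolean isEmptyField(String value) {
        return value == null || value.isEmpty();
    }

    public static void main(String[] args) {
        // A String created at runtime (for example, parsed from a CSV line)
        // is a different object than the literal "", even with equal contents.
        String parsed = new String("");

        System.out.println(parsed == "");         // prints: false (reference comparison)
        System.out.println(parsed.equals(""));    // prints: true  (content comparison)
        System.out.println(isEmptyField(parsed)); // prints: true
    }
}
```

With ==, a filter on an empty "field1" could let records through (or drop
them) depending on how the strings were created, not on what they contain.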

Regards,

Kien


On 3/16/2018 9:47 PM, simone wrote:
> /subject.getField("field1") == "";//
> /