You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by rahul patwari <ra...@gmail.com> on 2019/08/08 17:23:01 UTC

Inconsistent Results with GroupIntoBatches PTransform

Hi,

I am getting inconsistent results when using GroupIntoBatches PTransform.
I am using Create.of() PTransform to create a PCollection from in-memory.
When a coder is given with Create.of() PTransform, I am facing the issue.
If the coder is not provided, the results are consistent and correct(Maybe
this is just a coincidence and the problem is at some other place).
If Batch Size is 1, results are always consistent.

Not sure if this is an issue with Serialization/Deserialization (or)
GroupIntoBatches (or) Create.of() PTransform.

The Java code, expected correct results, and inconsistent results are
available at https://github.com/rahul8383/beam-examples

Thanks,
Rahul

Re: Inconsistent Results with GroupIntoBatches PTransform

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Rahul,

I cannot tell for sure. The fix was applied at runners-core, so - 
technically - it was possible that multiple runners were affected. A 
runner would be affected, if and only if, it would use something that 
depends on hashCode() of StateTag (or StateSpec) and user would use a 
Coder for that state that doesn't correctly implement hashCode() and 
equals() - SchemaCoder is one of such example.

After a few greps on the repository, I think that it might be possible, 
that Dataflow runner would be (more or less) affected by this as well 
(but someone from Dataflow team might confirm or disprove that better 
than me). Possibly affected code is at WindmillStateReader.java, which 
uses ConcurrentHashMap with StateTag as key. I'm not able to tell the 
consequences of that. I didn't find any obvious uses of HashMap or 
HashSet of StateTags in other runners. But that doesn't mean, that there 
really isn't any. :-)

Either way, by using version 2.14.0 you should be safe on all runners.

Jan

On 8/9/19 10:59 AM, rahul patwari wrote:
> Hi Jan,
>
> I was using Beam 2.13.0. I have upgraded Beam version to 2.14.0 and 
> the results are always correct. No more inconsistencies.
>
> Does BEAM-7269 affect all the runners?
>
> Thanks,
> Rahul
>
> On Fri, Aug 9, 2019 at 2:15 PM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     Hi Rahul,
>
>     what version of Beam are you using? There was a bug [1], which was
>     fixed in 2.14.0. This bug could cause what you observe.
>
>     Jan
>
>     [1] https://issues.apache.org/jira/browse/BEAM-7269
>
>     On 8/9/19 10:35 AM, rahul patwari wrote:
>>     Hi Robert,
>>
>>     When PCollection is created using
>>     "Create.of(listOfRow)*.withCoder(RowCoder.of(schema))*", I am
>>     getting "Inconsistent" results.
>>     By "Inconsistent", I mean that the result is "Incorrect"
>>     sometimes(most of the times).
>>     By "Incorrect" result, I mean that the elements are missing. The
>>     elements are not duplicated. The elements are not batched
>>     differently.
>>
>>     I have used System.identityHashcode(this) to convert
>>     PCollection<Row> to PCollection<KV<Integer, Row>> to apply
>>     Stateful Pardo(GroupIntoBatches) as per your suggestion in this
>>     thread
>>     <https://lists.apache.org/thread.html/ed3344698db1bd107f2c2466f813e045056b62084806445fd54a61fc@%3Cdev.beam.apache.org%3E>
>>
>>     To verify the result, I have used GroupByKey, which should give
>>     the same result as GroupIntoBatches *for my case*.
>>
>>     However, When PCollection is created using
>>     "Create.of(listOfRow)", the results are always correct.
>>
>>     Regards,
>>     Rahul
>>
>>     On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw
>>     <robertwb@google.com <ma...@google.com>> wrote:
>>
>>         Could you clarify what you mean by "inconsistent" and
>>         "incorrect"? Are
>>         elements missing/duplicated, or just batched differently?
>>
>>         On Fri, Aug 9, 2019 at 2:18 AM rahul patwari
>>         <rahulpatwari8383@gmail.com
>>         <ma...@gmail.com>> wrote:
>>         >
>>         > I only ran in Direct runner. I will run in other runners
>>         and let you know the results.
>>         > I am not setting "streaming" when executing.
>>         >
>>         > On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik, <lcwik@google.com
>>         <ma...@google.com>> wrote:
>>         >>
>>         >> Have you tried running this on more than one runner (e.g.
>>         Dataflow, Flink, Direct)?
>>         >>
>>         >> Are you setting --streaming when executing?
>>         >>
>>         >> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari
>>         <rahulpatwari8383@gmail.com
>>         <ma...@gmail.com>> wrote:
>>         >>>
>>         >>> Hi,
>>         >>>
>>         >>> I am getting inconsistent results when using
>>         GroupIntoBatches PTransform.
>>         >>> I am using Create.of() PTransform to create a PCollection
>>         from in-memory. When a coder is given with Create.of()
>>         PTransform, I am facing the issue.
>>         >>> If the coder is not provided, the results are consistent
>>         and correct(Maybe this is just a coincidence and the problem
>>         is at some other place).
>>         >>> If Batch Size is 1, results are always consistent.
>>         >>>
>>         >>> Not sure if this is an issue with
>>         Serialization/Deserialization (or) GroupIntoBatches (or)
>>         Create.of() PTransform.
>>         >>>
>>         >>> The Java code, expected correct results, and inconsistent
>>         results are available at
>>         https://github.com/rahul8383/beam-examples
>>         >>>
>>         >>> Thanks,
>>         >>> Rahul
>>

Re: Inconsistent Results with GroupIntoBatches PTransform

Posted by rahul patwari <ra...@gmail.com>.
Hi Jan,

I was using Beam 2.13.0. I have upgraded Beam version to 2.14.0 and the
results are always correct. No more inconsistencies.

Does BEAM-7269 affect all the runners?

Thanks,
Rahul

On Fri, Aug 9, 2019 at 2:15 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Rahul,
>
> what version of Beam are you using? There was a bug [1], which was fixed
> in 2.14.0. This bug could cause what you observe.
>
> Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-7269
> On 8/9/19 10:35 AM, rahul patwari wrote:
>
> Hi Robert,
>
> When PCollection is created using "Create.of(listOfRow)
> *.withCoder(RowCoder.of(schema))*", I am getting "Inconsistent" results.
> By "Inconsistent", I mean that the result is "Incorrect" sometimes(most of
> the times).
> By "Incorrect" result, I mean that the elements are missing. The elements
> are not duplicated. The elements are not batched differently.
>
> I have used System.identityHashcode(this) to convert PCollection<Row> to
> PCollection<KV<Integer, Row>> to apply Stateful Pardo(GroupIntoBatches) as
> per your suggestion in this thread
> <https://lists.apache.org/thread.html/ed3344698db1bd107f2c2466f813e045056b62084806445fd54a61fc@%3Cdev.beam.apache.org%3E>
>
> To verify the result, I have used GroupByKey, which should give the
> same result as GroupIntoBatches *for my case*.
>
> However, When PCollection is created using "Create.of(listOfRow)", the
> results are always correct.
>
> Regards,
> Rahul
>
> On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> Could you clarify what you mean by "inconsistent" and "incorrect"? Are
>> elements missing/duplicated, or just batched differently?
>>
>> On Fri, Aug 9, 2019 at 2:18 AM rahul patwari <ra...@gmail.com>
>> wrote:
>> >
>> > I only ran in Direct runner. I will run in other runners and let you
>> know the results.
>> > I am not setting "streaming" when executing.
>> >
>> > On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik, <lc...@google.com> wrote:
>> >>
>> >> Have you tried running this on more than one runner (e.g. Dataflow,
>> Flink, Direct)?
>> >>
>> >> Are you setting --streaming when executing?
>> >>
>> >> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari <
>> rahulpatwari8383@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I am getting inconsistent results when using GroupIntoBatches
>> PTransform.
>> >>> I am using Create.of() PTransform to create a PCollection from
>> in-memory. When a coder is given with Create.of() PTransform, I am facing
>> the issue.
>> >>> If the coder is not provided, the results are consistent and
>> correct(Maybe this is just a coincidence and the problem is at some other
>> place).
>> >>> If Batch Size is 1, results are always consistent.
>> >>>
>> >>> Not sure if this is an issue with Serialization/Deserialization (or)
>> GroupIntoBatches (or) Create.of() PTransform.
>> >>>
>> >>> The Java code, expected correct results, and inconsistent results are
>> available at https://github.com/rahul8383/beam-examples
>> >>>
>> >>> Thanks,
>> >>> Rahul
>>
>

Re: Inconsistent Results with GroupIntoBatches PTransform

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Rahul,

what version of Beam are you using? There was a bug [1], which was fixed 
in 2.14.0. This bug could cause what you observe.

Jan

[1] https://issues.apache.org/jira/browse/BEAM-7269

On 8/9/19 10:35 AM, rahul patwari wrote:
> Hi Robert,
>
> When PCollection is created using 
> "Create.of(listOfRow)*.withCoder(RowCoder.of(schema))*", I am getting 
> "Inconsistent" results.
> By "Inconsistent", I mean that the result is "Incorrect" 
> sometimes(most of the times).
> By "Incorrect" result, I mean that the elements are missing. The 
> elements are not duplicated. The elements are not batched differently.
>
> I have used System.identityHashcode(this) to convert PCollection<Row> 
> to PCollection<KV<Integer, Row>> to apply Stateful 
> Pardo(GroupIntoBatches) as per your suggestion in this thread 
> <https://lists.apache.org/thread.html/ed3344698db1bd107f2c2466f813e045056b62084806445fd54a61fc@%3Cdev.beam.apache.org%3E> 
>
> To verify the result, I have used GroupByKey, which should give the 
> same result as GroupIntoBatches *for my case*.
>
> However, When PCollection is created using "Create.of(listOfRow)", the 
> results are always correct.
>
> Regards,
> Rahul
>
> On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw <robertwb@google.com 
> <ma...@google.com>> wrote:
>
>     Could you clarify what you mean by "inconsistent" and "incorrect"? Are
>     elements missing/duplicated, or just batched differently?
>
>     On Fri, Aug 9, 2019 at 2:18 AM rahul patwari
>     <rahulpatwari8383@gmail.com <ma...@gmail.com>>
>     wrote:
>     >
>     > I only ran in Direct runner. I will run in other runners and let
>     you know the results.
>     > I am not setting "streaming" when executing.
>     >
>     > On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik, <lcwik@google.com
>     <ma...@google.com>> wrote:
>     >>
>     >> Have you tried running this on more than one runner (e.g.
>     Dataflow, Flink, Direct)?
>     >>
>     >> Are you setting --streaming when executing?
>     >>
>     >> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari
>     <rahulpatwari8383@gmail.com <ma...@gmail.com>>
>     wrote:
>     >>>
>     >>> Hi,
>     >>>
>     >>> I am getting inconsistent results when using GroupIntoBatches
>     PTransform.
>     >>> I am using Create.of() PTransform to create a PCollection from
>     in-memory. When a coder is given with Create.of() PTransform, I am
>     facing the issue.
>     >>> If the coder is not provided, the results are consistent and
>     correct(Maybe this is just a coincidence and the problem is at
>     some other place).
>     >>> If Batch Size is 1, results are always consistent.
>     >>>
>     >>> Not sure if this is an issue with
>     Serialization/Deserialization (or) GroupIntoBatches (or)
>     Create.of() PTransform.
>     >>>
>     >>> The Java code, expected correct results, and inconsistent
>     results are available at https://github.com/rahul8383/beam-examples
>     >>>
>     >>> Thanks,
>     >>> Rahul
>

Re: Inconsistent Results with GroupIntoBatches PTransform

Posted by rahul patwari <ra...@gmail.com>.
Hi Robert,

When PCollection is created using "Create.of(listOfRow)
*.withCoder(RowCoder.of(schema))*", I am getting "Inconsistent" results.
By "Inconsistent", I mean that the result is "Incorrect" sometimes(most of
the times).
By "Incorrect" result, I mean that the elements are missing. The elements
are not duplicated. The elements are not batched differently.

I have used System.identityHashcode(this) to convert PCollection<Row> to
PCollection<KV<Integer, Row>> to apply Stateful Pardo(GroupIntoBatches) as
per your suggestion in this thread
<https://lists.apache.org/thread.html/ed3344698db1bd107f2c2466f813e045056b62084806445fd54a61fc@%3Cdev.beam.apache.org%3E>

To verify the result, I have used GroupByKey, which should give the
same result as GroupIntoBatches *for my case*.

However, When PCollection is created using "Create.of(listOfRow)", the
results are always correct.

Regards,
Rahul

On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw <ro...@google.com> wrote:

> Could you clarify what you mean by "inconsistent" and "incorrect"? Are
> elements missing/duplicated, or just batched differently?
>
> On Fri, Aug 9, 2019 at 2:18 AM rahul patwari <ra...@gmail.com>
> wrote:
> >
> > I only ran in Direct runner. I will run in other runners and let you
> know the results.
> > I am not setting "streaming" when executing.
> >
> > On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik, <lc...@google.com> wrote:
> >>
> >> Have you tried running this on more than one runner (e.g. Dataflow,
> Flink, Direct)?
> >>
> >> Are you setting --streaming when executing?
> >>
> >> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari <
> rahulpatwari8383@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I am getting inconsistent results when using GroupIntoBatches
> PTransform.
> >>> I am using Create.of() PTransform to create a PCollection from
> in-memory. When a coder is given with Create.of() PTransform, I am facing
> the issue.
> >>> If the coder is not provided, the results are consistent and
> correct(Maybe this is just a coincidence and the problem is at some other
> place).
> >>> If Batch Size is 1, results are always consistent.
> >>>
> >>> Not sure if this is an issue with Serialization/Deserialization (or)
> GroupIntoBatches (or) Create.of() PTransform.
> >>>
> >>> The Java code, expected correct results, and inconsistent results are
> available at https://github.com/rahul8383/beam-examples
> >>>
> >>> Thanks,
> >>> Rahul
>

Re: Inconsistent Results with GroupIntoBatches PTransform

Posted by Robert Bradshaw <ro...@google.com>.
Could you clarify what you mean by "inconsistent" and "incorrect"? Are
elements missing/duplicated, or just batched differently?

On Fri, Aug 9, 2019 at 2:18 AM rahul patwari <ra...@gmail.com> wrote:
>
> I only ran in Direct runner. I will run in other runners and let you know the results.
> I am not setting "streaming" when executing.
>
> On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik, <lc...@google.com> wrote:
>>
>> Have you tried running this on more than one runner (e.g. Dataflow, Flink, Direct)?
>>
>> Are you setting --streaming when executing?
>>
>> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari <ra...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am getting inconsistent results when using GroupIntoBatches PTransform.
>>> I am using Create.of() PTransform to create a PCollection from in-memory. When a coder is given with Create.of() PTransform, I am facing the issue.
>>> If the coder is not provided, the results are consistent and correct(Maybe this is just a coincidence and the problem is at some other place).
>>> If Batch Size is 1, results are always consistent.
>>>
>>> Not sure if this is an issue with Serialization/Deserialization (or) GroupIntoBatches (or) Create.of() PTransform.
>>>
>>> The Java code, expected correct results, and inconsistent results are available at https://github.com/rahul8383/beam-examples
>>>
>>> Thanks,
>>> Rahul

Re: Inconsistent Results with GroupIntoBatches PTransform

Posted by rahul patwari <ra...@gmail.com>.
I only ran in Direct runner. I will run in other runners and let you know
the results.
I am not setting "streaming" when executing.

On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik, <lc...@google.com> wrote:

> Have you tried running this on more than one runner (e.g. Dataflow, Flink,
> Direct)?
>
> Are you setting --streaming when executing?
>
> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari <ra...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am getting inconsistent results when using GroupIntoBatches PTransform.
>> I am using Create.of() PTransform to create a PCollection from in-memory.
>> When a coder is given with Create.of() PTransform, I am facing the issue.
>> If the coder is not provided, the results are consistent and
>> correct(Maybe this is just a coincidence and the problem is at some other
>> place).
>> If Batch Size is 1, results are always consistent.
>>
>> Not sure if this is an issue with Serialization/Deserialization (or)
>> GroupIntoBatches (or) Create.of() PTransform.
>>
>> The Java code, expected correct results, and inconsistent results are
>> available at https://github.com/rahul8383/beam-examples
>>
>> Thanks,
>> Rahul
>>
>

Re: Inconsistent Results with GroupIntoBatches PTransform

Posted by Lukasz Cwik <lc...@google.com>.
Have you tried running this on more than one runner (e.g. Dataflow, Flink,
Direct)?

Are you setting --streaming when executing?

On Thu, Aug 8, 2019 at 10:23 AM rahul patwari <ra...@gmail.com>
wrote:

> Hi,
>
> I am getting inconsistent results when using GroupIntoBatches PTransform.
> I am using Create.of() PTransform to create a PCollection from in-memory.
> When a coder is given with Create.of() PTransform, I am facing the issue.
> If the coder is not provided, the results are consistent and correct(Maybe
> this is just a coincidence and the problem is at some other place).
> If Batch Size is 1, results are always consistent.
>
> Not sure if this is an issue with Serialization/Deserialization (or)
> GroupIntoBatches (or) Create.of() PTransform.
>
> The Java code, expected correct results, and inconsistent results are
> available at https://github.com/rahul8383/beam-examples
>
> Thanks,
> Rahul
>