Posted to dev@beam.apache.org by Vasu Gupta <de...@gmail.com> on 2020/02/27 09:16:05 UTC

GroupIntoBatches not Working properly for Direct Runner Java

Hey folks, I am using the Apache Beam framework in Java with Direction Runner for local testing purposes. When using GroupIntoBatches with batch size 1, it works perfectly fine, i.e. the output of the transform is consistent and as expected. But with batch size > 1, the output PCollection has fewer elements than it should.

Pipeline flow:
1. A Transform for reading from pubsub
2. Transform for making a KV out of the data
3. A Fixed Window transform of 1 second
4. Applying GroupIntoBatches transform
5. And last, Logging the resulting Iterables.

The weird thing is that batch_size > 1 works great when running on DataflowRunner but not with DirectRunner. I think the issue might be with timer expiry, since GroupIntoBatches uses BagState internally.

Any help will be much appreciated.

Re: GroupIntoBatches not Working properly for Direct Runner Java

Posted by Etienne Chauchot <ec...@apache.org>.
Hi,

+1 to what Kenn asked: your pipeline is in streaming mode and GIB
preserves windowing, so the elements are buffered until one of these
conditions is true: the batch size is reached or the window ends. In your
case I think it is the second one.
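
To make the two flush conditions concrete, here is a minimal sketch in plain Java. It is a toy model and not Beam's implementation: the class BatchingSketch and its methods are hypothetical names, the seven elements and timestamps are the ones reported elsewhere in this thread, and the end of a window is simplified to "watermark >= window start + window size" with zero window offset.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of per-key, per-window batching. This is NOT Beam's implementation. */
final class BatchingSketch {
    private final int batchSize;
    private final long windowMillis;
    // window start -> (key -> buffered values); TreeMap keeps windows in time order.
    private final TreeMap<Long, Map<String, List<Integer>>> windows = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    BatchingSketch(int batchSize, long windowMillis) {
        this.batchSize = batchSize;
        this.windowMillis = windowMillis;
    }

    /** Buffers one element and emits the batch as soon as it is full. */
    void add(String key, int value, long timestampMillis) {
        long windowStart = timestampMillis - Math.floorMod(timestampMillis, windowMillis);
        Map<String, List<Integer>> perKey =
            windows.computeIfAbsent(windowStart, w -> new LinkedHashMap<>());
        List<Integer> buffer = perKey.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            emitted.add(key + "=" + buffer);
            perKey.remove(key);
        }
    }

    /** Flushes the partial batches of every window that ends at or before the watermark. */
    void advanceWatermark(long watermarkMillis) {
        Iterator<Map.Entry<Long, Map<String, List<Integer>>>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, List<Integer>>> window = it.next();
            if (window.getKey() + windowMillis > watermarkMillis) {
                break; // this window and all later ones are still open
            }
            window.getValue().forEach((key, values) -> emitted.add(key + "=" + values));
            it.remove();
        }
    }

    /** Feeds the seven elements reported in this thread, then moves the watermark. */
    static List<String> run(int batchSize, long watermarkMillis) {
        BatchingSketch sketch = new BatchingSketch(batchSize, 1000L);
        sketch.add("a", 1, 1582994620366L);
        sketch.add("c", 2, 1582994620367L);
        sketch.add("e", 3, 1582994620367L);
        sketch.add("d", 4, 1582994620367L);
        sketch.add("e", 5, 1582994620367L);
        sketch.add("b", 6, 1582994620368L);
        sketch.add("a", 7, 1582994620368L);
        sketch.advanceWatermark(watermarkMillis);
        return sketch.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 1582994620999L)); // watermark still inside the window
        System.out.println(run(5, 1582994621000L)); // watermark at the end of the window
    }
}
```

In this model nothing is emitted with batch size 5 while the watermark is still inside the window, and all five keys (including c and d) come out once it reaches the end of the window. With batch size 1 every element is emitted immediately, regardless of the watermark, which would match the observation that batch size 1 always looks correct.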

Best

Etienne

On 28/02/2020 19:15, Kenneth Knowles wrote:
> What are the timestamps on the elements?

Re: GroupIntoBatches not Working properly for Direct Runner Java

Posted by Kenneth Knowles <ke...@apache.org>.
Great catch, Reza. Dataflow uses a customized C++ based Pubsub source that
calculates the watermark differently. So the Java-based PubsubIO may have
different behavior with respect to the watermark (expected) or may simply have
bugs (not desirable).

How are you feeding test data into Pubsub? Are you using TestPubsub [1] or
your own wrapper?

Kenn

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java

On Sat, Mar 7, 2020 at 1:46 AM Reza Rokni <re...@google.com> wrote:

> Hi,
>
> There are some known issues with the DirectRunner PubsubIO, which are not
> present with Dataflow Runner. One of them is around watermarks:
>
> https://issues.apache.org/jira/browse/BEAM-7322
>
> Not sure if this is part of the issue here, but worth exploring.
>
> When testing, are you sending a small volume of information and then
> stopping, or are you sending continuous output?
>
> Cheers
>
> Reza

Re: GroupIntoBatches not Working properly for Direct Runner Java

Posted by Reza Rokni <re...@google.com>.
Hi,

There are some known issues with the DirectRunner PubsubIO, which are not
present with Dataflow Runner. One of them is around watermarks:

https://issues.apache.org/jira/browse/BEAM-7322

Not sure if this is part of the issue here, but worth exploring.

When testing, are you sending a small volume of information and then
stopping, or are you sending continuous output?

Cheers

Reza

On Sat, Mar 7, 2020 at 1:29 PM Kenneth Knowles <ke...@apache.org> wrote:

> Can you reproduce it if you replace your Pubsub source with a TestStream
> and verify with PAssert [1]? This would enable you to easily build a unit
> test. You could even open a pull request adding that to the test suite for
> GroupIntoBatches [2]. That would be an excellent contribution to Beam.
>
> Kenn
>
> [1] https://beam.apache.org/blog/2016/10/20/test-stream.html
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java

Re: GroupIntoBatches not Working properly for Direct Runner Java

Posted by Kenneth Knowles <ke...@apache.org>.
Can you reproduce it if you replace your Pubsub source with a TestStream
and verify with PAssert [1]? This would enable you to easily build a unit
test. You could even open a pull request adding that to the test suite for
GroupIntoBatches [2]. That would be an excellent contribution to Beam.

Kenn

[1] https://beam.apache.org/blog/2016/10/20/test-stream.html
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java

On Mon, Mar 2, 2020 at 9:25 AM Vasu Gupta <de...@gmail.com> wrote:

> Input : a-1, Timestamp : 1582994620366
> Input : c-2, Timestamp : 1582994620367
> Input : e-3, Timestamp : 1582994620367
> Input : d-4, Timestamp : 1582994620367
> Input : e-5, Timestamp : 1582994620367
> Input : b-6, Timestamp : 1582994620368
> Input : a-7, Timestamp : 1582994620368
>
> Output : Timestamp : 1582994620367, Key : e-3,5
> Output : Timestamp : 1582994620368, Key : a-1,7
>
> As you can see c-2 and d-4 are missing and I never received these packets.

Re: GroupIntoBatches not Working properly for Direct Runner Java

Posted by Vasu Gupta <de...@gmail.com>.
Input : a-1, Timestamp : 1582994620366
Input : c-2, Timestamp : 1582994620367
Input : e-3, Timestamp : 1582994620367
Input : d-4, Timestamp : 1582994620367
Input : e-5, Timestamp : 1582994620367
Input : b-6, Timestamp : 1582994620368
Input : a-7, Timestamp : 1582994620368

Output : Timestamp : 1582994620367, Key : e-3,5
Output : Timestamp : 1582994620368, Key : a-1,7

As you can see c-2 and d-4 are missing and I never received these packets.
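
For reference, a small sketch of how these timestamps map onto 1-second fixed windows. It assumes a zero window offset and non-overlapping windows of the form [start, start + size); the class and method names are hypothetical and only illustrate the arithmetic.

```java
/** Illustrative arithmetic only; assumes fixed windows with zero offset. */
final class FixedWindowSketch {
    /** Returns the start (inclusive) of the fixed window containing the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long sizeMillis = 1000L; // the 1-second window from the pipeline description
        long[] timestamps = {1582994620366L, 1582994620367L, 1582994620368L};
        for (long ts : timestamps) {
            long start = windowStart(ts, sizeMillis);
            System.out.println(ts + " -> [" + start + ", " + (start + sizeMillis) + ")");
        }
    }
}
```

Under that assumption all seven inputs land in the same window, so every key would be expected to produce exactly one batch for it.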

On 2020/02/28 18:15:03, Kenneth Knowles <ke...@apache.org> wrote: 
> What are the timestamps on the elements?

Re: GroupIntoBatches not Working properly for Direct Runner Java

Posted by Kenneth Knowles <ke...@apache.org>.
What are the timestamps on the elements?

On Fri, Feb 28, 2020 at 8:36 AM Vasu Gupta <de...@gmail.com> wrote:

> Edit: The issue is on Direct Runner (not Direction Runner, mistyped).
> Issue details:
> Input data: 7 key-value packets like: a-1, a-4, b-3, c-5, d-1, e-4, e-5
> Batch size: 5
> Expected output: a-1,4, b-3, c-5, d-1, e-4,5
> Getting packets with irregular sizes like a-1, b-5, e-4,5 OR a-1,4, c-5 etc.
> But I always got the correct number of packets with BATCH_SIZE = 1

Re: GroupIntoBatches not Working properly for Direct Runner Java

Posted by Vasu Gupta <de...@gmail.com>.
Edit: The issue is on Direct Runner (not Direction Runner, mistyped).
Issue details:
Input data: 7 key-value packets like: a-1, a-4, b-3, c-5, d-1, e-4, e-5
Batch size: 5
Expected output: a-1,4, b-3, c-5, d-1, e-4,5
Getting packets with irregular sizes like a-1, b-5, e-4,5 OR a-1,4, c-5 etc.
But I always got the correct number of packets with BATCH_SIZE = 1

On 2020/02/27 20:40:16, Kenneth Knowles <ke...@apache.org> wrote: 
> Can you share some more details? What is the expected output and what
> output are you seeing?

Re: GroupIntoBatches not Working properly for Direct Runner Java

Posted by Kenneth Knowles <ke...@apache.org>.
Can you share some more details? What is the expected output and what
output are you seeing?

On Thu, Feb 27, 2020 at 9:39 AM Vasu Gupta <de...@gmail.com> wrote:

> Hey folks, I am using the Apache Beam framework in Java with Direction Runner
> for local testing purposes. When using GroupIntoBatches with batch size 1,
> it works perfectly fine, i.e. the output of the transform is consistent and
> as expected. But with batch size > 1, the output PCollection has fewer
> elements than it should.
>
> Pipeline flow:
> 1. A Transform for reading from pubsub
> 2. Transform for making a KV out of the data
> 3. A Fixed Window transform of 1 second
> 4. Applying GroupIntoBatches transform
> 5. And last, Logging the resulting Iterables.
>
> The weird thing is that batch_size > 1 works great when running on
> DataflowRunner but not with DirectRunner. I think the issue might be with
> timer expiry, since GroupIntoBatches uses BagState internally.
>
> Any help will be much appreciated.
>