Posted to dev@beam.apache.org by Anton Kedin <ke...@google.com> on 2019/04/10 22:59:02 UTC

Java DirectRunner BagState Problem

Hi dev@,

I am debugging a flaky test and observing an interesting problem with a
BagState. The code in question is here [1]:

   - the pipeline [2] looks like: PCollection of elements -> re-window into
     the global window -> assign the same dummy key -> stateful ParDo;

   - in the stateful ParDo I add the newly received element to the state,
     then read all the elements from the state, then process all of them;

   - I expect all the previous elements and the new one to be in the state,
     as they are all in the same (global) window and have the same key.
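For readers less familiar with stateful processing, the toy Java model below sketches the semantics the list above relies on: each (key, window) pair owns exactly one bag, so every add-then-read on the same pair should see all previously added elements. It is a hypothetical illustration (the `BagStateModel` class and its `addAndRead` method are invented here), not Beam's `BagState` API and not the DirectRunner's state implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory model of bag state scoped by (key, window).
 * NOT Beam's BagState or the DirectRunner's implementation; it only
 * illustrates the semantics the stateful ParDo is expected to have.
 */
final class BagStateModel<K, W, V> {

  // One bag per (key, window) pair: outer map by key, inner map by window.
  private final Map<K, Map<W, List<V>>> bags = new HashMap<>();

  /** Mirrors the ParDo step: add the new element, then read the whole bag. */
  List<V> addAndRead(K key, W window, V element) {
    List<V> bag = bags
        .computeIfAbsent(key, k -> new HashMap<>())
        .computeIfAbsent(window, w -> new ArrayList<>());
    bag.add(element);
    // Return a snapshot so callers cannot mutate the stored bag.
    return new ArrayList<>(bag);
  }
}
```

With a single dummy key and a single (global) window, the third `addAndRead` call in this model returns all three elements; the flake described in the next paragraph corresponds to a read that returns only the newest one.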

However, I noticed that sometimes the old elements are gone and I can
retrieve only the new element I just added. This happens in roughly 10% of
executions, without any changes to the code. I see this problem on my local
machine with the DirectRunner, and I think it is the cause of the test flake
that happens in Jenkins as well.

I am digging deeper to debug and fix this, but I'm wondering whether anyone
has seen something like this. Or maybe I am missing something, or using the
state incorrectly?

[1]: https://github.com/apache/beam/blob/b953645ed6db837d24284d7fe1fe091e7309f821/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java#L363
[2]: https://github.com/apache/beam/blob/b953645ed6db837d24284d7fe1fe091e7309f821/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java#L322

Regards,
Anton

Re: Java DirectRunner BagState Problem

Posted by Amit Ziv-Kenet <am...@gmail.com>.
I've encountered a seemingly related phenomenon with the Dataflow runner
(I wrote about it previously on the users@ list).
In my case the "disappearing" BagState elements were late elements (beyond
the allowed lateness), which I assumed shouldn't reach the stateful ParDo at
all; instead, they appeared in the BagState and then disappeared.
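To make "beyond allowed lateness" concrete, here is a minimal sketch assuming the usual Beam expiry rule: a window expires once the input watermark passes the window's maximum timestamp plus the allowed lateness, and elements belonging to an expired window are candidates for dropping. `LatenessCheck.isDroppable` is a hypothetical helper written for illustration, not a Beam API, and it uses `java.time` rather than Beam's Joda-Time types.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical helper sketching the window-expiry rule (not Beam API):
 * a window is expired once the input watermark is past
 * (window max timestamp + allowed lateness).
 */
final class LatenessCheck {
  private LatenessCheck() {}

  static boolean isDroppable(
      Instant windowMaxTimestamp, Duration allowedLateness, Instant inputWatermark) {
    // Point after which state for this window may be garbage-collected.
    Instant expiry = windowMaxTimestamp.plus(allowedLateness);
    // Elements arriving after the window has expired are droppable.
    return expiry.isBefore(inputWatermark);
  }
}
```

Under this rule, elements in the global window (as in Anton's pipeline) should not expire this way, since its maximum timestamp is effectively the end of time.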

I've implemented a small-scale example pipeline that demonstrates this
behavior: <https://github.com/azk/late-samples-state>.

On Thu, Apr 11, 2019 at 3:41 AM Steve Niemitz <sn...@apache.org> wrote:

> This sounds a lot like what I had reported in
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-6813
> (sorry for the mobile link).

Re: Java DirectRunner BagState Problem

Posted by Steve Niemitz <sn...@apache.org>.
This sounds a lot like what I had reported in
https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-6813
(sorry for the mobile link).
