Posted to dev@beam.apache.org by Lukasz Cwik <lc...@google.com> on 2018/06/04 18:57:57 UTC

Re: Multimap PCollectionViews' values updated rather than appended

+dev@beam.apache.org
Note that this is likely a bug in the DirectRunner for accumulation mode,
filed: https://issues.apache.org/jira/browse/BEAM-4470

In discarding mode the view always reflects only the latest firing, so you
need to emit the entire map on every firing. If you can do that, discarding
mode makes sense. If you cannot, entries are lost: if your first trigger
firing produces (A, 1), (B, 1) and your second firing produces (B, 2), the
multimap will contain only (B, 2), and (A, 1) will have been discarded.
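
To make the difference concrete, here is a minimal sketch in plain Java. It
is only a toy model of the outcome described above, not how any runner
implements side inputs; the class and method names (FiringModes,
sampleFirings, discarding, accumulating) are made up for illustration and
are not part of Beam.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: plain Java collections, no Beam classes involved.
class FiringModes {

    /** The two trigger firings from the example: (A, 1), (B, 1) and then (B, 2). */
    static List<List<Map.Entry<String, Integer>>> sampleFirings() {
        return List.of(
            List.of(Map.entry("A", 1), Map.entry("B", 1)),
            List.of(Map.entry("B", 2)));
    }

    /**
     * Groups the values of the given firings by key. The insertion order seen
     * here is an artifact of this model; a real runner gives no such guarantee.
     */
    static Map<String, List<Integer>> toMultimap(
            List<List<Map.Entry<String, Integer>>> firings) {
        Map<String, List<Integer>> view = new LinkedHashMap<>();
        for (List<Map.Entry<String, Integer>> firing : firings) {
            for (Map.Entry<String, Integer> e : firing) {
                view.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            }
        }
        return view;
    }

    /** Discarding: only the elements of the most recent firing are visible. */
    static Map<String, List<Integer>> discarding(
            List<List<Map.Entry<String, Integer>>> firings) {
        if (firings.isEmpty()) {
            return new LinkedHashMap<>();
        }
        return toMultimap(firings.subList(firings.size() - 1, firings.size()));
    }

    /** Accumulating: the elements of every firing so far are visible. */
    static Map<String, List<Integer>> accumulating(
            List<List<Map.Entry<String, Integer>>> firings) {
        return toMultimap(firings);
    }

    public static void main(String[] args) {
        System.out.println(discarding(sampleFirings()));   // prints {B=[2]}
        System.out.println(accumulating(sampleFirings())); // prints {A=[1], B=[1, 2]}
    }
}
```

In this model, discarding mode is only safe when the last firing carries
every key you still need.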

To my knowledge, there is no guarantee about the order in which the values
are combined. You will need to use some piece of information about the
element to figure out which is the latest (or encode some additional
information along with each element to make this easy).
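
One way to encode that additional information is to tag each value with the
time it was emitted and pick the maximum on the reading side. The sketch
below assumes a hypothetical Versioned wrapper and latest helper (neither
exists in Beam) and uses plain Java only, so it does not depend on the
order in which the Iterable yields its elements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not a Beam API: picks the newest value by timestamp.
class LatestByVersion {

    /** A value tagged with the (assumed) time at which it was emitted. */
    static final class Versioned<T> {
        final long emittedAtMillis;
        final T value;

        Versioned(long emittedAtMillis, T value) {
            this.emittedAtMillis = emittedAtMillis;
            this.value = value;
        }
    }

    /**
     * Returns the value with the greatest timestamp, whatever the iteration
     * order, or an empty Optional if there are no values. Values are assumed
     * to be non-null.
     */
    static <T> Optional<T> latest(Iterable<Versioned<T>> values) {
        Versioned<T> best = null;
        for (Versioned<T> candidate : values) {
            if (best == null || candidate.emittedAtMillis > best.emittedAtMillis) {
                best = candidate;
            }
        }
        return best == null ? Optional.empty() : Optional.of(best.value);
    }

    public static void main(String[] args) {
        // The newest entry deliberately comes first to show order does not matter.
        List<Versioned<String>> schemas = Arrays.asList(
            new Versioned<String>(2000L, "schema-v2"),
            new Versioned<String>(1000L, "schema-v1"));
        System.out.println(latest(schemas).get()); // prints schema-v2
    }
}
```

A monotonically increasing version number would work equally well in place
of the timestamp.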

On Thu, May 31, 2018 at 9:16 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> I've improved the example a little and added some tests
> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala
>
> The behaviour is slightly different, which is possibly because of the
> different runners (Dataflow/Direct) implementations, but still not working.
>
> Now what happens is that although the internal PCollection gets updated,
> the view isn't. This is happening regardless of the accumulation mode.
>
> Regarding the accumulation mode on Dataflow... That was it!! Now the sets
> contain all the items, however, one more question, is the ordering within
> the set deterministic? (i.e: Can I assume that the latest will always be on
> the last position of the Iterable object?)
>
> Also... given that for my particular case I only want the latest version,
> would you advise me to go ahead with Discarding mode?
>
> Regards
>
> On Thu, May 31, 2018 at 4:44 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> The trigger definition in the sample code you have is using discarding
>> firing mode. Try swapping to using accumulating mode.
>>
>>
>> On Thu, May 31, 2018 at 1:42 AM Carlos Alonso <ca...@mrcalonso.com>
>> wrote:
>>
>>> But I think what I'm experiencing is quite different. Basically the side
>>> input is updated, but only one element is found on the Iterable that is the
>>> value of any key of the multimap.
>>>
>>> I mean, no concatenation seems to be happening. On the linked thread,
>>> Kenn suggests that every firing will add the new value to the set of values
>>> for the emitted key, but what I'm experiencing is that the new value is
>>> there, but just itself (i.e: is the only element in the set).
>>>
>>> @Robert, I'm using
>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())
>>>
>>> On Wed, May 30, 2018 at 7:46 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> An alternative to the thread that Kenn linked (adding support for
>>>> retractions) is to add explicit support for combiners into side inputs. The
>>>> system currently works by using a hardcoded concatenating combiner, so
>>>> maps, lists, iterables, singletons, multimaps all work by concatenating the
>>>> set of values emitted and then turning it into a view which is why it is an
>>>> error for a singleton and map view if the trigger fires multiple times.
>>>>
>>>> On Wed, May 30, 2018 at 10:01 AM Kenneth Knowles <kl...@google.com>
>>>> wrote:
>>>>
>>>>> Yes, this is a known issue. Here's a prior discussion:
>>>>> https://lists.apache.org/thread.html/e9518f5d5f4bcf7bab02de2cb9fe1bd5293d87aa12d46de1eac4600b@%3Cuser.beam.apache.org%3E
>>>>>
>>>>> It is actually long-standing and the solution is known but hard.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, May 30, 2018 at 9:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> Hi everyone!!
>>>>>>
>>>>>> Working with multimap based side inputs on the global window I'm
>>>>>> experiencing something unexpected (at least to me) that I'd like to share
>>>>>> with you to clarify.
>>>>>>
>>>>>> The way I understand multimaps is that when one emits two values for
>>>>>> the same key for the same window (obvious thing here as I'm working on the
>>>>>> Global one), the newly emitted values are appended to the Iterable
>>>>>> collection that is the value for that particular key on the map.
>>>>>>
>>>>>> Testing it in this job (it is using scio, but side inputs are
>>>>>> implemented with PCollectionViews):
>>>>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/main/scala/com/mrcalonso/RefreshingSideInput2.scala
>>>>>>
>>>>>> The steps to reproduce are:
>>>>>> 1. Create one table on the target BQ
>>>>>> 2. Run the job
>>>>>> 3. Patch the table on BQ (add one field), this should generate a new
>>>>>> TableSchema for the corresponding TableReference
>>>>>> 4. An updated value for the number of fields appears in the logs, but
>>>>>> there is only one element within the iterable, as if it had been updated
>>>>>> instead of appended!!
>>>>>>
>>>>>> Is that the expected behaviour? Is it a bug? Am I missing something?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>

Re: Multimap PCollectionViews' values updated rather than appended

Posted by Lukasz Cwik <lc...@google.com>.
Thanks for the snippet, updated BEAM-4470 with the additional details.

On Mon, Jun 11, 2018 at 10:56 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> Many thanks for your help. Actually, my use case emits the entire map
> every time, so I guess I'm good to go with discarding mode.
>
> This test reproduces the issue:
> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala#L19-L53
>
> Hope it helps
>
> On Mon, Jun 4, 2018 at 9:04 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Carlos, can you provide a test/code snippet for the bug that shows the
>> issue?
>>
>> On Mon, Jun 4, 2018 at 11:57 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> +dev@beam.apache.org
>>> Note that this is likely a bug in the DirectRunner for accumulation
>>> mode, filed: https://issues.apache.org/jira/browse/BEAM-4470
>>>
>>> Discarding mode is meant to always be the latest firing, the issue
>>> though is that you need to emit the entire map every time. If you can do
>>> this, then it makes sense to use discarding mode. The issue with discarding
>>> mode is that if your first trigger firing produces (A, 1), (B, 1) and your
>>> second firing produces (B, 2), the multimap will only contain (B, 2) and
>>> (A, 1) will have been discarded.
>>>
>>> To my knowledge, there is no guarantee about the order in which the
>>> values are combined. You will need to use some piece of information about
>>> the element to figure out which is the latest (or encode some additional
>>> information along with each element to make this easy).
>>>
>>> On Thu, May 31, 2018 at 9:16 AM Carlos Alonso <ca...@mrcalonso.com>
>>> wrote:
>>>
>>>> I've improved the example a little and added some tests
>>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala
>>>>
>>>> The behaviour is slightly different, which is possibly because of the
>>>> different runners (Dataflow/Direct) implementations, but still not working.
>>>>
>>>> Now what happens is that although the internal PCollection gets
>>>> updated, the view isn't. This is happening regardless of the accumulation
>>>> mode.
>>>>
>>>> Regarding the accumulation mode on Dataflow... That was it!! Now the
>>>> sets contain all the items, however, one more question, is the ordering
>>>> within the set deterministic? (i.e: Can I assume that the latest will
>>>> always be on the last position of the Iterable object?)
>>>>
>>>> Also... given that for my particular case I only want the latest
>>>> version, would you advise me to go ahead with Discarding mode?
>>>>
>>>> Regards
>>>>
>>>> On Thu, May 31, 2018 at 4:44 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> The trigger definition in the sample code you have is using discarding
>>>>> firing mode. Try swapping to using accumulating mode.
>>>>>
>>>>>
>>>>> On Thu, May 31, 2018 at 1:42 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> But I think what I'm experiencing is quite different. Basically the
>>>>>> side input is updated, but only one element is found on the Iterable that
>>>>>> is the value of any key of the multimap.
>>>>>>
>>>>>> I mean, no concatenation seems to be happening. On the linked thread,
>>>>>> Kenn suggests that every firing will add the new value to the set of values
>>>>>> for the emitted key, but what I'm experiencing is that the new value is
>>>>>> there, but just itself (i.e: is the only element in the set).
>>>>>>
>>>>>> @Robert, I'm using
>>>>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())
>>>>>>
>>>>>> On Wed, May 30, 2018 at 7:46 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> An alternative to the thread that Kenn linked (adding support for
>>>>>>> retractions) is to add explicit support for combiners into side inputs. The
>>>>>>> system currently works by using a hardcoded concatenating combiner, so
>>>>>>> maps, lists, iterables, singletons, multimaps all work by concatenating the
>>>>>>> set of values emitted and then turning it into a view which is why it is an
>>>>>>> error for a singleton and map view if the trigger fires multiple times.
>>>>>>>
>>>>>>> On Wed, May 30, 2018 at 10:01 AM Kenneth Knowles <kl...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, this is a known issue. Here's a prior discussion:
>>>>>>>> https://lists.apache.org/thread.html/e9518f5d5f4bcf7bab02de2cb9fe1bd5293d87aa12d46de1eac4600b@%3Cuser.beam.apache.org%3E
>>>>>>>>
>>>>>>>> It is actually long-standing and the solution is known but hard.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 30, 2018 at 9:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone!!
>>>>>>>>>
>>>>>>>>> Working with multimap based side inputs on the global window I'm
>>>>>>>>> experiencing something unexpected (at least to me) that I'd like to share
>>>>>>>>> with you to clarify.
>>>>>>>>>
>>>>>>>>> The way I understand multimaps is that when one emits two values
>>>>>>>>> for the same key for the same window (obvious thing here as I'm working on
>>>>>>>>> the Global one), the newly emitted values are appended to the Iterable
>>>>>>>>> collection that is the value for that particular key on the map.
>>>>>>>>>
>>>>>>>>> Testing it in this job (it is using scio, but side inputs are
>>>>>>>>> implemented with PCollectionViews):
>>>>>>>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/main/scala/com/mrcalonso/RefreshingSideInput2.scala
>>>>>>>>>
>>>>>>>>> The steps to reproduce are:
>>>>>>>>> 1. Create one table on the target BQ
>>>>>>>>> 2. Run the job
>>>>>>>>> 3. Patch the table on BQ (add one field), this should generate a
>>>>>>>>> new TableSchema for the corresponding TableReference
>>>>>>>>> 4. An updated value for the number of fields appears in the logs, but
>>>>>>>>> there is only one element within the iterable, as if it had been updated
>>>>>>>>> instead of appended!!
>>>>>>>>>
>>>>>>>>> Is that the expected behaviour? Is it a bug? Am I missing something?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>

Re: Multimap PCollectionViews' values updated rather than appended

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Many thanks for your help. Actually, my use case emits the entire map
every time, so I guess I'm good to go with discarding mode.

This test reproduces the issue:
https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala#L19-L53

Hope it helps

On Mon, Jun 4, 2018 at 9:04 PM Lukasz Cwik <lc...@google.com> wrote:

> Carlos, can you provide a test/code snippet for the bug that shows the
> issue?
>
> On Mon, Jun 4, 2018 at 11:57 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> +dev@beam.apache.org
>> Note that this is likely a bug in the DirectRunner for accumulation mode,
>> filed: https://issues.apache.org/jira/browse/BEAM-4470
>>
>> Discarding mode is meant to always be the latest firing, the issue though
>> is that you need to emit the entire map every time. If you can do this,
>> then it makes sense to use discarding mode. The issue with discarding mode
>> is that if your first trigger firing produces (A, 1), (B, 1) and your
>> second firing produces (B, 2), the multimap will only contain (B, 2) and
>> (A, 1) will have been discarded.
>>
>> To my knowledge, there is no guarantee about the order in which the
>> values are combined. You will need to use some piece of information about
>> the element to figure out which is the latest (or encode some additional
>> information along with each element to make this easy).
>>
>> On Thu, May 31, 2018 at 9:16 AM Carlos Alonso <ca...@mrcalonso.com>
>> wrote:
>>
>>> I've improved the example a little and added some tests
>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala
>>>
>>> The behaviour is slightly different, which is possibly because of the
>>> different runners (Dataflow/Direct) implementations, but still not working.
>>>
>>> Now what happens is that although the internal PCollection gets updated,
>>> the view isn't. This is happening regardless of the accumulation mode.
>>>
>>> Regarding the accumulation mode on Dataflow... That was it!! Now the
>>> sets contain all the items, however, one more question, is the ordering
>>> within the set deterministic? (i.e: Can I assume that the latest will
>>> always be on the last position of the Iterable object?)
>>>
>>> Also... given that for my particular case I only want the latest
>>> version, would you advise me to go ahead with Discarding mode?
>>>
>>> Regards
>>>
>>> On Thu, May 31, 2018 at 4:44 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> The trigger definition in the sample code you have is using discarding
>>>> firing mode. Try swapping to using accumulating mode.
>>>>
>>>>
>>>> On Thu, May 31, 2018 at 1:42 AM Carlos Alonso <ca...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> But I think what I'm experiencing is quite different. Basically the
>>>>> side input is updated, but only one element is found on the Iterable that
>>>>> is the value of any key of the multimap.
>>>>>
>>>>> I mean, no concatenation seems to be happening. On the linked thread,
>>>>> Kenn suggests that every firing will add the new value to the set of values
>>>>> for the emitted key, but what I'm experiencing is that the new value is
>>>>> there, but just itself (i.e: is the only element in the set).
>>>>>
>>>>> @Robert, I'm using
>>>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())
>>>>>
>>>>> On Wed, May 30, 2018 at 7:46 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> An alternative to the thread that Kenn linked (adding support for
>>>>>> retractions) is to add explicit support for combiners into side inputs. The
>>>>>> system currently works by using a hardcoded concatenating combiner, so
>>>>>> maps, lists, iterables, singletons, multimaps all work by concatenating the
>>>>>> set of values emitted and then turning it into a view which is why it is an
>>>>>> error for a singleton and map view if the trigger fires multiple times.
>>>>>>
>>>>>> On Wed, May 30, 2018 at 10:01 AM Kenneth Knowles <kl...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, this is a known issue. Here's a prior discussion:
>>>>>>> https://lists.apache.org/thread.html/e9518f5d5f4bcf7bab02de2cb9fe1bd5293d87aa12d46de1eac4600b@%3Cuser.beam.apache.org%3E
>>>>>>>
>>>>>>> It is actually long-standing and the solution is known but hard.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 30, 2018 at 9:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi everyone!!
>>>>>>>>
>>>>>>>> Working with multimap based side inputs on the global window I'm
>>>>>>>> experiencing something unexpected (at least to me) that I'd like to share
>>>>>>>> with you to clarify.
>>>>>>>>
>>>>>>>> The way I understand multimaps is that when one emits two values
>>>>>>>> for the same key for the same window (obvious thing here as I'm working on
>>>>>>>> the Global one), the newly emitted values are appended to the Iterable
>>>>>>>> collection that is the value for that particular key on the map.
>>>>>>>>
>>>>>>>> Testing it in this job (it is using scio, but side inputs are
>>>>>>>> implemented with PCollectionViews):
>>>>>>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/main/scala/com/mrcalonso/RefreshingSideInput2.scala
>>>>>>>>
>>>>>>>> The steps to reproduce are:
>>>>>>>> 1. Create one table on the target BQ
>>>>>>>> 2. Run the job
>>>>>>>> 3. Patch the table on BQ (add one field), this should generate a
>>>>>>>> new TableSchema for the corresponding TableReference
>>>>>>>> 4. An updated value for the number of fields appears in the logs, but
>>>>>>>> there is only one element within the iterable, as if it had been updated
>>>>>>>> instead of appended!!
>>>>>>>>
>>>>>>>> Is that the expected behaviour? Is it a bug? Am I missing something?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>

Re: Multimap PCollectionViews' values updated rather than appended

Posted by Lukasz Cwik <lc...@google.com>.
Carlos, can you provide a test/code snippet for the bug that shows the
issue?

On Mon, Jun 4, 2018 at 11:57 AM Lukasz Cwik <lc...@google.com> wrote:

> +dev@beam.apache.org
> Note that this is likely a bug in the DirectRunner for accumulation mode,
> filed: https://issues.apache.org/jira/browse/BEAM-4470
>
> Discarding mode is meant to always be the latest firing, the issue though
> is that you need to emit the entire map every time. If you can do this,
> then it makes sense to use discarding mode. The issue with discarding mode
> is that if your first trigger firing produces (A, 1), (B, 1) and your
> second firing produces (B, 2), the multimap will only contain (B, 2) and
> (A, 1) will have been discarded.
>
> To my knowledge, there is no guarantee about the order in which the values
> are combined. You will need to use some piece of information about the
> element to figure out which is the latest (or encode some additional
> information along with each element to make this easy).
>
> On Thu, May 31, 2018 at 9:16 AM Carlos Alonso <ca...@mrcalonso.com>
> wrote:
>
>> I've improved the example a little and added some tests
>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala
>>
>> The behaviour is slightly different, which is possibly because of the
>> different runners (Dataflow/Direct) implementations, but still not working.
>>
>> Now what happens is that although the internal PCollection gets updated,
>> the view isn't. This is happening regardless of the accumulation mode.
>>
>> Regarding the accumulation mode on Dataflow... That was it!! Now the sets
>> contain all the items, however, one more question, is the ordering within
>> the set deterministic? (i.e: Can I assume that the latest will always be on
>> the last position of the Iterable object?)
>>
>> Also... given that for my particular case I only want the latest version,
>> would you advise me to go ahead with Discarding mode?
>>
>> Regards
>>
>> On Thu, May 31, 2018 at 4:44 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> The trigger definition in the sample code you have is using discarding
>>> firing mode. Try swapping to using accumulating mode.
>>>
>>>
>>> On Thu, May 31, 2018 at 1:42 AM Carlos Alonso <ca...@mrcalonso.com>
>>> wrote:
>>>
>>>> But I think what I'm experiencing is quite different. Basically the
>>>> side input is updated, but only one element is found on the Iterable that
>>>> is the value of any key of the multimap.
>>>>
>>>> I mean, no concatenation seems to be happening. On the linked thread,
>>>> Kenn suggests that every firing will add the new value to the set of values
>>>> for the emitted key, but what I'm experiencing is that the new value is
>>>> there, but just by itself (i.e., it is the only element in the set).
>>>>
>>>> @Robert, I'm using
>>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())
>>>>
>>>> On Wed, May 30, 2018 at 7:46 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> An alternative to the thread that Kenn linked (adding support for
>>>>> retractions) is to add explicit support for combiners into side inputs. The
>>>>> system currently works by using a hardcoded concatenating combiner, so
>>>>> maps, lists, iterables, singletons, multimaps all work by concatenating the
>>>>> set of values emitted and then turning it into a view, which is why it is
>>>>> an error for a singleton and map view if the trigger fires multiple times.
>>>>>
>>>>> On Wed, May 30, 2018 at 10:01 AM Kenneth Knowles <kl...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Yes, this is a known issue. Here's a prior discussion:
>>>>>> https://lists.apache.org/thread.html/e9518f5d5f4bcf7bab02de2cb9fe1bd5293d87aa12d46de1eac4600b@%3Cuser.beam.apache.org%3E
>>>>>>
>>>>>> It is actually long-standing and the solution is known but hard.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, May 30, 2018 at 9:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone!!
>>>>>>>
>>>>>>> Working with multimap based side inputs on the global window I'm
>>>>>>> experiencing something unexpected (at least to me) that I'd like to share
>>>>>>> with you to clarify.
>>>>>>>
>>>>>>> The way I understand multimaps is that when one emits two values for
>>>>>>> the same key for the same window (obvious thing here as I'm working on the
>>>>>>> Global one), the newly emitted values are appended to the Iterable
>>>>>>> collection that is the value for that particular key on the map.
>>>>>>>
>>>>>>> Testing it in this job (it is using scio, but side inputs are
>>>>>>> implemented with PCollectionViews):
>>>>>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/main/scala/com/mrcalonso/RefreshingSideInput2.scala
>>>>>>>
>>>>>>> The steps to reproduce are:
>>>>>>> 1. Create one table on the target BQ
>>>>>>> 2. Run the job
>>>>>>> 3. Patch the table on BQ (add one field), this should generate a new
>>>>>>> TableSchema for the corresponding TableReference
>>>>>>> 4. An updated value of the number of fields appears in the logs, but
>>>>>>> there is only one element within the iterable, as if it had been updated
>>>>>>> instead of appended!!
>>>>>>>
>>>>>>> Is that the expected behaviour? Is it a bug? Am I missing something?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>

Re: Multimap PCollectionViews' values updated rather than appended

Posted by Lukasz Cwik <lc...@google.com>.
Carlos, can you provide a test/code snippet for the bug that shows the
issue?

On Mon, Jun 4, 2018 at 11:57 AM Lukasz Cwik <lc...@google.com> wrote:

> +dev@beam.apache.org
> Note that this is likely a bug in the DirectRunner for accumulation mode,
> filed: https://issues.apache.org/jira/browse/BEAM-4470
>
> Discarding mode is meant to always reflect the latest firing; the catch is
> that you need to emit the entire map every time. If you can do this,
> then it makes sense to use discarding mode. The issue with discarding mode
> is that if your first trigger firing produces (A, 1), (B, 1) and your
> second firing produces (B, 2), the multimap will only contain (B, 2) and
> (A, 1) will have been discarded.
>
> To my knowledge, there is no guarantee about the order in which the values
> are combined. You will need to use some piece of information about the
> element to figure out which is the latest (or encode some additional
> information along with each element to make this easy).
>
> On Thu, May 31, 2018 at 9:16 AM Carlos Alonso <ca...@mrcalonso.com>
> wrote:
>
>> I've improved the example a little and added some tests
>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala
>>
>> The behaviour is slightly different, possibly because of the different
>> runner implementations (Dataflow/Direct), but it is still not working.
>>
>> Now what happens is that although the internal PCollection gets updated,
>> the view isn't. This is happening regardless of the accumulation mode.
>>
>> Regarding the accumulation mode on Dataflow... That was it!! Now the sets
>> contain all the items. One more question, however: is the ordering within
>> the set deterministic? (i.e., can I assume that the latest will always be in
>> the last position of the Iterable object?)
>>
>> Also... given that for my particular case I only want the latest version,
>> would you advise me to go ahead with Discarding mode?
>>
>> Regards
>>
>> On Thu, May 31, 2018 at 4:44 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> The trigger definition in the sample code you have is using discarding
>>> firing mode. Try swapping to using accumulating mode.
>>>
>>>
>>> On Thu, May 31, 2018 at 1:42 AM Carlos Alonso <ca...@mrcalonso.com>
>>> wrote:
>>>
>>>> But I think what I'm experiencing is quite different. Basically the
>>>> side input is updated, but only one element is found on the Iterable that
>>>> is the value of any key of the multimap.
>>>>
>>>> I mean, no concatenation seems to be happening. On the linked thread,
>>>> Kenn suggests that every firing will add the new value to the set of values
>>>> for the emitted key, but what I'm experiencing is that the new value is
>>>> there, but just by itself (i.e., it is the only element in the set).
>>>>
>>>> @Robert, I'm using
>>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())
>>>>
>>>> On Wed, May 30, 2018 at 7:46 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> An alternative to the thread that Kenn linked (adding support for
>>>>> retractions) is to add explicit support for combiners into side inputs. The
>>>>> system currently works by using a hardcoded concatenating combiner, so
>>>>> maps, lists, iterables, singletons, multimaps all work by concatenating the
>>>>> set of values emitted and then turning it into a view, which is why it is
>>>>> an error for a singleton and map view if the trigger fires multiple times.
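
A rough plain-Java sketch of the "concatenate, then turn into a view" idea follows, assuming the behaviour is as described in the paragraph above. It is not Beam's implementation; ConcatenatingView and its methods are hypothetical names. It shows why a multimap tolerates several firings while a singleton view fails as soon as more than one value has been concatenated.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulation only: models a concatenating combiner feeding different views.
public class ConcatenatingView {

    // Concatenates the values emitted by every firing, in firing order.
    public static <T> List<T> concatenate(List<List<T>> firings) {
        List<T> all = new ArrayList<>();
        for (List<T> firing : firings) {
            all.addAll(firing);
        }
        return all;
    }

    // A singleton view needs exactly one value, so a second firing breaks it.
    public static <T> T asSingleton(List<T> concatenated) {
        if (concatenated.size() != 1) {
            throw new IllegalStateException(
                    "singleton view needs exactly 1 value, got " + concatenated.size());
        }
        return concatenated.get(0);
    }

    // A multimap view simply groups whatever was concatenated by key.
    public static <K, V> Map<K, List<V>> asMultimap(List<Map.Entry<K, V>> concatenated) {
        Map<K, List<V>> multimap = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : concatenated) {
            multimap.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return multimap;
    }

    public static void main(String[] args) {
        // Two firings, each emitting one value for key B.
        List<List<Map.Entry<String, Integer>>> firings = List.of(
                List.of(Map.entry("B", 1)),
                List.of(Map.entry("B", 2)));
        List<Map.Entry<String, Integer>> all = concatenate(firings);
        System.out.println("multimap: " + asMultimap(all));
        try {
            asSingleton(all);
        } catch (IllegalStateException e) {
            System.out.println("singleton: " + e.getMessage());
        }
    }
}
```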
>>>>>
>>>>> On Wed, May 30, 2018 at 10:01 AM Kenneth Knowles <kl...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Yes, this is a known issue. Here's a prior discussion:
>>>>>> https://lists.apache.org/thread.html/e9518f5d5f4bcf7bab02de2cb9fe1bd5293d87aa12d46de1eac4600b@%3Cuser.beam.apache.org%3E
>>>>>>
>>>>>> It is actually long-standing and the solution is known but hard.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, May 30, 2018 at 9:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone!!
>>>>>>>
>>>>>>> Working with multimap based side inputs on the global window I'm
>>>>>>> experiencing something unexpected (at least to me) that I'd like to share
>>>>>>> with you to clarify.
>>>>>>>
>>>>>>> The way I understand multimaps is that when one emits two values for
>>>>>>> the same key for the same window (obvious thing here as I'm working on the
>>>>>>> Global one), the newly emitted values are appended to the Iterable
>>>>>>> collection that is the value for that particular key on the map.
>>>>>>>
>>>>>>> Testing it in this job (it is using scio, but side inputs are
>>>>>>> implemented with PCollectionViews):
>>>>>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/main/scala/com/mrcalonso/RefreshingSideInput2.scala
>>>>>>>
>>>>>>> The steps to reproduce are:
>>>>>>> 1. Create one table on the target BQ
>>>>>>> 2. Run the job
>>>>>>> 3. Patch the table on BQ (add one field), this should generate a new
>>>>>>> TableSchema for the corresponding TableReference
>>>>>>> 4. An updated value of the number of fields appears in the logs, but
>>>>>>> there is only one element within the iterable, as if it had been updated
>>>>>>> instead of appended!!
>>>>>>>
>>>>>>> Is that the expected behaviour? Is it a bug? Am I missing something?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>