Posted to user@beam.apache.org by Binh Nguyen Van <bi...@gmail.com> on 2023/03/02 18:17:50 UTC

Deduplicate usage

Hi,

I am writing a pipeline and want to apply deduplication. I looked at the
Deduplicate transform that Beam provides and am wondering about its usage. Do I
need to shuffle the input collection by key before calling this transform?
I looked at its source code and it doesn’t do any shuffle, so I wonder how it
works when, say, there are duplicates and the duplicated elements are
processed concurrently on multiple workers.

Thank you
-Binh

Re: Deduplicate usage

Posted by Binh Nguyen Van <bi...@gmail.com>.
That explains it. Thank you Robert and all!

-Binh


On Thu, Mar 2, 2023 at 4:51 PM Robert Bradshaw via user <user@beam.apache.org> wrote:

> Whenever state is used, the runner will arrange such that the same
> keys will all go to the same worker, which often involves injecting a
> shuffle-like operation if the keys are spread out among many workers
> in the input. (An alternative implementation could involve storing the
> state in a distributed transactional store with the appropriate
> locks.) There is no need for you to do anything before calling the
> Deduplicate transform.

Re: Deduplicate usage

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
Whenever state is used, the runner will arrange such that the same
keys will all go to the same worker, which often involves injecting a
shuffle-like operation if the keys are spread out among many workers
in the input. (An alternative implementation could involve storing the
state in a distributed transactional store with the appropriate
locks.) There is no need for you to do anything before calling the
Deduplicate transform.
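Robert's parenthetical alternative, a shared store with appropriate locking, can be illustrated in a single JVM. In the hypothetical sketch below, `ConcurrentHashMap.putIfAbsent` stands in for an atomic conditional write to a distributed transactional store: the read, check, and write happen as one atomic step, so exactly one of several concurrent workers emits the element. The class and method names are made up for illustration; this is not how any Beam runner is implemented.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical toy model: several "workers" share one store, and the
// first-writer check is a single atomic operation on that store.
class AtomicSharedStoreDedup {
  // Stand-in for a transactional store: key -> "seen" marker.
  private final ConcurrentHashMap<String, Boolean> seen = new ConcurrentHashMap<>();
  private final List<String> output = new CopyOnWriteArrayList<>();

  void process(String element) {
    // putIfAbsent returns null only for the single caller that inserted the key,
    // so the check and the write cannot interleave with another worker's.
    if (seen.putIfAbsent(element, Boolean.TRUE) == null) {
      output.add(element);
    }
  }

  // Releases `workers` threads at once, all processing the same element,
  // and returns everything that was emitted.
  static List<String> runConcurrently(String element, int workers) {
    AtomicSharedStoreDedup dedup = new AtomicSharedStoreDedup();
    ExecutorService pool = Executors.newFixedThreadPool(workers);
    CountDownLatch start = new CountDownLatch(1);
    for (int i = 0; i < workers; i++) {
      pool.submit(() -> {
        try {
          start.await(); // wait so all workers start together
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        dedup.process(element);
      });
    }
    start.countDown();
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return dedup.output;
  }

  public static void main(String[] args) {
    System.out.println(runConcurrently("event-42", 8)); // prints [event-42]
  }
}
```

By contrast, the `Deduplicate` excerpt quoted in this thread does a separate read and write on `seenState`, which relies on the runner giving each key to a single worker rather than on an atomic store operation.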

On Thu, Mar 2, 2023 at 4:34 PM Binh Nguyen Van <bi...@gmail.com> wrote:
>
> Thanks Reuven,
>
> I get the idea that state is per key and that keys are distributed across workers, but I am trying to understand where and how the distribution is implemented so that elements with the same key go to the same worker. Do I need to do this myself before calling the `Deduplicate` transform? If not, where is it implemented?
>
> Thanks
> -Binh

Re: Deduplicate usage

Posted by Binh Nguyen Van <bi...@gmail.com>.
Thanks Reuven,

I get the idea that state is per key and that keys are distributed across
workers, but I am trying to understand where and how the distribution is
implemented so that elements with the same key go to the same worker.
Do I need to do this myself before calling the `Deduplicate` transform? If not,
where is it implemented?

Thanks
-Binh


On Thu, Mar 2, 2023 at 12:57 PM Reuven Lax via user <us...@beam.apache.org> wrote:

> State is per-key, and keys are distributed across workers. Two workers
> should not be working on the same state.

Re: Deduplicate usage

Posted by Reuven Lax via user <us...@beam.apache.org>.
State is per-key, and keys are distributed across workers. Two workers
should not be working on the same state.
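A toy model of why per-key state avoids the race: if each element is routed to a worker by a deterministic function of its key, all copies of a key land on the same worker, and only that worker ever touches the key's state. The routing function `Math.floorMod(key.hashCode(), numWorkers)` and all names below are assumptions for illustration, standing in for whatever partitioning a real runner's shuffle uses; here the element itself serves as its key, as when deduplicating whole values.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model: a "shuffle" routes each key to exactly one worker,
// and each worker keeps its own "seen" state for the keys it owns.
class KeyPartitionedDedup {
  // Deterministic routing: the same key always maps to the same worker.
  static int workerFor(String key, int numWorkers) {
    return Math.floorMod(key.hashCode(), numWorkers);
  }

  static List<String> dedup(List<String> input, int numWorkers) {
    // Step 1: "shuffle" elements into per-worker partitions by key.
    List<List<String>> partitions = new ArrayList<>();
    for (int i = 0; i < numWorkers; i++) {
      partitions.add(new ArrayList<>());
    }
    for (String element : input) {
      partitions.get(workerFor(element, numWorkers)).add(element);
    }

    // Step 2: each worker processes only its own partition, with state that
    // no other worker can see. Run sequentially here for simplicity.
    List<String> output = new ArrayList<>();
    for (List<String> partition : partitions) {
      Set<String> seenState = new HashSet<>(); // this worker's per-key state
      for (String element : partition) {
        if (seenState.add(element)) { // true only the first time this key is seen
          output.add(element);
        }
      }
    }
    return output;
  }

  public static void main(String[] args) {
    List<String> input = List.of("a", "b", "a", "c", "b", "a");
    // Each of a, b, c appears exactly once; the order depends on the routing.
    System.out.println(dedup(input, 3));
  }
}
```

Because the partitions share no state, the per-worker loops could run in parallel without any locking, which is the property that per-key state relies on.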

On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van <bi...@gmail.com> wrote:

> Thank you Ankur,
>
> This is an excerpt from the current source code of the Deduplicate transform:
>
>       Boolean seen = seenState.read();
>       // Seen state is either set or not set so if it has been set then it must be true.
>       if (seen == null) {
>         // We don't want the expiry timer to hold up watermarks.
>         expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
>         seenState.write(true);
>         receiver.output(element);
>       }
>
> Could you please explain the synchronization for the following scenario?
>
>    - There are two workers.
>    - Both workers read the same state at the same time, before it has been
>    set. In this case, I believe both will get null in the response.
>    - Both of them will then set the state and emit the element as output.
>
> What will happen in this scenario?
>
> Thank you
> -Binh

Re: Deduplicate usage

Posted by Binh Nguyen Van <bi...@gmail.com>.
Thank you Ankur,

This is an excerpt from the current source code of the Deduplicate transform:

      Boolean seen = seenState.read();
      // Seen state is either set or not set so if it has been set then it must be true.
      if (seen == null) {
        // We don't want the expiry timer to hold up watermarks.
        expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
        seenState.write(true);
        receiver.output(element);
      }

Could you please explain the synchronization for the following scenario?

   - There are two workers.
   - Both workers read the same state at the same time, before it has been
   set. In this case, I believe both will get null in the response.
   - Both of them will then set the state and emit the element as output.

What will happen in this scenario?

Thank you
-Binh
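The interleaving described above is a classic check-then-act race, and it would indeed emit a duplicate if two workers shared one unsynchronized state cell. The hypothetical sketch below (plain Java threads, not Beam code; names are made up) forces that interleaving with a `CyclicBarrier` so that both workers read before either writes. The other replies in this thread explain why Beam is designed to avoid it: state is scoped per key, and the runner arranges for each key to be processed by a single worker.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;

// Hypothetical model of the scenario in the question: two "workers" share one
// unsynchronized "seen" cell and each does read -> check -> write -> output.
class CheckThenActRace {
  private volatile Boolean seen = null; // shared, unkeyed state (no locking)

  static List<String> run() {
    CheckThenActRace shared = new CheckThenActRace();
    List<String> output = new CopyOnWriteArrayList<>();
    CyclicBarrier bothHaveRead = new CyclicBarrier(2);

    Runnable worker = () -> {
      Boolean observed = shared.seen; // both workers read null
      try {
        bothHaveRead.await(); // wait until the other worker has also read
      } catch (InterruptedException | BrokenBarrierException e) {
        throw new IllegalStateException(e);
      }
      if (observed == null) {
        shared.seen = true;
        output.add("element"); // emitted once per worker: a duplicate
      }
    };

    Thread w1 = new Thread(worker);
    Thread w2 = new Thread(worker);
    w1.start();
    w2.start();
    try {
      w1.join();
      w2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return output;
  }

  public static void main(String[] args) {
    System.out.println(run().size()); // prints 2
  }
}
```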

On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka <an...@gmail.com> wrote:

> Hi Binh, the Deduplicate transform uses the state API to do the
> de-duplication, which should take care of what is needed to work across
> multiple concurrent workers.
>
> Thanks,
> Ankur

Re: Deduplicate usage

Posted by Ankur Goenka <an...@gmail.com>.
Hi Binh, the Deduplicate transform uses the state API to do the
de-duplication, which should take care of what is needed to work across
multiple concurrent workers.

Thanks,
Ankur
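To make the state-based approach concrete, here is a hypothetical single-threaded model of the per-key logic in the `Deduplicate` excerpt quoted in this thread: the first element for a key is emitted and arms an expiry timer, later copies are dropped while the key's state is set, and when the timer fires the state is cleared. It uses plain maps and `java.time.Instant` in place of Beam's `ValueState` and `Timer`, and the 10-minute window is an arbitrary example value, so treat it as an illustration of the idea rather than Beam's implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded model of the excerpt's per-key logic.
// Plain maps stand in for Beam's per-key ValueState and Timer.
class ExpiringKeyedDedup {
  private final Duration duration;
  private final Map<String, Boolean> seenState = new HashMap<>();
  private final Map<String, Instant> expiryTimers = new HashMap<>();
  final List<String> output = new ArrayList<>();

  ExpiringKeyedDedup(Duration duration) {
    this.duration = duration;
  }

  void processElement(String key, Instant now) {
    fireDueTimers(now);
    Boolean seen = seenState.get(key);
    if (seen == null) {
      // Analogous to expiryTimer.offset(duration)...setRelative() in the excerpt.
      expiryTimers.put(key, now.plus(duration));
      seenState.put(key, true);
      output.add(key);
    }
  }

  // Analogous to a timer callback: clear the state of every key whose timer is due.
  private void fireDueTimers(Instant now) {
    Iterator<Map.Entry<String, Instant>> it = expiryTimers.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Instant> timer = it.next();
      if (!timer.getValue().isAfter(now)) {
        seenState.remove(timer.getKey());
        it.remove();
      }
    }
  }

  public static void main(String[] args) {
    Instant t0 = Instant.parse("2023-03-02T00:00:00Z");
    ExpiringKeyedDedup dedup = new ExpiringKeyedDedup(Duration.ofMinutes(10));
    dedup.processElement("a", t0);
    dedup.processElement("a", t0.plusSeconds(60));      // duplicate inside the window: dropped
    dedup.processElement("b", t0.plusSeconds(120));
    dedup.processElement("a", t0.plusSeconds(11 * 60)); // a's timer fired at t0+10m: emitted again
    System.out.println(dedup.output); // prints [a, b, a]
  }
}
```

The model also shows a consequence of the expiry timer: a duplicate that arrives after the key's state has been cleared is emitted again, so deduplication only holds within the configured window.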

On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van <bi...@gmail.com> wrote:

> Hi,
>
> I am writing a pipeline and want to apply deduplication. I looked at the
> Deduplicate transform that Beam provides and am wondering about its usage. Do I
> need to shuffle the input collection by key before calling this transform?
> I looked at its source code and it doesn’t do any shuffle, so I wonder how it
> works when, say, there are duplicates and the duplicated elements are
> processed concurrently on multiple workers.
>
> Thank you
> -Binh
>