Posted to dev@beam.apache.org by Elizaveta Lomteva <el...@akvelon.com> on 2022/08/03 15:56:13 UTC

[Cdap IO] SparkReceiverIO SDF vs UnboundedSource for Spark Receivers without offset

Hi community!

Our team is working on the SparkReceiverIO connector for Apache Beam. We have published SparkReceiverIO.Read PR [1].


  *   We are working with the Spark Receiver class [2]. Ideally, a Receiver implements the HasOffset interface [3], whose setStartOffset() method lets us start multiple Receivers, each from a given offset for its specific Restriction.

  *   Also, SplittableDoFn implies the presence of a RestrictionTracker with a .tryClaim(offset) method, so we need a way to get the offset of the current record from the Receiver.
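
To make the tryClaim(offset) requirement concrete, here is a minimal plain-Java model of the claim contract. It is only a sketch: SimpleOffsetTracker and its fields are hypothetical names, it does not use Beam's classes, and a real RestrictionTracker has further responsibilities (splitting, progress reporting) that are left out.

```java
// Plain-Java model of the claim contract; this is not Beam's RestrictionTracker.
final class SimpleOffsetTracker {
  private final long from;  // inclusive start of the restriction
  private final long to;    // exclusive end of the restriction
  private long lastClaimed; // highest offset claimed so far

  SimpleOffsetTracker(long from, long to) {
    if (to < from) {
      throw new IllegalArgumentException("inverted range");
    }
    this.from = from;
    this.to = to;
    this.lastClaimed = from - 1; // nothing claimed yet
  }

  /** Returns true if the caller may process the record at the given offset. */
  boolean tryClaim(long offset) {
    if (offset <= lastClaimed) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    if (offset >= to) {
      return false; // past the end of the restriction: the caller must stop
    }
    lastClaimed = offset;
    return true;
  }
}
```

A reading loop would call tryClaim(offset) before emitting each record and stop at the first false, which is why every record needs an offset.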


Let’s imagine we are dealing with a simple Receiver that doesn’t implement the HasOffset interface [3], and we would like to use the SDF approach for this case as well. There are some unresolved questions:

  1.  Since we can’t start multiple Receivers from different offsets, we need to start a single main Receiver covering [0 ; +inf). Where is the best place to do this? (Currently, it’s the constructor of the SDF.)

  2.  All records coming from the Receiver should be stored in some buffer. Since SDF objects are serialized, what is the best way to give all of them a reference to the shared buffer?

  3.  How do we correctly stop our main Receiver? (This is also a serialization problem.)

  4.  There are no tangible benefits from using SDF: we can’t parallelize reading, because of the single-thread Receiver limitation.

  5.  What if we are dealing with a Receiver that can’t determine the offset of the current record?
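
For question 5, one conceivable workaround is to number records locally as they arrive. The sketch below is only an illustration under that assumption: SyntheticOffsetBuffer and Entry are hypothetical names, nothing here comes from the Spark Receiver API, and such offsets are meaningful only for the lifetime of one running Receiver (they cannot be used to restart a Receiver from a position).

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper: tags each incoming record with a locally generated offset.
final class SyntheticOffsetBuffer<T> {

  /** A record together with the synthetic offset assigned on arrival. */
  static final class Entry<R> {
    final long offset;
    final R record;

    Entry(long offset, R record) {
      this.offset = offset;
      this.record = record;
    }
  }

  private final ConcurrentLinkedQueue<Entry<T>> queue = new ConcurrentLinkedQueue<>();
  private long next = 0; // next synthetic offset; guarded by the lock in store()

  /** Called by the receiver thread; synchronized so queue order matches offset order. */
  synchronized void store(T record) {
    queue.add(new Entry<>(next++, record));
  }

  /** Called by the reading side; returns null when no record is buffered. */
  Entry<T> poll() {
    return queue.poll();
  }
}
```

The reading side could then pass entry.offset to tryClaim() before emitting entry.record.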


A possible solution that we see is to use the UnboundedSource approach, as we did earlier in the "Read from Spark Receiver via the UnboundedSource" PoC branch [4]. It looks like implementing it would resolve all the questions above, but UnboundedSource is deprecated.


Could someone advise us on how to handle Receivers without offsets in our case?

Any ideas or comments would be greatly appreciated.


Thanks for your attention to it!

Elizaveta


[1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 – https://github.com/apache/beam/pull/17828

[2] Spark Streaming Custom Receivers – https://spark.apache.org/docs/latest/streaming-custom-receivers.html

[3] HasOffset interface – https://github.com/apache/beam/blob/0581c49575eeba9df8be2a166c6923209fa8f7a5/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java

[4] SparkReceiverIO: Read via UnboundedSource – https://github.com/apache/beam/pull/17360/files#diff-795caf376b2257e6669096a9048490d4935aff573e636617eb431d379e330db0



Re: [Cdap IO] SparkReceiverIO SDF vs UnboundedSource for Spark Receivers without offset

Posted by Chamikara Jayalath via dev <de...@beam.apache.org>.
On Wed, Aug 3, 2022 at 10:03 AM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Wed, Aug 3, 2022 at 8:57 AM Elizaveta Lomteva <
> elizaveta.lomteva@akvelon.com> wrote:
>
>> Hi community!
>>
>> Our team is working on the SparkReceiverIO connector for Apache Beam. We
>> have published SparkReceiverIO.Read PR [1].
>>
>>
>>    - We are working with the Spark Receiver class [2]. Receiver should
>>      ideally implement HasOffset interface [3] with the setStartOffset()
>>      method, so we can start multiple Receivers from some offset for the
>>      specific Restriction.
>>    - Also SplittableDoFn implies presence of the RestrictionTracker which
>>      has .tryClaim(offset) method. So there should be an ability to get
>>      offset for the current record from the Receiver.
>>
>>
>> Let’s imagine we are dealing with a simple Receiver, that doesn’t
>> implement HasOffset interface [3], and we are thinking about using the
>> SDF approach for this case as well. There are some unresolved questions:
>>
>>    1. Since we don’t have the ability to start multiple Receivers from
>>       different offsets, we need to start only one main Receiver [0 ;
>>       +inf). What is the best place for doing this? (Currently, it’s the
>>       constructor of the SDF).
>>
>>
> When you provide the initial restriction, you can simply provide a tracker
> (OffsetRangeTracker) that only contains one offset, so the range will be
> [0, 1].
>

Sorry, that should be [0, 1).
Also, it probably makes sense to develop an UnsplittableRangeTracker that
can be used in other contexts as well. We have something similar for Python
with the old RangeTracker interface:
https://github.com/apache/beam/blob/8e217ea0d1f383ef5033ef507b14d01edf9c67e6/sdks/python/apache_beam/io/range_trackers.py#L301
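
As a rough illustration of the idea, an unsplittable tracker can be modeled as a wrapper that forwards claims but refuses every split request. This is a plain-Java sketch under stated assumptions: Tracker, RangeTracker and UnsplittableTracker are hypothetical stand-ins, not Beam's RestrictionTracker API, and the split arithmetic is deliberately simplistic.

```java
// Hypothetical stand-in for a tracker API; this is not Beam's RestrictionTracker.
interface Tracker {
  boolean tryClaim(long offset);

  /** Returns the residual range as {from, to}, or null if no split happened. */
  long[] trySplit(double fractionOfRemainder);
}

// Simple splittable tracker over the half-open range [from, to).
final class RangeTracker implements Tracker {
  private long to;          // exclusive end; shrinks when a split succeeds
  private long lastClaimed; // highest offset claimed so far

  RangeTracker(long from, long to) {
    this.to = to;
    this.lastClaimed = from - 1; // nothing claimed yet
  }

  @Override
  public boolean tryClaim(long offset) {
    if (offset <= lastClaimed) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    if (offset >= to) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }

  @Override
  public long[] trySplit(double fractionOfRemainder) {
    long firstUnclaimed = lastClaimed + 1;
    long splitAt = firstUnclaimed + (long) ((to - firstUnclaimed) * fractionOfRemainder);
    if (splitAt >= to) {
      return null; // nothing left to give away
    }
    long[] residual = {splitAt, to};
    to = splitAt; // the primary keeps [from, splitAt)
    return residual;
  }
}

// Wrapper that forwards claims to the delegate but ignores all split requests.
final class UnsplittableTracker implements Tracker {
  private final Tracker delegate;

  UnsplittableTracker(Tracker delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean tryClaim(long offset) {
    return delegate.tryClaim(offset);
  }

  @Override
  public long[] trySplit(double fractionOfRemainder) {
    return null; // always refuse, so the restriction is never divided
  }
}
```

Because the wrapper only overrides the split path, it could in principle be reused around any tracker type, which is the "other contexts" benefit mentioned above.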



> Within the splitRestriction() method, return the same restriction.
>
> This will result in a SDF that will not be split either through initial
> splitting or dynamic work rebalancing.
>
> This will not be very efficient, but hopefully this will generalize your
> solution for all receivers, whether they support offsets or not.
>
>
>>
>>    2. All records coming from the Receiver should be stored in some buffer.
>>       Since SDF objects are serialized, what is the best way to provide a
>>       link to the shared buffer for all of them?
>>    3. How to correctly stop our main Receiver? (also serialization problem)
>>    4. There are no tangible benefits from using SDF - we can’t parallelize
>>       reading, because there will be a single-thread Receiver limitation.
>>    5. What if we are dealing with the Receiver that doesn’t have the
>>       ability to determine offset for the current record?
>>
>>
> Could you clarify why these questions could be addressed for
> UnboundedSource but not SDF. AFAICT these should be addressed with either
> source framework (probably in a similar way). For example, both types of
> source objects get serialized by runners.
>
> Thanks,
> Cham
>
>

Re: [Cdap IO] SparkReceiverIO SDF vs UnboundedSource for Spark Receivers without offset

Posted by Chamikara Jayalath via dev <de...@beam.apache.org>.
On Wed, Aug 3, 2022 at 8:57 AM Elizaveta Lomteva <
elizaveta.lomteva@akvelon.com> wrote:

> Hi community!
>
> Our team is working on the SparkReceiverIO connector for Apache Beam. We
> have published SparkReceiverIO.Read PR [1].
>
>
>    - We are working with the Spark Receiver class [2]. Receiver should
>      ideally implement HasOffset interface [3] with the setStartOffset()
>      method, so we can start multiple Receivers from some offset for the
>      specific Restriction.
>    - Also SplittableDoFn implies presence of the RestrictionTracker which
>      has .tryClaim(offset) method. So there should be an ability to get
>      offset for the current record from the Receiver.
>
>
> Let’s imagine we are dealing with a simple Receiver, that doesn’t
> implement HasOffset interface [3], and we are thinking about using the SDF
> approach for this case as well. There are some unresolved questions:
>
>    1. Since we don’t have the ability to start multiple Receivers from
>       different offsets, we need to start only one main Receiver [0 ; +inf).
>       What is the best place for doing this? (Currently, it’s the constructor
>       of the SDF).
>
>
When you provide the initial restriction, you can simply provide a tracker
(OffsetRangeTracker) that only contains one offset, so the range will be
[0, 1].
Within the splitRestriction() method, return the same restriction.

This will result in an SDF that will not be split either through initial
splitting or dynamic work rebalancing.

This will not be very efficient, but hopefully this will generalize your
solution for all receivers, whether they support offsets or not.
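
A plain-Java model of this suggestion, using the half-open range [0, 1), which contains exactly one offset. It is only a sketch: HalfOpenRange and SingleReceiverRestrictions are hypothetical names, the two static methods stand in for the SDF's initial-restriction and splitRestriction() hooks, and no Beam classes or annotations are used.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of a restriction over offsets [from, to).
final class HalfOpenRange {
  final long from; // inclusive
  final long to;   // exclusive

  HalfOpenRange(long from, long to) {
    this.from = from;
    this.to = to;
  }

  /** Number of offsets contained in the range. */
  long size() {
    return to - from;
  }
}

// Models the two SDF hooks discussed above, without Beam annotations.
final class SingleReceiverRestrictions {

  /** One restriction holding a single offset: it stands for "the whole Receiver". */
  static HalfOpenRange initialRestriction() {
    return new HalfOpenRange(0, 1);
  }

  /** Initial splitting returns the input unchanged, so there is exactly one piece. */
  static List<HalfOpenRange> splitRestriction(HalfOpenRange restriction) {
    return Collections.singletonList(restriction);
  }
}
```

With only one offset to claim, there is also nothing for dynamic work rebalancing to give away, so a single worker ends up owning the Receiver.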


>
>    2. All records coming from the Receiver should be stored in some buffer.
>       Since SDF objects are serialized, what is the best way to provide a
>       link to the shared buffer for all of them?
>    3. How to correctly stop our main Receiver? (also serialization problem)
>    4. There are no tangible benefits from using SDF - we can’t parallelize
>       reading, because there will be a single-thread Receiver limitation.
>    5. What if we are dealing with the Receiver that doesn’t have the ability
>       to determine offset for the current record?
>
>
Could you clarify why these questions could be addressed for
UnboundedSource but not for SDF? AFAICT they would have to be addressed with
either source framework (probably in a similar way). For example, both types
of source objects get serialized by runners.
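
To illustrate the serialization point: one common pattern, applicable to either kind of source object, is to keep the shared buffer in a static per-JVM registry and serialize only a key. The sketch below uses hypothetical names (BufferRegistry, ReaderFn) and plain Java serialization in place of a runner; it shares state only between copies living in the same JVM, and it is not taken from the SparkReceiverIO PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Per-JVM registry: static state is never part of a serialized object.
final class BufferRegistry {
  private static final ConcurrentHashMap<String, Queue<String>> BUFFERS =
      new ConcurrentHashMap<>();

  /** Returns the buffer for the id, creating it on first use. */
  static Queue<String> get(String id) {
    return BUFFERS.computeIfAbsent(id, k -> new ConcurrentLinkedQueue<>());
  }

  /** Drops the buffer, for example when the Receiver is stopped. */
  static void remove(String id) {
    BUFFERS.remove(id);
  }
}

// Stand-in for a serializable source or DoFn object; only the key is serialized.
final class ReaderFn implements Serializable {
  private static final long serialVersionUID = 1L;
  private final String bufferId;

  ReaderFn(String bufferId) {
    this.bufferId = bufferId;
  }

  /** Looks the shared buffer up by key instead of holding a direct reference. */
  Queue<String> buffer() {
    return BufferRegistry.get(bufferId);
  }

  /** Simulates what a runner does: serialize this object and read back a copy. */
  ReaderFn roundTrip() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(this);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (ReaderFn) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

The same key could be used to look up and stop the Receiver itself, which would cover the "how to stop it" question in the same way.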

Thanks,
Cham

