Posted to dev@beam.apache.org by Elizaveta Lomteva <el...@akvelon.com> on 2022/07/22 16:13:55 UTC

[CDAP IO] Read from Spark Receivers via SDF interface for SparkReceiverIO

Hi everyone!

Our team is working on SparkReceiverIO as part of the CDAP IO Connector for Apache Beam project. The goal of SparkReceiverIO is to provide an interface for reading data from any Custom Spark Receiver [1]. We were implementing Read via the SDF interface for SparkReceiverIO in PR #17828 [2] and ran into some limitations.


Issue: By default, Custom Spark Receivers are not designed to work with offsets. A Splittable DoFn tracks its progress through a restriction (such as an offset range), so without offsets we have no way to use these receivers in an SDF-based IO.
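To make the dependency on offsets concrete, here is a minimal plain-Java sketch of how an offset-range restriction gates processing: each record's offset must be claimed before it is emitted, and claiming fails once the offset leaves the range. OffsetRange and SimpleOffsetTracker are hypothetical, simplified stand-ins written for this illustration; they are not Beam's OffsetRange/OffsetRangeTracker classes, although they follow the same claim-before-emit idea.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetRestrictionSketch {

    /** Hypothetical stand-in for an SDF restriction: the half-open offset range [from, to). */
    static final class OffsetRange {
        final long from;
        final long to;

        OffsetRange(long from, long to) {
            this.from = from;
            this.to = to;
        }
    }

    /** Simplified tracker: an offset may be claimed only if it is inside the range and increasing. */
    static final class SimpleOffsetTracker {
        private final OffsetRange range;
        private long lastClaimed;

        SimpleOffsetTracker(OffsetRange range) {
            this.range = range;
            this.lastClaimed = range.from - 1;
        }

        boolean tryClaim(long offset) {
            if (offset <= lastClaimed) {
                throw new IllegalArgumentException("Offsets must be claimed in increasing order");
            }
            if (offset >= range.to) {
                return false; // outside the restriction: the caller must stop
            }
            lastClaimed = offset;
            return true;
        }
    }

    /** Emits source records only while their offsets can be claimed. */
    static List<String> process(List<String> source, SimpleOffsetTracker tracker, long start) {
        List<String> out = new ArrayList<>();
        for (long offset = start; offset < source.size(); offset++) {
            if (!tracker.tryClaim(offset)) {
                break;
            }
            out.add(source.get((int) offset));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> source = List.of("a", "b", "c", "d", "e");
        OffsetRange range = new OffsetRange(1, 4);
        System.out.println(process(source, new SimpleOffsetTracker(range), range.from)); // [b, c, d]
    }
}
```

A receiver that only pushes records gives the loop nothing to pass to tryClaim, which is exactly the gap described above.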


How we bypass the issue: We've created a HasOffset interface [3] that defines two methods a Custom Spark Receiver must implement to support offset handling (in addition, the receiver must enable offsets in the onStart() method inherited from the abstract Receiver class). We've implemented ReadFromSparkReceiverWithOffsetDoFn to cover Custom Spark Receivers that support offsets. The receiver is built, started, and stopped within the @ProcessElement method.
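A rough plain-Java sketch of the idea follows, assuming the two HasOffset methods are a start-offset setter and an end-offset getter; the names setStartOffset/getEndOffset and their semantics here are assumptions for illustration, and the real definition is in PR #22193 [3]. InMemoryOffsetReceiver is a hypothetical receiver over an in-memory log, and readRange mimics one @ProcessElement pass that builds, positions, starts, and discards the receiver. No Spark or Beam classes are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class HasOffsetSketch {

    /** Illustrative shape of HasOffset (method names assumed; see PR #22193 for the real one). */
    interface HasOffset {
        /** Position the receiver so that it resumes reading at this offset. */
        void setStartOffset(Long startOffset);

        /** Report the offset at which the receiver's currently available data ends (exclusive). */
        Long getEndOffset();
    }

    /** Hypothetical receiver over an in-memory log, standing in for a Custom Spark Receiver. */
    static final class InMemoryOffsetReceiver implements HasOffset {
        private final List<String> log;
        private long startOffset = 0L;

        InMemoryOffsetReceiver(List<String> log) {
            this.log = log;
        }

        @Override
        public void setStartOffset(Long startOffset) {
            this.startOffset = startOffset == null ? 0L : startOffset;
        }

        @Override
        public Long getEndOffset() {
            return (long) log.size();
        }

        /** Analogue of onStart(): hands each record, together with its offset, to the sink. */
        void start(BiConsumer<Long, String> sink) {
            for (long offset = startOffset; offset < log.size(); offset++) {
                sink.accept(offset, log.get((int) offset));
            }
        }
    }

    /** One processElement-style pass: build, position, and start the receiver; keep offsets in [from, to). */
    static List<String> readRange(List<String> log, long from, long to) {
        InMemoryOffsetReceiver receiver = new InMemoryOffsetReceiver(log);
        receiver.setStartOffset(from);
        long end = Math.min(to, receiver.getEndOffset());
        List<String> out = new ArrayList<>();
        receiver.start((offset, record) -> {
            if (offset < end) {
                out.add(record);
            }
        });
        return out; // "stop": the receiver is simply discarded after this pass
    }

    public static void main(String[] args) {
        System.out.println(readRange(List.of("r0", "r1", "r2", "r3"), 1, 3)); // [r1, r2]
    }
}
```

Because the receiver can be positioned at an arbitrary offset, each pass can start from wherever the previous restriction ended, which is what lets the receiver fit the SDF model.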


For receivers that don't support offsets, we start the receiver once in the ReadFromSparkReceiverWithoutOffsetDoFn constructor and process the data in the @ProcessElement method. To ensure that all received data is written to the same queue, we made the queue static. However, in this case we lose the benefits of the SDF approach, so there is little reason to use SDF at all.
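The static-queue pattern can be sketched in plain Java as follows. PushOnlyReceiver and WithoutOffsetReader are hypothetical stand-ins for a receiver without offsets and for ReadFromSparkReceiverWithoutOffsetDoFn; the receiver's background thread is simulated with direct calls to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class StaticQueueSketch {

    /** One queue per loaded class (in practice, per JVM): every reader instance in the process shares it. */
    static final ConcurrentLinkedQueue<String> QUEUE = new ConcurrentLinkedQueue<>();

    /** Hypothetical receiver without offsets: it only pushes whatever it receives. */
    static final class PushOnlyReceiver {
        void store(String record) {
            QUEUE.offer(record);
        }
    }

    /** Stand-in for the DoFn: the receiver is created (started) once, in the constructor. */
    static final class WithoutOffsetReader {
        final PushOnlyReceiver receiver;

        WithoutOffsetReader() {
            this.receiver = new PushOnlyReceiver();
        }

        /** processElement analogue: drain whatever is queued; no position exists to record or resume from. */
        List<String> drain() {
            List<String> out = new ArrayList<>();
            String record;
            while ((record = QUEUE.poll()) != null) {
                out.add(record);
            }
            return out;
        }
    }

    public static void main(String[] args) {
        WithoutOffsetReader reader = new WithoutOffsetReader();
        reader.receiver.store("x");
        reader.receiver.store("y");
        System.out.println(reader.drain()); // [x, y]
        System.out.println(reader.drain()); // []
    }
}
```

The sketch also shows the limits of this design: drain() has no restriction to split or checkpoint, and a static field is shared only within one JVM (one class loader), so DoFn instances on different workers would each see their own queue.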


One possible solution we see is the UnboundedSource approach, which we used earlier in the Read from Spark Receiver via UnboundedSource PoC branch [4].
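As a rough illustration of why a pull-style reader fits receivers without offsets, the plain-Java sketch below mirrors the start()/advance()/getCurrent() shape of Beam's UnboundedSource.UnboundedReader over a queue that a receiver would fill. QueueReader is hypothetical and does not extend the real Beam class; checkpoint marks, watermarks, and the other reader methods are omitted.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueReaderSketch {

    /** Mirrors the start/advance/getCurrent contract of an unbounded reader; not the real Beam class. */
    static final class QueueReader {
        private final ConcurrentLinkedQueue<String> queue;
        private String current;

        QueueReader(ConcurrentLinkedQueue<String> queue) {
            this.queue = queue;
        }

        /** Called once; returns true if a record is immediately available. */
        boolean start() {
            return advance();
        }

        /** Returns false when nothing is available right now, which for an unbounded source is not end-of-input. */
        boolean advance() {
            current = queue.poll();
            return current != null;
        }

        /** Returns the record read by the last successful start()/advance(). */
        String getCurrent() {
            if (current == null) {
                throw new NoSuchElementException("No current record");
            }
            return current;
        }
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        QueueReader reader = new QueueReader(queue);
        System.out.println(reader.start()); // false: nothing received yet
        queue.offer("event-1");
        queue.offer("event-2");
        while (reader.advance()) {
            System.out.println(reader.getCurrent()); // event-1, then event-2
        }
    }
}
```

In this sketch the queue belongs to the reader object rather than to a static field, and the reader never needs to know an offset; it only reports whether a record is available.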


Questions:

  1.  Is it possible to use a static field with SDF?

  2.  Is there a way to work with a source that doesn’t support offsets using SDF in Apache Beam?

  3.  Could someone please advise us on how to handle receivers that don't support offsets without using the deprecated UnboundedSource approach?


We would very much appreciate it if you could review the SparkReceiverIO Read via SDF PR [2], leave comments, and share any thoughts on reading from receivers without offsets using SDF.

P.S. The sequence diagram of the current solution is in the SparkReceiverIO PR description [2]. I have also attached it to this message.


Thank you in advance for your attention,

Elizaveta


[1] Custom Spark Receiver – https://spark.apache.org/docs/latest/streaming-custom-receivers.html

[2] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 PR – https://github.com/apache/beam/pull/17828

[3] [CdapIO] HasOffset interface was implemented #22193 PR – https://github.com/apache/beam/pull/22193

[4] SparkReceiverIO: Read via UnboundedSource – https://github.com/apache/beam/pull/17360/files#diff-795caf376b2257e6669096a9048490d4935aff573e636617eb431d379e330db0