Posted to dev@beam.apache.org by Marco Robles <ma...@wizeline.com> on 2021/09/01 16:13:51 UTC

Doubts on KafkaIO/SourceIO

Hi folks,

I am taking KafkaIO as an example for the PulsarIO connector. During the
development of the new IO, I came up with some questions about the KafkaIO
implementation. I was wondering if anyone has experience with the KafkaIO
SDF implementation and might be able to help me.

- What was taken into consideration to implement the KafkaSourceDescriptor
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>
which is used as input for the SDF in Kafka?
- In the ReadFromKafkaDoFn
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
class, you have to implement getSize in order to estimate how much work
it will take. What approach do you take to get an estimate for an
unbounded source like Kafka?


- For the SDF implementation, I suppose it will need a Source Interface
implementation
<https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-source-interface>
and
a Reader subclass
<https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-reader-subclass>?
The documentation is somewhat confusing on this point when you are working
with SDF. Should the source/reading part be treated as unbounded?

Thanks in advance
-- 

*Marco Robles* *|* WIZELINE

Software Engineer

marco.robles@wizeline.com

Amado Nervo 2200, Esfera P6, Col. Ciudad del Sol, 45050 Zapopan, Jal.


Re: Doubts on KafkaIO/SourceIO

Posted by Luke Cwik <lc...@google.com>.
https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-reader-subclass
is out of date and at the top says
IMPORTANT: Use Splittable DoFn to develop your new I/O. For more details,
read the new I/O connector overview.

On Fri, Sep 3, 2021 at 9:55 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> Hi Marco,
>
> I tried to answer your questions and I also CC’ed Boyuan Zhang as initial
> author of SDF-based Read implementation for KafkaIO.
>
> Also, I’d recommend to take a look on related PR’s discussion [1] which
> perhaps can give more details of some internal decisions.
>
> Please, see my answers inline.
>
> On 1 Sep 2021, at 18:13, Marco Robles <ma...@wizeline.com> wrote:
>
>
> I am taking KafkaIO as an example for the PulsarIO connector, during the
> development of the new IO, I got some questions on KafkaIO implementation.
> I was wondering if anyone has some experience with KafkaIO SDF
> implementation that might help me.
>
> - What was taken into consideration to implement the KafkaSourceDescriptor
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>
> which is used as input for the SDF in Kafka?
>
>
> IIRC, this class represents a Kafka topic partition that is used after
> in ReadFromKafkaDoFn to actually read data. So, we can have a
> PCollection<KafkaSourceDescriptor> to read them in parallel.
>

> - In the ReadFromKafkaDoFn
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
> class, you have to implement a getSize in order to estimate how much work
> it will take. What approach do you take in order to get an estimate with an
> unbounded approach like kafka?
>
>
> It should be quite tricky to do with unbounded sources, so we try to
> estimate the size by the number of records for current offset in topic
> partition and average record size, based on collected statistics (if any).
>

> - For the SDF implementation, I suppose it will need a Source Interface
> implementation
> <https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-source-interface> and
> a Reader subclass
> <https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-reader-subclass>?
> The documentation is kind of confusing in that part when you are working
> with SDF, Should it be treated as Unbounded for the source/reading part?
>
>
> Well, it’s actually opposite - there are two types for Read implementation
> in Beam:
> - based on Source interface, that you mentioned before (deprecated one);
> - based on Splittable DoFn [2], which is a way that one should use
> (especially for unbounded sources) for new IO connectors.
>
>
>
> [1] https://github.com/apache/beam/pull/11749
> [2] https://beam.apache.org/documentation/io/developing-io-overview/
>
>
> —
> Alexey
>
>

Re: Doubts on KafkaIO/SourceIO

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Marco,

I tried to answer your questions and I also CC’ed Boyuan Zhang as the initial author of the SDF-based Read implementation for KafkaIO.

Also, I’d recommend taking a look at the related PR discussion [1], which may give more detail on some of the internal decisions.

Please, see my answers inline.

On 1 Sep 2021, at 18:13, Marco Robles <ma...@wizeline.com> wrote:
> 
> I am taking KafkaIO as an example for the PulsarIO connector, during the development of the new IO, I got some questions on KafkaIO implementation. I was wondering if anyone has some experience with KafkaIO SDF implementation that might help me.
> 
> - What was taken into consideration to implement the KafkaSourceDescriptor <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java> which is used as input for the SDF in Kafka?

IIRC, this class represents a Kafka topic partition that is later used in ReadFromKafkaDoFn to actually read the data. So we can have a PCollection<KafkaSourceDescriptor> and read the partitions in parallel.
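To make the idea concrete, here is a minimal plain-Java sketch of "one descriptor per partition". It does not use Beam or Kafka classes: PartitionDescriptorSketch, its fields, and forTopic are hypothetical names chosen for illustration, and the real KafkaSourceDescriptor carries more information than this.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical descriptor (not a Beam class): one instance identifies one partition to read. */
final class PartitionDescriptorSketch {
    final String topic;
    final int partition;

    PartitionDescriptorSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    /** Builds one descriptor per partition, so each one can be read independently. */
    static List<PartitionDescriptorSketch> forTopic(String topic, int partitionCount) {
        List<PartitionDescriptorSketch> descriptors = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            descriptors.add(new PartitionDescriptorSketch(topic, p));
        }
        return descriptors;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }

    public static void main(String[] args) {
        // Each element of this list plays the role of one input element for the reading step.
        System.out.println(forTopic("events", 3));
    }
}
```

For a Pulsar connector, the analogous element would presumably identify whatever unit can be read independently there; that choice is an assumption on my side, not something taken from the KafkaIO code.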

> - In the ReadFromKafkaDoFn <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java> class, you have to implement a getSize in order to estimate how much work it will take. What approach do you take in order to get an estimate with an unbounded approach like kafka?

This is quite tricky to do with unbounded sources, so we try to estimate the size from the number of records left to read after the current offset in the topic partition and the average record size, based on collected statistics (if any).
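As a rough illustration of that estimate, the sketch below multiplies the number of records between the current offset and the end offset by a running average of observed record sizes. It is plain Java and only my reading of the approach, not the actual ReadFromKafkaDoFn code; the class name, method names, and the way the end offset is supplied are all assumptions.

```java
/** Hypothetical size estimator (not Beam code): remaining records times average record size. */
final class BacklogSizeEstimateSketch {
    private long recordCount = 0;
    private double avgRecordBytes = 0.0;

    /** Folds one observed record size into the running average. */
    void observeRecord(long sizeBytes) {
        recordCount++;
        avgRecordBytes += (sizeBytes - avgRecordBytes) / recordCount;
    }

    /**
     * Estimates the bytes left to read. endOffset is assumed to be the latest
     * known end offset of the partition; it keeps growing for an unbounded source,
     * so the estimate is only a snapshot.
     */
    double estimateSizeBytes(long currentOffset, long endOffset) {
        long remainingRecords = Math.max(0L, endOffset - currentOffset);
        return remainingRecords * avgRecordBytes;
    }

    public static void main(String[] args) {
        BacklogSizeEstimateSketch estimator = new BacklogSizeEstimateSketch();
        estimator.observeRecord(100);
        estimator.observeRecord(300);
        // 10 records remain and the average observed size is 200 bytes.
        System.out.println(estimator.estimateSizeBytes(40, 50));
    }
}
```

Before any record has been observed the average is 0, so the estimate is 0 as well; a real connector would probably want a fallback for that case.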

> - For the SDF implementation, I suppose it will need a Source Interface implementation <https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-source-interface> and a Reader subclass <https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-reader-subclass>? The documentation is kind of confusing in that part when you are working with SDF, Should it be treated as Unbounded for the source/reading part?

Well, it’s actually the opposite. There are two ways to implement Read in Beam:
- based on the Source interface that you mentioned (the deprecated one);
- based on Splittable DoFn [2], which is the approach one should use for new IO connectors (especially for unbounded sources).
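To give a feel for how the SDF style differs from a Source/Reader pair, here is a small plain-Java sketch of the "claim a position before emitting it" loop that an SDF runs over its restriction. OffsetTracker below is a hypothetical stand-in written for this example, not Beam's tracker class, and the offsets are made up.

```java
/** Hypothetical illustration (not Beam code) of the claim-before-emit loop in an SDF. */
final class ClaimLoopSketch {

    /** Stand-in for a restriction tracker over the offset range [from, to). */
    static final class OffsetTracker {
        private final long to;
        private long lastClaimed;

        OffsetTracker(long from, long to) {
            this.to = to;
            this.lastClaimed = from - 1;
        }

        /** Returns true if the offset is inside the range and is now owned by the caller. */
        boolean tryClaim(long offset) {
            if (offset <= lastClaimed) {
                throw new IllegalArgumentException("offsets must be claimed in increasing order");
            }
            if (offset >= to) {
                return false;
            }
            lastClaimed = offset;
            return true;
        }
    }

    /** Processes the range and returns how many records were "emitted". */
    static int process(long from, long to) {
        OffsetTracker tracker = new OffsetTracker(from, to);
        int emitted = 0;
        // A record is emitted only after its offset has been claimed successfully.
        for (long offset = from; tracker.tryClaim(offset); offset++) {
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(process(10, 15));
    }
}
```

In a real SDF the runner can shrink the restriction while the loop is running, which is why every position has to go through the tracker; this sketch leaves splitting out entirely.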



[1] https://github.com/apache/beam/pull/11749
[2] https://beam.apache.org/documentation/io/developing-io-overview/


—
Alexey