Posted to user@beam.apache.org by Benjamin Tan <be...@tech.jago.com> on 2022/03/14 09:18:28 UTC

[Bug] ReadFromKafka not streaming properly on FlinkRunner in Python

I've noticed some really interesting and surprising behavior with
ReadFromKafka in Python.

I'm working with a simple Apache Beam pipeline that reads from an unbounded
Kafka topic and prints the values out. I have two flavors of this, both run
on the Flink runner.
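
For context, the beam_options object isn't shown in the snippets below. A
minimal sketch of what it might look like for the portable Flink runner
follows; the flink_master address and environment_type are placeholder
assumptions, not values from this setup:

  from apache_beam.options.pipeline_options import PipelineOptions

  beam_options = PipelineOptions([
      "--runner=FlinkRunner",
      "--flink_master=localhost:8081",  # placeholder Flink REST address
      "--environment_type=LOOPBACK",    # run the Python workers in-process
      "--streaming",                    # unbounded read => streaming mode
  ])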

Version 1

  with beam.Pipeline(options=beam_options) as p:
      (p
       | "Read from Kafka topic" >> ReadFromKafka(
             consumer_config=consumer_config,
             topics=[producer_topic])
       | "log" >> beam.ParDo(LogData()))

This one uses the default implementation that ships with Apache Beam:

from apache_beam.io.kafka import ReadFromKafka

Version 2

  with beam.Pipeline(options=beam_options) as p:
      (p
       | "Read from Kafka topic (KafkaConsumer)" >> KafkaConsume(
             consumer_config={
                 "topic": producer_topic,
                 "auto_offset_reset": "earliest",
                 "group_id": "transaction_classification",
                 "bootstrap_servers": servers,
             })
       | "log" >> beam.ParDo(LogData()))

This one uses beam-nuggets:

from beam_nuggets.io.kafkaio import KafkaConsume

I have configured the Kafka producer to produce an element every 1 second.

What I've observed is that with ReadFromKafka (version 1), elements arrive
roughly 4-6 seconds apart, batched together.

With KafkaConsume (version 2), on the other hand, I get elements as they are
produced (i.e. every second), which is exactly the behavior I expected.

I have tried making the consumer_config the same for both, but it doesn't
seem to have any effect on version 1.

Now, I would like to stick with version 1 because it gives me proper metrics
in the Flink UI. Version 2 streams better, but I don't get any metrics in
Flink (everything is reported as 0 bytes received / 0 records received).

I don't understand why ReadFromKafka seems to batch a few records together
before they get pushed down the pipeline. Does anyone have any ideas? This
behavior doesn't show up on the Dataflow runner, though. Is there any setting
I can try? Otherwise, how are folks dealing with reading from Kafka as an
unbounded stream?

Re: [Bug] ReadFromKafka not streaming properly on FlinkRunner in Python

Posted by Chamikara Jayalath <ch...@google.com>.
Note that beam-nuggets' KafkaConsume is implemented as a ParDo that reads
from Kafka [1], so it's not a full Beam unbounded source with features such
as splitting, watermark reporting, and backlog reporting. Those features are
important for the proper operation of many streaming pipelines. Based on what
you reported, I don't think the behavior you are observing for ReadFromKafka
is out of spec for an unbounded source.
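
To illustrate the difference, a ParDo-based read looks roughly like the
sketch below (a simplified illustration using the kafka-python client, not
the actual beam-nuggets code). The DoFn just polls a Kafka consumer in a
loop, so the runner never receives splitting, watermark, or backlog
information from it:

  import apache_beam as beam
  from kafka import KafkaConsumer  # kafka-python client

  class NaiveKafkaRead(beam.DoFn):
      """Polls Kafka inside process(); to Beam this is just an ordinary ParDo."""

      def __init__(self, topic, servers, group_id):
          self._topic = topic
          self._servers = servers
          self._group_id = group_id

      def process(self, unused_element):
          consumer = KafkaConsumer(
              self._topic,
              bootstrap_servers=self._servers,
              group_id=self._group_id,
              auto_offset_reset="earliest")
          # Blocks and yields records as they arrive; no watermark or backlog
          # is ever reported back to the runner.
          for record in consumer:
              yield record.key, record.value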

Also note that, due to a known issue with SDF and the Flink runner, you need
to pass the experiment "beam_fn_api_use_deprecated_read" when starting up the
expansion service for ReadFromKafka to operate correctly on the Flink runner.
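
One way to do that (a sketch only; the jar name/version, port, and
expansion_service address below are assumptions, not something confirmed in
this thread) is to start the Kafka expansion service yourself with the
experiment and point ReadFromKafka at it:

  # Start the Java expansion service with the experiment, e.g.:
  #   java -jar beam-sdks-java-io-expansion-service-2.36.0.jar 8097 \
  #       --experiments=beam_fn_api_use_deprecated_read

  ReadFromKafka(
      consumer_config=consumer_config,
      topics=[producer_topic],
      expansion_service="localhost:8097")  # address of the service started above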

Thanks,
Cham

[1]
https://github.com/mohaseeb/beam-nuggets/blob/a3cc2343db82abb1eb14974260683148f82fe517/beam_nuggets/io/kafkaio.py#L67

Re: [Bug] ReadFromKafka not streaming properly on FlinkRunner in Python

Posted by Luke Cwik <lc...@google.com>.
Version 1 uses Apache Beam's Java implementation of the KafkaIO transform.
Version 2 uses a native Python implementation from beam_nuggets.

This means that for version 1 there is a cross-language (XLang) barrier
between the Java Kafka implementation and your Python transforms. That can
lead to larger bundles on some runners, since it depends on whether the
runner fuses the two segments together or materializes the data between
them. You could try controlling how often Flink checkpoints so that it can
choose smaller, more frequent bundles.
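
For example (a sketch; these are the Beam Flink runner's pipeline option
names, and the values are arbitrary), the checkpoint interval and bundle
limits can be set in the pipeline options:

  from apache_beam.options.pipeline_options import PipelineOptions

  beam_options = PipelineOptions([
      "--runner=FlinkRunner",
      "--streaming",
      "--checkpointing_interval=1000",  # ms between Flink checkpoints
      "--max_bundle_size=1",            # max elements per bundle
      "--max_bundle_time_millis=1000",  # ms before a bundle is cut
  ])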


