Posted to user@beam.apache.org by "Kaymak, Tobias" <to...@ricardo.ch> on 2021/06/24 12:48:05 UTC

Simple Python Kafka pipeline works in DirectRunner but not on Dataflow

Hello,

We are trying to implement a simple Beam pipeline with Python and SDK
2.30.0, reading from Kafka (Confluent Cloud). It works locally with the
DirectRunner, but fails on the Dataflow service without any error message.
The subnetwork is able to reach Kafka; Kafka itself is protected by SASL,
but both connectivity and SASL work fine with the DirectRunner.

We checked that the IP on the workers is resolved in the same way as on our
dev machine.

Any idea what we can try to debug this?

Best,
Tobi

import logging
import os

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    pipeline_options = PipelineOptions(pipeline_args)
    ...
    kafka_options = {'bootstrap.servers': known_args.bootstrap,
                     'client.id': 'di-qa-fraud-beam',
                     'group.id': 'di-qa-fraud-beam',
                     'auto.offset.reset': 'earliest'}
    if known_args.is_sasl_enabled:
        kafka_options['sasl.mechanism'] = 'PLAIN'
        kafka_options['security.protocol'] = 'SASL_SSL'
        kafka_options['sasl.jaas.config'] = (
            'org.apache.kafka.common.security.plain.PlainLoginModule required '
            f'username="{known_args.sasl_username}" '
            f'password="{known_args.sasl_password}";')
    if known_args.incidents_api_url:
        os.environ["INCIDENTS_API_URI"] = known_args.incidents_api_url
    # The Direct and Flink runners do not yet support Kafka streaming mode
    # well, see https://issues.apache.org/jira/browse/BEAM-11991
    # https://issues.apache.org/jira/browse/BEAM-11993
    # https://issues.apache.org/jira/browse/BEAM-11998
    # In dev mode, only consume 1 record before ending the pipeline.
    max_num_records = 1 if known_args.dev else None
    max_num_records = None  # NOTE: overrides the dev-mode setting above
    incident_hook = IncidentsHook(debug=False)
    logging.debug('debug message')
    logging.info('info message')
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | 'Read From Kafka' >> ReadFromKafka(
                consumer_config=kafka_options,
                topics=[_QA_TOPIC],
                start_read_time=1624143600000,
                max_num_records=10,
                commit_offset_in_finalize=True)
            | 'Just log' >> beam.ParDo(PrintFn())
        )
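
(For reference, PrintFn just logs each record it receives - a simplified
sketch, since the actual class isn't included in this mail:)

class PrintFn(beam.DoFn):
    # Simplified sketch of the logging DoFn used above; the real class
    # is not part of this post.
    def process(self, element):
        logging.info('Got record: %s', element)
        yield element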




Re: Simple Python Kafka pipeline works in DirectRunner but not on Dataflow

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
(We thought that the v2 runner was actually active when we ran it; our
mistake was not forcing it via the parameter.)


Re: Simple Python Kafka pipeline works in DirectRunner but not on Dataflow

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
Thank you! I totally overlooked that.


Re: Simple Python Kafka pipeline works in DirectRunner but not on Dataflow

Posted by Chamikara Jayalath <ch...@google.com>.
Thanks. It's documented here
<https://beam.apache.org/documentation/programming-guide/#x-lang-transform-runner-support>
but I'll also send a PR to fail early when Runner v2 is not enabled.

- Cham


Re: Simple Python Kafka pipeline works in DirectRunner but not on Dataflow

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
Hello,

Thank you! This actually solved the problem. However, the documentation
should be updated to reflect this.

Cheers :)

Tobi


Re: Simple Python Kafka pipeline works in DirectRunner but not on Dataflow

Posted by Alex Koay <al...@gmail.com>.
Hey Tobias

Did you add "--experiments=use_runner_v2"? I got bitten by this same issue
just a few weeks back.
The end result is that the cross-language transform doesn't get expanded,
which causes this problem.

Cheers
Alex


Re: Simple Python Kafka pipeline works in DirectRunner but not on Dataflow

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
When we implement the same logic with the Beam Java SDK and run it on the
Dataflow service, it works fine.
Job name: qakafkatoincident-fsilberstein-0624125715-d7f42a3a, Job ID:
2021-06-24_05_57_23-11556124538619742286


Re: Simple Python Kafka pipeline works in DirectRunner but not on Dataflow

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
So: with the DirectRunner we see 10 messages printed, as expected.
On Dataflow we see one log message showing that the DoFn for printing the
records is invoked once (with no record being logged), and then nothing
happens and the pipeline shuts down.
