Posted to issues@beam.apache.org by "Mauro Riva (Jira)" <ji...@apache.org> on 2021/03/17 09:24:00 UTC

[jira] [Updated] (BEAM-11993) ReadFromKafka doesn’t send data to the next PTransform – Apache Flink "Cluster" – Apache Beam Python SDK

     [ https://issues.apache.org/jira/browse/BEAM-11993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mauro Riva updated BEAM-11993:
------------------------------
    External issue URL: https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data

> ReadFromKafka doesn’t send data to the next PTransform –  Apache Flink "Cluster" – Apache Beam Python SDK
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-11993
>                 URL: https://issues.apache.org/jira/browse/BEAM-11993
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model, cross-language, io-py-kafka, runner-flink
>    Affects Versions: 2.26.0, 2.27.0, 2.28.0
>            Reporter: Mauro Riva
>            Priority: P2
>
> I am trying to build a streaming pipeline using Python. The pipeline should subscribe to a Kafka topic and process the data on the fly. I am using the following configuration:
> {code:python}
> import logging
> 
> import apache_beam as beam
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
> 
> class PrintFn(beam.DoFn):
>     """Logs each element together with its window and timestamp."""
>     def __init__(self, label):
>         self.label = label
> 
>     def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
>         logging.info("[%s]: %s %s %s", self.label, element, window, timestamp)
>         yield element
> [...]
> pipeline_args = [ 
>  "--job_endpoint=localhost:8099", 
>  "--runner=PortableRunner" , 
>  "--environment_type=DOCKER", 
>  "--environment_config=gcr.io/xxxx/beam_python3.7_sdk:v2.28.0-custom", 
>  "--enable_streaming_engine"
> ] 
> pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True) 
> DataPipeline = beam.Pipeline(options=pipeline_options) 
> ReadData = (
>     DataPipeline
>     | "ReadFromKafka"
>     >> ReadFromKafka(
>         consumer_config={
>             "bootstrap.servers": "10.0.1.40:9092",
>             "auto.offset.reset": "latest"
>         },
>         topics=["beam_topic"],  # ReadFromKafka expects a list of topic names
>         expansion_service="localhost:8097"
>     )
>     | "Debug"
>     >> beam.ParDo(PrintFn(label="test"))
> )
> {code}
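> For completeness: the snippet builds the pipeline, but the run call is in the elided part. A minimal sketch of how it is executed, assuming the names above:
> {code:python}
> # Minimal run sketch (assumed; the actual run call is in the elided code).
> # wait_until_finish() blocks until the streaming job finishes or is cancelled.
> result = DataPipeline.run()
> result.wait_until_finish()
> {code}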
> On the Flink side I have a cluster with a JobManager and TaskManagers. The pipeline is submitted, but as soon as it starts running, the task:
> {code:java}
> Source: Impulse -> [3]ReadFromKafka/KafkaIO.Read/Read(KafkaUnboundedSource)/{ParDo(OutputSingleSource), ParDo(UnboundedSourceAsSDFWrapper)}
> {code}
> changes its status from RUNNING to FINISHED. The Kafka consumer remains subscribed and reports the following:
> {code:java}
> 2021-03-16 16:10:54,628 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Seeking to LATEST offset of partition topic_beam-0 
> 2021-03-16 16:10:54,629 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Resetting offset for partition topic_beam-0 to offset 144. 
> 2021-03-16 16:10:55,628 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Seeking to LATEST offset of partition topic_beam-0 
> 2021-03-16 16:10:55,629 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Resetting offset for partition topic_beam-0 to offset 145.
> {code}
> But it doesn’t send any data to the next task:
> {code:java}
> [3]ReadFromKafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Debug
> {code}
> which remains in RUNNING mode.
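> To rule out the broker side, records can be produced to the topic independently of Beam while the job is running. A minimal sketch using the kafka-python client (broker address and topic taken from the pipeline above; the client code is just for illustration, not part of the pipeline):
> {code:python}
> # Standalone producer to confirm that new records keep arriving on the
> # topic while the Beam job is running (uses kafka-python, assumed available).
> from kafka import KafkaProducer
> 
> producer = KafkaProducer(bootstrap_servers="10.0.1.40:9092")
> for i in range(5):
>     producer.send("beam_topic", key=b"beam", value=b'{"n": %d}' % i)
> producer.flush()
> {code}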
> Changing the configuration to:
> {code:python}
> | "ReadFromKafka"
> >> ReadFromKafka( 
>          consumer_config={ 
>               "bootstrap.servers": "10.0.1.40:9092", 
>               "auto.offset.reset":"earliest" 
>          }, 
>          topics="beam_topic", 
>          max_num_records=10, 
>          expansion_service="localhost:8097" 
> ) 
> | "Debug" 
>  >> beam.ParDo(PrintFn(label="test")) 
> )
> {code}
> seems to work, but only for the X records (10 in the code above) that are already available in the broker; with max_num_records set, the Kafka read runs as a bounded source. I get the expected info logging:
> {code:java}
> 2021-03-16 15:55:11,665 INFO apachebeam_pipeline.py:164 [] - [test]: (b'beam', b'{"type":"Buffer","data":[0,0,0,0,1,28,102]}') GlobalWindow Timestamp(1615910111.418000) 
> 2021-03-16 15:55:11,665 INFO apachebeam_pipeline.py:164 [] - [test]: (b'beam', b'{"type":"Buffer","data":[0,0,0,0,1,28,102]}') GlobalWindow Timestamp(1615910111.418000)
> {code}
> After reading those messages, the complete pipeline (both mentioned tasks) changes its status to FINISHED (as expected).
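> (Side note: the task name in the first log shows the read being translated through UnboundedSourceAsSDFWrapper. It is only an assumption on my part that the SDF translation is the culprit, but Beam's use_deprecated_read experiment forces the legacy unbounded-read translation and may be worth trying:)
> {code:python}
> # Assumption, not a confirmed fix: force the legacy (non-SDF) Read
> # translation by adding the use_deprecated_read experiment.
> pipeline_options = PipelineOptions(
>     pipeline_args + ["--experiments=use_deprecated_read"],
>     save_main_session=True,
>     streaming=True,
> )
> {code}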


