Posted to user@beam.apache.org by Поротиков Станислав Вячеславович via user <us...@beam.apache.org> on 2023/12/19 08:57:50 UTC

Processing data from Kafka. Python

I'm trying to read data from Kafka, do some processing, and then write the new data to another Kafka topic.
The problem is that the task appears to be stuck at the processing stage.
In the logs I can see that it reads data from Kafka constantly, but no new data appears in the sink Kafka topic.
Could you help me figure out what I did wrong?

My pipeline:
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

# source_config, source_topic, sink_topic and kafka_process_expansion_service
# are defined elsewhere (not shown in this message).

pipeline_flink_environment = [
    "--runner=FlinkRunner",
    "--flink_submit_uber_jar",
    "--streaming",
    "--flink_master=localhost:8081",
    "--environment_type=PROCESS",
    "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
]


def run():
    pipeline_options = PipelineOptions(pipeline_flink_environment)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        kafka_message = (
            pipeline
            | 'Read topic from Kafka' >> ReadFromKafka(
                consumer_config=source_config,
                topics=[source_topic],
                expansion_service=kafka_process_expansion_service,
            )
            # Assign elements to 15-second fixed windows before grouping.
            | beam.WindowInto(beam.window.FixedWindows(15))
            | 'Group elements' >> beam.GroupByKey()
            | 'Write data to Kafka' >> WriteToKafka(
                producer_config=source_config,
                topic=sink_topic,
                expansion_service=kafka_process_expansion_service,
            )
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()



Here are a few lines from the logs that I can see are related to the Python worker:

2023-12-19 08:18:04,634 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 856b8acfe73098d7075a2636a645f66d_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2023-12-19 08:18:05,581 INFO  org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected.
2023-12-19 08:18:05,626 WARN  /usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:291 [] - Not setting flag with value None: app_name
2023-12-19 08:18:05,627 WARN  /usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:291 [] - Not setting flag with value None: flink_conf_dir
2023-12-19 08:18:05,628 INFO  /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:111 [] - semi_persistent_directory: /tmp
2023-12-19 08:18:05,628 WARN  /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:356 [] - No session file found: /tmp/staged/pickled_main_session. Functions defined in __main__ (interactive session) may fail.
2023-12-19 08:18:05,629 WARN  /usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:367 [] - Discarding unparseable args: ['--direct_runner_use_stacked_bundle', '--options_id=1', '--pipeline_type_check']
2023-12-19 08:18:05,629 INFO  /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:135 [] - Pipeline_options: {'streaming': True, 'job_name': 'BeamApp-flink-1219081730-11566b15', 'gcp_oauth_scopes': ['https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/devstorage.full_control', 'https://www.googleapis.com/auth/userinfo.email', 'https://www.googleapis.com/auth/datastore', 'https://www.googleapis.com/auth/spanner.admin', 'https://www.googleapis.com/auth/spanner.data', 'https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/devstorage.full_control', 'https://www.googleapis.com/auth/userinfo.email', 'https://www.googleapis.com/auth/datastore', 'https://www.googleapis.com/auth/spanner.admin', 'https://www.googleapis.com/auth/spanner.data'], 'default_sdk_harness_log_level': 'DEBUG', 'experiments': ['beam_fn_api'], 'sdk_location': 'container', 'environment_type': 'PROCESS', 'environment_config': '{"command":"/opt/apache/beam/boot"}', 'sdk_worker_parallelism': '1', 'environment_cache_millis': '0', 'flink_submit_uber_jar': True}
2023-12-19 08:18:05,672 INFO  /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/statecache.py:234 [] - Creating state cache with size 104857600
2023-12-19 08:18:05,672 INFO  /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:187 [] - Creating insecure control channel for localhost:35427.
2023-12-19 08:18:05,679 INFO  /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:195 [] - Control channel established.
2023-12-19 08:18:05,682 INFO  /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:243 [] - Initializing SDKHarness with unbounded number of workers.
2023-12-19 08:18:05,693 INFO  /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:211 [] - Python sdk harness starting.
2023-12-19 08:18:05,693 INFO  org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - Beam Fn Control client connected with id 1-2
2023-12-19 08:18:05,716 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
2023-12-19 08:18:05,717 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] - Initializing heap keyed state backend with stream factory.
2023-12-19 08:18:05,744 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - [3]Read topic from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]WindowInto(WindowIntoFn) -> ToKeyedWorkItem (1/1)#0 (856b8acfe73098d7075a2636a645f66d_508275ad2a106fd681f6d94bbcc7822d_0_0) switched from INITIALIZING to RUNNING.
2023-12-19 08:18:05,799 INFO  org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - getProcessBundleDescriptor request with id 1-6
2023-12-19 08:18:06,208 INFO  org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn               [] - Creating Kafka consumer for offset estimation for BeamTestSource-0
…

2023-12-19 08:18:09,478 INFO  org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - getProcessBundleDescriptor request with id 1-5
2023-12-19 08:18:09,491 INFO  /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:885 [] - Creating insecure state channel for localhost:45437.
2023-12-19 08:18:09,491 INFO  /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:892 [] - State channel established.
2023-12-19 08:18:09,507 INFO  /usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/data_plane.py:770 [] - Creating client data channel for localhost:43761
2023-12-19 08:18:09,514 INFO  org.apache.beam.runners.fnexecution.data.GrpcDataService     [] - Beam Fn Data client connected.


Best regards,
Stanislav Porotikov


RE: Processing data from Kafka. Python

Posted by Поротиков Станислав Вячеславович via user <us...@beam.apache.org>.
It seems to be fixed by adding the following option to the Java expansion service:
"--experiments=use_deprecated_read"
I found a related ticket: https://issues.apache.org/jira/browse/BEAM-11991
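
For anyone hitting the same problem, here is a minimal sketch of one way the option might be passed from Python. It assumes the expansion service is created with default_io_expansion_service from apache_beam.io.kafka, which accepts an append_args list. How kafka_process_expansion_service was actually built is not shown in this thread, so the helper names with_deprecated_read and make_kafka_expansion_service are hypothetical.

```python
# Flag named in this thread; it is forwarded to the Java expansion service.
DEPRECATED_READ_FLAG = "--experiments=use_deprecated_read"


def with_deprecated_read(existing_args=()):
    """Return a copy of the expansion-service args with the flag added once."""
    args = list(existing_args)
    if DEPRECATED_READ_FLAG not in args:
        args.append(DEPRECATED_READ_FLAG)
    return args


def make_kafka_expansion_service(existing_args=()):
    """Hypothetical helper: build an expansion service with the flag applied.

    Assumes default_io_expansion_service(append_args=...) is available in
    the installed Beam version.
    """
    # Imported lazily so the helper above can be used without Beam installed.
    from apache_beam.io.kafka import default_io_expansion_service

    return default_io_expansion_service(
        append_args=with_deprecated_read(existing_args)
    )
```

The returned object would then be passed as expansion_service= to both ReadFromKafka and WriteToKafka. If the expansion service is started by hand as a separate Java process, the same flag would presumably go on its command line instead.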

Best regards,
Stanislav Porotikov
