Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2022/03/20 17:26:00 UTC
[jira] [Commented] (BEAM-10786) Stateful DoFn with Python sdk and DataFlow runner
[ https://issues.apache.org/jira/browse/BEAM-10786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17509493#comment-17509493 ]
Beam JIRA Bot commented on BEAM-10786:
--------------------------------------
This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.
Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.
> Stateful DoFn with Python sdk and DataFlow runner
> -------------------------------------------------
>
> Key: BEAM-10786
> URL: https://issues.apache.org/jira/browse/BEAM-10786
> Project: Beam
> Issue Type: New Feature
> Components: runner-dataflow, runner-direct, sdk-py-core
> Reporter: STACKOWIAK Denis
> Priority: P2
> Labels: stale-P2
>
> Hello,
> This is related to: https://issues.apache.org/jira/browse/BEAM-9655
>
> We need stateful DoFns for some of our use cases (including a migration from the Java SDK to the Python SDK), and this feature does not seem to be fully implemented on the Direct runner and the Dataflow runner.
>
> To get a clearer picture, we created a pipeline (based on the wordcount example) that tests every combination of state or timer type (Bag, CombiningValue, and a Timer in each of the REAL_TIME and WATERMARK time domains), mode (Batch, Streaming) and runner (Direct, Dataflow).
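A minimal sketch of how the 16 combinations could be enumerated as separate runs. It assumes the script is saved as `test_stateful.py` (the file name that appears in the Error #1 traceback) and relies on the standard Beam `--runner` option to choose `DirectRunner` or `DataflowRunner`; the remaining flags are the ones defined in the pipeline's argparse setup. Dataflow runs would additionally need options such as `--region` and `--temp_location`, which are deliberately left out here.

```python
import itertools

# Runners and modes from the results table; test flags from the pipeline's argparse setup.
RUNNERS = ["DirectRunner", "DataflowRunner"]
MODES = {"Batch": "false", "Streaming": "true"}
SPEC_FLAGS = {
    "Timer; TimeDomain.REAL_TIME": "--timer_realtime",
    "Timer; TimeDomain.WATERMARK": "--timer_watermark",
    "CombiningValue": "--combining_value",
    "Bag": "--bag",
}


def build_matrix():
    """Return (runner, mode, spec, argv) for every runner/mode/spec combination."""
    matrix = []
    for runner, (mode, stream_flag), (spec, flag) in itertools.product(
            RUNNERS, MODES.items(), SPEC_FLAGS.items()):
        # Enable exactly one test per run so each failure maps to one table row.
        argv = [
            "python", "test_stateful.py",
            f"--runner={runner}",
            f"--stream_mode={stream_flag}",
            f"{flag}=true",
        ]
        matrix.append((runner, mode, spec, argv))
    return matrix


if __name__ == "__main__":
    for runner, mode, spec, argv in build_matrix():
        print(f"{runner:15} {mode:9} {spec:28} {' '.join(argv)}")
```

Enabling one flag per invocation mirrors the results table, where each Dataflow row has its own job ID.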
>
> Results:
> ||Runner||Mode||StateSpec||Result||Error||JobId||
> |Direct|Batch|Timer; TimeDomain.REAL_TIME|{color:#de350b}KO{color}|#1| |
> |Direct|Batch|Timer; TimeDomain.WATERMARK|{color:#00875a}OK{color}| | |
> |Direct|Batch|CombiningValue|{color:#00875a}OK{color}| | |
> |Direct|Batch|Bag|{color:#00875a}OK{color}| | |
> |Direct|Streaming|Timer; TimeDomain.REAL_TIME|{color:#00875a}OK{color}| | |
> |Direct|Streaming|Timer; TimeDomain.WATERMARK|{color:#00875a}OK{color}| | |
> |Direct|Streaming|CombiningValue|{color:#00875a}OK{color}| | |
> |Direct|Streaming|Bag|{color:#00875a}OK{color}| | |
> |Dataflow|Batch|Timer; TimeDomain.REAL_TIME|{color:#de350b}KO{color}|#2|2020-08-20_08_14_07-5985905092341835149|
> |Dataflow|Batch|Timer; TimeDomain.WATERMARK|{color:#de350b}KO{color}|#2|2020-08-20_08_14_51-227797524346310138|
> |Dataflow|Batch|CombiningValue|{color:#de350b}KO{color}|#2|2020-08-20_08_15_46-14394222017890152995|
> |Dataflow|Batch|Bag|{color:#de350b}KO{color}|#2|2020-08-20_08_17_20-2307047231213658649|
> |Dataflow|Streaming|Timer; TimeDomain.REAL_TIME|{color:#de350b}KO{color}|#3|2020-08-20_08_47_37-6883008099159189108|
> |Dataflow|Streaming|Timer; TimeDomain.WATERMARK|{color:#de350b}KO{color}|#3|2020-08-20_08_46_48-7341546514472681857|
> |Dataflow|Streaming|CombiningValue|{color:#00875a}OK{color}| | |
> |Dataflow|Streaming|Bag|{color:#00875a}OK{color}| | |
>
>
>
> Error #1:
> {noformat}
> Traceback (most recent call last):
> File "test_stateful.py", line 142, in <module>
> run()
> File "test_stateful.py", line 136, in run
> test_bag = words | 'Test Bag' >> beam.ParDo(TestStatefulBag())
> File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/pipeline.py", line 555, in __exit__
> self.run().wait_until_finish()
> File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/pipeline.py", line 534, in run
> return self.runner.run_pipeline(self, self._options)
> File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
> return runner.run_pipeline(pipeline, options)
> File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
> pipeline.to_runner_api(default_environment=self._default_environment))
> File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 179, in run_via_runner_api
> self._check_requirements(pipeline_proto)
> File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 280, in _check_requirements
> raise NotImplementedError(timer.time_domain)
> NotImplementedError: 2{noformat}
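The bare `2` in Error #1 is the timer's time domain as a portable (runner API) enum value, not a Python-level name. As far as we recall from Beam's `beam_runner_api.proto`, that enum is `UNSPECIFIED = 0`, `EVENT_TIME = 1`, `PROCESSING_TIME = 2`, and the Python SDK translates `TimeDomain.WATERMARK` to `EVENT_TIME` and `TimeDomain.REAL_TIME` to `PROCESSING_TIME`. The hand-written tables below are a sketch under that assumption (check them against the proto of your Beam version); decoding the value points at the processing-time timer, which matches the only Direct/Batch row marked KO.

```python
# Assumed values of the runner API TimeDomain enum (beam_runner_api.proto),
# hard-coded so this sketch runs without apache_beam installed.
RUNNER_API_TIME_DOMAIN = {0: "UNSPECIFIED", 1: "EVENT_TIME", 2: "PROCESSING_TIME"}

# Assumed mapping from the Python SDK's TimeDomain names to runner API names.
SDK_TO_RUNNER_API = {"WATERMARK": "EVENT_TIME", "REAL_TIME": "PROCESSING_TIME"}


def explain_not_implemented(value):
    """Turn the integer from `NotImplementedError: <value>` into readable names."""
    runner_api_name = RUNNER_API_TIME_DOMAIN.get(value, f"unknown ({value})")
    sdk_names = [sdk for sdk, api in SDK_TO_RUNNER_API.items()
                 if api == runner_api_name]
    return runner_api_name, sdk_names


print(explain_not_implemented(2))  # ('PROCESSING_TIME', ['REAL_TIME'])
```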
> Error #2:
> {noformat}
> Error message from worker: Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 638, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
>     op.start()
>   File "apache_beam/runners/worker/operations.py", line 662, in apache_beam.runners.worker.operations.DoOperation.start
>   File "apache_beam/runners/worker/operations.py", line 664, in apache_beam.runners.worker.operations.DoOperation.start
>   File "apache_beam/runners/worker/operations.py", line 665, in apache_beam.runners.worker.operations.DoOperation.start
>   File "apache_beam/runners/worker/operations.py", line 284, in apache_beam.runners.worker.operations.Operation.start
>   File "apache_beam/runners/worker/operations.py", line 290, in apache_beam.runners.worker.operations.Operation.start
>   File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
>   File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.setup
>   File "apache_beam/runners/common.py", line 943, in apache_beam.runners.common.DoFnRunner.__init__
> Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns.{noformat}
> Error #3:
> {noformat}
> Error message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1004: Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>     response = task()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
>     return getattr(self, request_type)(
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 959, in process_bundle
>     output_stream = self.timer_data_channel.output_timer_stream(
> AttributeError: 'NoneType' object has no attribute 'output_timer_stream'
>
>     java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
>     org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>     org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1369)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1088)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1004: Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>     response = task()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
>     return getattr(self, request_type)(
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 959, in process_bundle
>     output_stream = self.timer_data_channel.output_timer_stream(
> AttributeError: 'NoneType' object has no attribute 'output_timer_stream'
> {noformat}
>
> Pipeline code:
> {code:python}
> import argparse
> import logging
> import re
> from time import time
> import apache_beam as beam
> from apache_beam.io import ReadFromText
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
> from apache_beam.options.pipeline_options import StandardOptions
> from apache_beam.options.pipeline_options import GoogleCloudOptions
> import apache_beam.coders as coders
> import apache_beam.transforms.userstate as user_state
> from apache_beam.transforms.timeutil import TimeDomain
> from apache_beam.transforms.combiners import CountCombineFn
> import google.auth
>
>
> class WordExtractingDoFn(beam.DoFn):
>     def process(self, element):
>         return re.findall(r'[\w\']+', element, re.UNICODE)
>
>
> class TestStatefulTimerRealTime(beam.DoFn):
>     # Processing-time ("real time") timer.
>     STALE_TIMER = user_state.TimerSpec('stale', TimeDomain.REAL_TIME)
>
>     def process(self, word, stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
>         logging.info('Process Timer RealTime')
>         # Request a callback about one second from now (wall-clock seconds).
>         stale_timer.set(time() + 1)
>
>     @user_state.on_timer(STALE_TIMER)
>     def stale(self):
>         logging.info('OK Timer RealTime')
>         yield 1
>
>
> class TestStatefulTimerWatermark(beam.DoFn):
>     # Event-time (watermark) timer.
>     STALE_TIMER = user_state.TimerSpec('stale', TimeDomain.WATERMARK)
>
>     def process(self, word, w=beam.DoFn.WindowParam,
>                 stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
>         logging.info('Process Timer Watermark')
>         # Fire once the watermark reaches the end of the element's window.
>         stale_timer.set(w.end)
>
>     @user_state.on_timer(STALE_TIMER)
>     def stale(self):
>         logging.info('OK Timer Watermark')
>         yield 1
>
>
> class TestStatefulCombiningValue(beam.DoFn):
>     # Per-key counter; VarIntCoder covers the integer count.
>     COUNT_STATE = user_state.CombiningValueStateSpec(
>         'count', coders.VarIntCoder(), CountCombineFn())
>
>     def process(self, word, count_state=beam.DoFn.StateParam(COUNT_STATE)):
>         logging.info('Process Combining Value: %s', count_state.read())
>         count_state.add(1)
>
>
> class TestStatefulBag(beam.DoFn):
>     BAG_STATE = user_state.BagStateSpec('buffer', coders.VarIntCoder())
>
>     def process(self, word, bag_state=beam.DoFn.StateParam(BAG_STATE)):
>         logging.info('Process Bag length: %s', sum(1 for _ in bag_state.read()))
>         # word is a (key, value) pair; word[0] is the int key, which fits VarIntCoder.
>         bag_state.add(word[0])
>
>
> def run(argv=None, save_main_session=True):
>     input_file = 'gs://dataflow-samples/shakespeare/kinglear.txt'
>     input_topic = 'projects/pubsub-public-data/topics/shakespeare-kinglear'
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         '--stream_mode',
>         dest='stream_mode',
>         default='false',
>         help='Run in streaming mode ("true"/"false")')
>     parser.add_argument(
>         '--timer_realtime',
>         dest='timer_realtime',
>         default='false',
>         help='Test Stateful Timer; RealTime Domain')
>     parser.add_argument(
>         '--timer_watermark',
>         dest='timer_watermark',
>         default='false',
>         help='Test Stateful Timer; Watermark Domain')
>     parser.add_argument(
>         '--combining_value',
>         dest='combining_value',
>         default='false',
>         help='Test Stateful Combining Value')
>     parser.add_argument(
>         '--bag',
>         dest='bag',
>         default='false',
>         help='Test Stateful Bag')
>     known_args, pipeline_args = parser.parse_known_args(argv)
>     is_streaming = known_args.stream_mode != 'false'
>
>     pipeline_options = PipelineOptions(pipeline_args)
>     # google.auth.default() returns (credentials, project_id).
>     _, pipeline_options.view_as(GoogleCloudOptions).project = google.auth.default()
>     pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>     pipeline_options.view_as(StandardOptions).streaming = is_streaming
>
>     # The pipeline will be run on exiting the with block.
>     with beam.Pipeline(options=pipeline_options) as p:
>         if is_streaming:
>             words = p | 'Read' >> beam.io.ReadFromPubSub(topic=input_topic)
>             # words = p | 'Read' >> beam.io.ReadFromPubSub(subscription=input_subscription)
>         else:
>             words = (
>                 p
>                 | 'Read' >> ReadFromText(input_file)
>                 | 'Split' >> beam.ParDo(WordExtractingDoFn()).with_output_types(str)
>             )
>
>         # Set a constant key: state and timers require keyed (key, value) input.
>         words = words | 'SetKey' >> beam.Map(lambda word: (1, word))
>
>         # TESTS
>         if known_args.timer_realtime == 'true':
>             test_timer_realtime = words | 'Test timer realTime' >> beam.ParDo(TestStatefulTimerRealTime())
>         if known_args.timer_watermark == 'true':
>             test_timer_watermark = (
>                 words
>                 | 'window' >> beam.WindowInto(beam.window.FixedWindows(1))
>                 | 'Test timer watermark' >> beam.ParDo(TestStatefulTimerWatermark()))
>         if known_args.combining_value == 'true':
>             test_combining_value = words | 'Test combining value' >> beam.ParDo(TestStatefulCombiningValue())
>         if known_args.bag == 'true':
>             test_bag = words | 'Test Bag' >> beam.ParDo(TestStatefulBag())
>
>
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.DEBUG)
>     run()
> {code}
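Beam scopes user state and timers per key and window, which is why every test DoFn above is applied after the `SetKey` step. The pure-Python model below is a sketch of that scoping only, not of any runner: the hypothetical `StateModel` class replays the logic of `TestStatefulBag` and `TestStatefulCombiningValue` against dictionaries keyed by `(key, window)`, and the window labels are illustrative. It also shows a consequence of the constant key `1`: all elements of a window share one state cell, so on a real runner the stateful step effectively runs for a single key.

```python
from collections import defaultdict


class StateModel:
    """Hypothetical in-memory model of user state scoped per (key, window)."""

    def __init__(self):
        self.bags = defaultdict(list)   # bag state: append-only list per cell
        self.counts = defaultdict(int)  # combining value state with a count combiner

    def process_bag(self, element, window="global"):
        key, value = element
        cell = (key, window)
        seen = len(self.bags[cell])     # like sum(1 for _ in bag_state.read())
        self.bags[cell].append(value)   # like bag_state.add(...)
        return seen

    def process_count(self, element, window="global"):
        key, _ = element
        cell = (key, window)
        before = self.counts[cell]      # like count_state.read()
        self.counts[cell] += 1          # like count_state.add(1)
        return before


words = ["Lear", "Cordelia", "Lear", "Gloucester"]
model = StateModel()

# Constant key, as in the SetKey step: every element lands in the same cell.
print([model.process_bag((1, w)) for w in words])  # [0, 1, 2, 3]

# Keying by the word itself gives one independent cell per distinct word.
print([model.process_count((w, w)) for w in words])  # [0, 0, 1, 0]
```

Choosing a higher-cardinality key (for example the word itself) spreads state across many cells, at the cost of each cell only seeing its own key's elements.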
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)