Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2022/03/20 17:26:00 UTC

[jira] [Commented] (BEAM-10786) Stateful DoFn with Python sdk and DataFlow runner

    [ https://issues.apache.org/jira/browse/BEAM-10786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17509493#comment-17509493 ] 

Beam JIRA Bot commented on BEAM-10786:
--------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.


> Stateful DoFn with Python sdk and DataFlow runner
> -------------------------------------------------
>
>                 Key: BEAM-10786
>                 URL: https://issues.apache.org/jira/browse/BEAM-10786
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-dataflow, runner-direct, sdk-py-core
>            Reporter: STACKOWIAK Denis
>            Priority: P2
>              Labels: stale-P2
>
> Hello,
> This is related to: https://issues.apache.org/jira/browse/BEAM-9655
> 
> We need stateful DoFns for some of our use cases (including a migration from the Java SDK to the Python SDK), and this feature does not seem to be fully implemented on either the Direct runner or the Dataflow runner.
> 
> To get a clearer picture, we built a test pipeline (based on the wordcount example) that covers every combination of StateSpec type (Bag; CombiningValue; Timer in the REAL_TIME and WATERMARK time domains), mode (batch; streaming) and runner (Direct; Dataflow).
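One way to drive this test matrix is to generate one command line per cell for the pipeline script in this issue. The script name, test_stateful.py, is taken from the Error #1 traceback. `build_invocations` and the row labels are hypothetical and are not part of the original report. Each run enables exactly one StateSpec test through the script's own flags. Dataflow runs would also need options such as --region and --temp_location, which this sketch omits.

{code:python}
import itertools

# Dimensions of the test matrix (labels are illustrative).
RUNNERS = ["DirectRunner", "DataflowRunner"]
MODES = {"Batch": "false", "Streaming": "true"}  # value passed to --stream_mode
# Each StateSpec test is enabled by one of the script's own flags.
STATE_SPEC_FLAGS = {
    "Timer REAL_TIME": "--timer_realtime",
    "Timer WATERMARK": "--timer_watermark",
    "CombiningValue": "--combining_value",
    "Bag": "--bag",
}


def build_invocations(script="test_stateful.py"):
    """Return (runner, mode, spec, argv) for every cell of the test matrix."""
    invocations = []
    for runner, (mode, stream_flag), (spec, flag) in itertools.product(
            RUNNERS, MODES.items(), STATE_SPEC_FLAGS.items()):
        argv = [
            "python", script,
            f"--runner={runner}",            # consumed by PipelineOptions
            f"--stream_mode={stream_flag}",  # consumed by the script's argparse
            f"{flag}=true",                  # enables exactly one StateSpec test
        ]
        invocations.append((runner, mode, spec, argv))
    return invocations


if __name__ == "__main__":
    for runner, mode, spec, argv in build_invocations():
        print(f"{runner:15} {mode:10} {spec:16} {' '.join(argv)}")
{code}

This produces 2 runners × 2 modes × 4 StateSpec tests = 16 invocations, one per row of the results table.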
>  
> Results :
> ||Runner||Mode||StateSpec||Result||Error #||Dataflow job ID||
> |Direct|Batch|Timer; TimeDomain.REAL_TIME|{color:#de350b}KO{color}|#1| |
> |Direct|Batch|Timer; TimeDomain.WATERMARK|{color:#00875a}OK{color}| | |
> |Direct|Batch|CombiningValue|{color:#00875a}OK{color}| | |
> |Direct|Batch|Bag|{color:#00875a}OK{color}| | |
> |Direct|Streaming|Timer; TimeDomain.REAL_TIME|{color:#00875a}OK{color}| | |
> |Direct|Streaming|Timer; TimeDomain.WATERMARK|{color:#00875a}OK{color}| | |
> |Direct|Streaming|CombiningValue|{color:#00875a}OK{color}| | |
> |Direct|Streaming|Bag|{color:#00875a}OK{color}| | |
> |Dataflow|Batch|Timer; TimeDomain.REAL_TIME|{color:#de350b}KO{color}|#2|2020-08-20_08_14_07-5985905092341835149|
> |Dataflow|Batch|Timer; TimeDomain.WATERMARK|{color:#de350b}KO{color}|#2|2020-08-20_08_14_51-227797524346310138|
> |Dataflow|Batch|CombiningValue|{color:#de350b}KO{color}|#2|2020-08-20_08_15_46-14394222017890152995|
> |Dataflow|Batch|Bag|{color:#de350b}KO{color}|#2|2020-08-20_08_17_20-2307047231213658649|
> |Dataflow|Streaming|Timer; TimeDomain.REAL_TIME|{color:#de350b}KO{color}|#3|2020-08-20_08_47_37-6883008099159189108|
> |Dataflow|Streaming|Timer; TimeDomain.WATERMARK|{color:#de350b}KO{color}|#3|2020-08-20_08_46_48-7341546514472681857|
> |Dataflow|Streaming|CombiningValue|{color:#00875a}OK{color}| | |
> |Dataflow|Streaming|Bag|{color:#00875a}OK{color}| | |
>  
>  
>  
> Error #1 :
> {noformat}
> Traceback (most recent call last):
>   File "test_stateful.py", line 142, in <module>
>     run()
>   File "test_stateful.py", line 136, in run
>     test_bag = words | 'Test Bag' >> beam.ParDo(TestStatefulBag())
>   File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/pipeline.py", line 555, in __exit__
>     self.run().wait_until_finish()
>   File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/pipeline.py", line 534, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 179, in run_via_runner_api
>     self._check_requirements(pipeline_proto)
>   File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 280, in _check_requirements
>     raise NotImplementedError(timer.time_domain)
> NotImplementedError: 2{noformat}
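The 2 in "NotImplementedError: 2" appears to be the numeric time domain of the timer in Beam's portable pipeline proto. The sketch below decodes that number. It assumes that the TimeDomain enum in beam_runner_api.proto is UNSPECIFIED = 0, EVENT_TIME = 1, PROCESSING_TIME = 2. It also assumes that the Python SDK maps TimeDomain.WATERMARK to EVENT_TIME and TimeDomain.REAL_TIME to PROCESSING_TIME. Both tables are hard-coded here rather than imported from apache_beam, and `explain_time_domain_error` is a hypothetical helper. Under these assumptions, the rejected timer is the REAL_TIME one. That matches the Direct / Batch / real-time timer row of the results table.

{code:python}
# Assumed values of TimeDomain.Enum in Beam's beam_runner_api.proto.
PROTO_TIME_DOMAIN = {0: "UNSPECIFIED", 1: "EVENT_TIME", 2: "PROCESSING_TIME"}

# Assumed translation of the Python SDK's TimeDomain constants to the proto enum.
SDK_TO_PROTO = {"WATERMARK": "EVENT_TIME", "REAL_TIME": "PROCESSING_TIME"}


def explain_time_domain_error(message):
    """Map 'NotImplementedError: <n>' to (proto name, matching SDK constants)."""
    code = int(message.rsplit(":", 1)[1])
    proto_name = PROTO_TIME_DOMAIN.get(code, "UNKNOWN")
    sdk_names = [sdk for sdk, proto in SDK_TO_PROTO.items() if proto == proto_name]
    return proto_name, sdk_names


if __name__ == "__main__":
    print(explain_time_domain_error("NotImplementedError: 2"))
    # → ('PROCESSING_TIME', ['REAL_TIME'])
{code}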
> Error #2 :
> {noformat}
> Error message from worker: Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 638, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
>     op.start()
>   File "apache_beam/runners/worker/operations.py", line 662, in apache_beam.runners.worker.operations.DoOperation.start
>   File "apache_beam/runners/worker/operations.py", line 664, in apache_beam.runners.worker.operations.DoOperation.start
>   File "apache_beam/runners/worker/operations.py", line 665, in apache_beam.runners.worker.operations.DoOperation.start
>   File "apache_beam/runners/worker/operations.py", line 284, in apache_beam.runners.worker.operations.Operation.start
>   File "apache_beam/runners/worker/operations.py", line 290, in apache_beam.runners.worker.operations.Operation.start
>   File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
>   File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.setup
>   File "apache_beam/runners/common.py", line 943, in apache_beam.runners.common.DoFnRunner.__init__
> Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns.{noformat}
> Error #3 :
> {noformat}
> Error message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1004: Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>     response = task()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
>     return getattr(self, request_type)(
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 959, in process_bundle
>     output_stream = self.timer_data_channel.output_timer_stream(
> AttributeError: 'NoneType' object has no attribute 'output_timer_stream'
>
>     java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
>     org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>     org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1369)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1088)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1004: Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>     response = task()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
>     return getattr(self, request_type)(
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 959, in process_bundle
>     output_stream = self.timer_data_channel.output_timer_stream(
> AttributeError: 'NoneType' object has no attribute 'output_timer_stream'{noformat}
>  
> Pipeline Code :
> {code:python}
> from __future__ import absolute_import
> import argparse
> import logging
> import re
> from time import time
> from past.builtins import unicode
> import apache_beam as beam
> from apache_beam.io import ReadFromText
> from apache_beam.io import WriteToText
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
> from apache_beam.options.pipeline_options import StandardOptions
> from apache_beam.options.pipeline_options import GoogleCloudOptions
> import apache_beam.coders as coders
> import apache_beam.transforms.userstate as user_state
> from apache_beam.transforms.timeutil import TimeDomain
> from apache_beam.transforms.combiners import CountCombineFn
> import google.auth
> class WordExtractingDoFn(beam.DoFn):
>   """Splits a line of text into words."""
>   def process(self, element):
>     return re.findall(r'[\w\']+', element, re.UNICODE)
> class TestStatefulTimerRealTime(beam.DoFn):
>   """Sets a processing-time (REAL_TIME) timer one second after each element."""
>   STALE_TIMER = user_state.TimerSpec('stale', TimeDomain.REAL_TIME)
>   def process(self, word, stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
>     logging.info('Process Timer RealTime')
>     stale_timer.set(time() + 1)  # absolute wall-clock time, in seconds
>   @user_state.on_timer(STALE_TIMER)
>   def stale(self):
>     logging.info('OK Timer RealTime')
>     yield 1
> class TestStatefulTimerWatermark(beam.DoFn):
>   """Sets an event-time (WATERMARK) timer that fires at the end of the element's window."""
>   STALE_TIMER = user_state.TimerSpec('stale', TimeDomain.WATERMARK)
>   def process(self, word, w=beam.DoFn.WindowParam, stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
>     logging.info('Process Timer Watermark')
>     stale_timer.set(w.end)
>   @user_state.on_timer(STALE_TIMER)
>   def stale(self):
>     logging.info('OK Timer Watermark')
>     yield 1
> class TestStatefulCombiningValue(beam.DoFn):
>   """Keeps a running per-key count in a CombiningValue state cell."""
>   COUNT_STATE = user_state.CombiningValueStateSpec('count', coders.VarIntCoder(), CountCombineFn())
>   def process(self, word, count_state=beam.DoFn.StateParam(COUNT_STATE)):
>     logging.info('Process Combining Value : %s', count_state.read())
>     count_state.add(1)
> class TestStatefulBag(beam.DoFn):
>   """Appends each element's key to a per-key Bag state and logs the bag size."""
>   BAG_STATE = user_state.BagStateSpec('buffer', coders.VarIntCoder())
>   def process(self, word, bag_state=beam.DoFn.StateParam(BAG_STATE)):
>     logging.info('Process Bag length: %s', sum(1 for _ in bag_state.read()))
>     # word is a (key, value) pair; its int key matches the VarIntCoder.
>     bag_state.add(word[0])
> def run(argv=None, save_main_session=True):
>   input_file='gs://dataflow-samples/shakespeare/kinglear.txt'
>   input_topic='projects/pubsub-public-data/topics/shakespeare-kinglear'
>   parser = argparse.ArgumentParser()
>   parser.add_argument(
>       '--stream_mode',
>       dest='stream_mode',
>       default='false',
>       help="Run in streaming mode ('true' or 'false')")
>   parser.add_argument(
>       '--timer_realtime',
>       dest='timer_realtime',
>       default='false',
>       help='Test Stateful Timer; RealTime Domain')
>   parser.add_argument(
>       '--timer_watermark',
>       dest='timer_watermark',
>       default='false',
>       help='Test Stateful Timer; Watermark Domain')
>   parser.add_argument(
>       '--combining_value',
>       dest='combining_value',
>       default='false',
>       help='Test Stateful Combining Value')
>   parser.add_argument(
>       '--bag',
>       dest='bag',
>       default='false',
>       help='Test Stateful Bag')
>   known_args, pipeline_args = parser.parse_known_args(argv)
>   is_streaming = known_args.stream_mode != 'false'
>   pipeline_options = PipelineOptions(pipeline_args)
>   _, pipeline_options.view_as(GoogleCloudOptions).project = google.auth.default()
>   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>   pipeline_options.view_as(StandardOptions).streaming = is_streaming
>   # The pipeline will be run on exiting the with block.
>   with beam.Pipeline(options=pipeline_options) as p:
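>     if is_streaming:
>       # ReadFromPubSub emits each message payload as bytes.
>       words = p | 'Read' >> beam.io.ReadFromPubSub(topic=input_topic)
>       #words = p | 'Read' >> beam.io.ReadFromPubSub(subscription=input_subscription)
>     else:
>       words = (
>         p
>         | 'Read' >> ReadFromText(input_file)
>         | 'Split' >> beam.ParDo(WordExtractingDoFn()).with_output_types(str)
>         )
>     # Stateful DoFns require keyed input: give every element the same key (1).
>     # The lambda must return the element (word), not the PCollection (words).
>     words = words | 'SetKey' >> beam.Map(lambda word: (1, word))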
>     # TESTS
>     if known_args.timer_realtime == 'true':
>       test_timer_realtime = words | 'Test timer realTime' >> beam.ParDo(TestStatefulTimerRealTime())
>     if known_args.timer_watermark == 'true':
>       test_timer_watermark = (words 
>         | "window" >> beam.WindowInto(beam.window.FixedWindows(1)) 
>         | 'Test timer watermark' >> beam.ParDo(TestStatefulTimerWatermark()))
>     if known_args.combining_value == 'true':
>       test_combining_value = words | 'Test combining value' >> beam.ParDo(TestStatefulCombiningValue())
>     if known_args.bag == 'true':
>       test_bag = words | 'Test Bag' >> beam.ParDo(TestStatefulBag())
> if __name__ == '__main__':
>   logging.getLogger().setLevel(logging.DEBUG)
>   run()
> {code}
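Both state tests key every element on the constant 1. In batch mode, all elements also fall into the global window, so they share a single state cell. The toy model below mimics what TestStatefulCombiningValue and TestStatefulBag read and write. It is plain Python, not Beam's implementation, and assumes that state is scoped per (key, window) as in Beam's stateful processing model. `KeyedStateModel` is a hypothetical name. The model shows the logged count and bag length growing across all elements, and shows that one constant key funnels the whole stateful step through one cell. That also limits how far a runner can parallelize the step.

{code:python}
from collections import defaultdict


class KeyedStateModel:
    """Toy model of stateful DoFn state: one cell per (key, window)."""

    def __init__(self):
        self.counts = defaultdict(int)  # stands in for CombiningValue + CountCombineFn
        self.bags = defaultdict(list)   # stands in for BagState

    def process(self, element, window="GlobalWindow"):
        key, _value = element
        cell = (key, window)
        # Like TestStatefulCombiningValue: read the current count, then add 1.
        seen_count = self.counts[cell]
        self.counts[cell] += 1
        # Like TestStatefulBag: read the bag size, then append the key.
        seen_bag_len = len(self.bags[cell])
        self.bags[cell].append(key)
        return seen_count, seen_bag_len


if __name__ == "__main__":
    same_key = KeyedStateModel()
    print([same_key.process((1, w)) for w in ["a", "b", "c"]])
    # → [(0, 0), (1, 1), (2, 2)]: every element shares the (1, GlobalWindow) cell
    per_word = KeyedStateModel()
    print([per_word.process((w, w)) for w in ["a", "b", "c"]])
    # → [(0, 0), (0, 0), (0, 0)]: each word gets its own cell
{code}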
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)