Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 18:03:26 UTC

[GitHub] [beam] damccorm opened a new issue, #20520: Stateful DoFn with Python sdk and DataFlow runner

damccorm opened a new issue, #20520:
URL: https://github.com/apache/beam/issues/20520

   Hello,
   
   This is related to: https://issues.apache.org/jira/browse/BEAM-9655

   We need stateful DoFns for some of our use cases (and for our migration from the Java SDK to the Python SDK), but this feature does not appear to be fully implemented on either the Direct runner or the Dataflow runner.

   To get a clearer picture, we created a pipeline (based on the wordcount example) that tests every combination of StateSpec type (Bag; CombiningValue; Timer in the REAL_TIME and WATERMARK time domains), mode (batch; streaming) and runner (Direct; Dataflow).
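
The matrix covers 2 runners × 2 modes × 4 StateSpec variants, which means 16 runs of the pipeline script. The sketch below is a minimal way to enumerate those combinations into command lines. It makes some assumptions: the helper name `build_commands` is hypothetical, the script name `test_stateful.py` comes from the Error #1 traceback, and a real Dataflow run also needs options such as project, region and temp location, which are omitted here. It uses the script's own `--stream_mode`, `--timer_realtime`, `--timer_watermark`, `--combining_value` and `--bag` flags plus Beam's standard `--runner` option:

```python
import itertools

# Dimensions of the test matrix described in the issue.
RUNNERS = ["DirectRunner", "DataflowRunner"]
MODES = ["batch", "streaming"]
# Maps each StateSpec variant (as labelled in the results table) to the
# flag of the test script that enables it.
VARIANTS = {
    "Timer; TimeDomain.REAL_TIME": "--timer_realtime",
    "Timer; TimeDomain.WATERMARK": "--timer_watermark",
    "CombiningValue": "--combining_value",
    "Bag": "--bag",
}


def build_commands(script="test_stateful.py"):
    """Return one ((runner, mode, variant), argv) pair per combination.

    Hypothetical helper; Dataflow-specific options are deliberately omitted.
    """
    commands = []
    for runner, mode, (label, flag) in itertools.product(
            RUNNERS, MODES, VARIANTS.items()):
        argv = [
            "python", script,
            f"--runner={runner}",
            f"--stream_mode={'true' if mode == 'streaming' else 'false'}",
            f"{flag}=true",
        ]
        commands.append(((runner, mode, label), argv))
    return commands
```

Each entry pairs a (runner, mode, variant) label with the argv to execute, for example via `subprocess.run`. Each label matches one row of the results table.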
   
   Results (OK = passed, KO = failed):

   | Runner | Mode | StateSpec | Result | Error | JobId |
   |---|---|---|---|---|---|
   | Direct | Batch | Timer; TimeDomain.REAL_TIME | KO | #1 | |
   | Direct | Batch | Timer; TimeDomain.WATERMARK | OK | | |
   | Direct | Batch | CombiningValue | OK | | |
   | Direct | Batch | Bag | OK | | |
   | Direct | Streaming | Timer; TimeDomain.REAL_TIME | OK | | |
   | Direct | Streaming | Timer; TimeDomain.WATERMARK | OK | | |
   | Direct | Streaming | CombiningValue | OK | | |
   | Direct | Streaming | Bag | OK | | |
   | Dataflow | Batch | Timer; TimeDomain.REAL_TIME | KO | #2 | 2020-08-20_08_14_07-5985905092341835149 |
   | Dataflow | Batch | Timer; TimeDomain.WATERMARK | KO | #2 | 2020-08-20_08_14_51-227797524346310138 |
   | Dataflow | Batch | CombiningValue | KO | #2 | 2020-08-20_08_15_46-14394222017890152995 |
   | Dataflow | Batch | Bag | KO | #2 | 2020-08-20_08_17_20-2307047231213658649 |
   | Dataflow | Streaming | Timer; TimeDomain.REAL_TIME | KO | #3 | 2020-08-20_08_47_37-6883008099159189108 |
   | Dataflow | Streaming | Timer; TimeDomain.WATERMARK | KO | #3 | 2020-08-20_08_46_48-7341546514472681857 |
   | Dataflow | Streaming | CombiningValue | OK | | |
   | Dataflow | Streaming | Bag | OK | | |

   Error #1 (Direct runner, batch mode, `TimeDomain.REAL_TIME` timer):
   ```
   Traceback (most recent call last):
     File "test_stateful.py", line 142, in <module>
       run()
     File "test_stateful.py", line 136, in run
       test_bag = words | 'Test Bag' >> beam.ParDo(TestStatefulBag())
     File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/pipeline.py", line 555, in __exit__
       self.run().wait_until_finish()
     File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/pipeline.py", line 534, in run
       return self.runner.run_pipeline(self, self._options)
     File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
       return runner.run_pipeline(pipeline, options)
     File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
       pipeline.to_runner_api(default_environment=self._default_environment))
     File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 179, in run_via_runner_api
       self._check_requirements(pipeline_proto)
     File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 280, in _check_requirements
       raise NotImplementedError(timer.time_domain)
   NotImplementedError: 2
   ```
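
The bare `2` in Error #1 is the integer that `_check_requirements` passes to `NotImplementedError`, namely the timer's time domain as encoded in Beam's portable runner API. This reading assumes the numbering of `TimeDomain.Enum` in `beam_runner_api.proto` (UNSPECIFIED = 0, EVENT_TIME = 1, PROCESSING_TIME = 2) and that the Python SDK maps `TimeDomain.REAL_TIME` to PROCESSING_TIME. Under those assumptions, the value identifies the processing-time domain, which is consistent with the table: only the REAL_TIME timer fails on the Direct runner in batch mode. Below is a small stdlib-only lookup sketch; the names are hypothetical and nothing is imported from `apache_beam`:

```python
# Assumed values of TimeDomain.Enum in Beam's runner API proto.
RUNNER_API_TIME_DOMAIN = {
    0: "UNSPECIFIED",
    1: "EVENT_TIME",
    2: "PROCESSING_TIME",
}

# Assumed correspondence with the Python SDK's TimeDomain constants.
PYTHON_SDK_EQUIVALENT = {
    "EVENT_TIME": "TimeDomain.WATERMARK",
    "PROCESSING_TIME": "TimeDomain.REAL_TIME",
}


def explain_time_domain(value):
    """Translate the integer in 'NotImplementedError: <n>' into names."""
    proto_name = RUNNER_API_TIME_DOMAIN.get(value, "UNKNOWN")
    sdk_name = PYTHON_SDK_EQUIVALENT.get(proto_name, "n/a")
    return proto_name, sdk_name
```

For example, `explain_time_domain(2)` returns `("PROCESSING_TIME", "TimeDomain.REAL_TIME")`.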
   
   Error #2 (Dataflow runner, batch mode, all four StateSpec variants):
   ```
   Error message from worker: Traceback (most recent call last):
     File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 638, in do_work
       work_executor.execute()
     File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
       op.start()
     File "apache_beam/runners/worker/operations.py", line 662, in apache_beam.runners.worker.operations.DoOperation.start
     File "apache_beam/runners/worker/operations.py", line 664, in apache_beam.runners.worker.operations.DoOperation.start
     File "apache_beam/runners/worker/operations.py", line 665, in apache_beam.runners.worker.operations.DoOperation.start
     File "apache_beam/runners/worker/operations.py", line 284, in apache_beam.runners.worker.operations.Operation.start
     File "apache_beam/runners/worker/operations.py", line 290, in apache_beam.runners.worker.operations.Operation.start
     File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
     File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.setup
     File "apache_beam/runners/common.py", line 943, in apache_beam.runners.common.DoFnRunner.__init__
   Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns.
   ```
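
Error #2 is raised while the worker sets up the `ParDo` (`DoOperation.setup` → `DoFnRunner.__init__`). At that point no user state context exists, meaning there is no backing store for the DoFn's `StateParam` cells. The model below is a rough mental model only, and the class and method names are hypothetical rather than Beam's API. It treats that context as a map from (key, window, state name) to state, which `TestStatefulBag` and `TestStatefulCombiningValue` read and update on every element:

```python
from collections import defaultdict


class ToyUserStateContext:
    """Hypothetical in-memory stand-in for runner-provided user state.

    Not Beam's API: it only models per-(key, window, name) state cells.
    """

    def __init__(self):
        # (key, window, state_name) -> values added to a bag cell
        self._bags = defaultdict(list)
        # (key, window, state_name) -> running count for a combining cell
        self._counts = defaultdict(int)

    def bag_add(self, key, window, name, value):
        self._bags[(key, window, name)].append(value)

    def bag_read(self, key, window, name):
        # Return a copy so callers cannot mutate the stored bag.
        return list(self._bags[(key, window, name)])

    def count_add(self, key, window, name):
        self._counts[(key, window, name)] += 1

    def count_read(self, key, window, name):
        return self._counts[(key, window, name)]


def run_bag_like_test(keyed_elements):
    """Mimic TestStatefulBag: record the bag size, then add the element's key."""
    ctx = ToyUserStateContext()
    sizes = []
    for key, _value in keyed_elements:
        sizes.append(len(ctx.bag_read(key, "global", "buffer")))
        ctx.bag_add(key, "global", "buffer", key)
    return sizes
```

Because state is scoped per key, `run_bag_like_test([(1, "a"), (1, "b"), (2, "c"), (1, "d")])` sees sizes 0, 1, 0, 2. In the pipeline, every element carries key 1, so all elements share a single cell.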
   
   Error #3 (Dataflow runner, streaming mode, both timer variants):
   ```
   Error message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1004: Traceback (most recent call last):
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
       response = task()
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
       return getattr(self, request_type)(
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
       bundle_processor.process_bundle(instruction_id))
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 959, in process_bundle
       output_stream = self.timer_data_channel.output_timer_stream(
   AttributeError: 'NoneType' object has no attribute 'output_timer_stream'
           java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
           java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
           org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
           org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
           org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
           org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
           org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1369)
           org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154)
           org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1088)
           java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1004: Traceback (most recent call last):
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
       response = task()
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
       return getattr(self, request_type)(
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
       bundle_processor.process_bundle(instruction_id))
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 959, in process_bundle
       output_stream = self.timer_data_channel.output_timer_stream(
   AttributeError: 'NoneType' object has no attribute 'output_timer_stream'
   ```
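
In Error #3, the SDK harness tries to write timers through `self.timer_data_channel`, which is `None` on the streaming Dataflow worker. Apparently, then, timers set by the DoFn have no path back to the runner. The sketch below is a simplified, hypothetical model of the bookkeeping that delivery supports; it is not how Dataflow implements it. It treats event-time (watermark) timers as a priority queue that fires each timer once the watermark reaches its timestamp, and re-setting a timer for the same key replaces the earlier one:

```python
import heapq


class ToyWatermarkTimers:
    """Hypothetical model of event-time timers (not Beam's implementation)."""

    def __init__(self):
        self._heap = []    # entries: (fire_timestamp, key, timer_id)
        self._latest = {}  # (key, timer_id) -> most recently set timestamp

    def set(self, key, timer_id, timestamp):
        # A later set() for the same (key, timer_id) supersedes earlier ones.
        self._latest[(key, timer_id)] = timestamp
        heapq.heappush(self._heap, (timestamp, key, timer_id))

    def advance_watermark(self, watermark):
        """Return (key, timer_id, timestamp) for every timer now due."""
        fired = []
        while self._heap and self._heap[0][0] <= watermark:
            timestamp, key, timer_id = heapq.heappop(self._heap)
            # Skip stale heap entries that a later set() replaced.
            if self._latest.get((key, timer_id)) == timestamp:
                del self._latest[(key, timer_id)]
                fired.append((key, timer_id, timestamp))
        return fired
```

Treating "due" as watermark >= timestamp is a simplification for illustration.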
   
   Pipeline code (`test_stateful.py`):
   
   ```
   import argparse
   import logging
   import re
   from time import time

   import apache_beam as beam
   import apache_beam.coders as coders
   import apache_beam.transforms.userstate as user_state
   import google.auth
   from apache_beam.io import ReadFromText
   from apache_beam.options.pipeline_options import GoogleCloudOptions
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.options.pipeline_options import SetupOptions
   from apache_beam.options.pipeline_options import StandardOptions
   from apache_beam.transforms.combiners import CountCombineFn
   from apache_beam.transforms.timeutil import TimeDomain


   class WordExtractingDoFn(beam.DoFn):
     def process(self, element):
       return re.findall(r'[\w\']+', element, re.UNICODE)


   class TestStatefulTimerRealTime(beam.DoFn):
     STALE_TIMER = user_state.TimerSpec('stale', TimeDomain.REAL_TIME)

     def process(self, word, stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
       logging.info('Process Timer RealTime')
       # Processing-time timer: fire about one second from now.
       stale_timer.set(time() + 1)

     @user_state.on_timer(STALE_TIMER)
     def stale(self):
       logging.info('OK Timer RealTime')
       yield 1


   class TestStatefulTimerWatermark(beam.DoFn):
     STALE_TIMER = user_state.TimerSpec('stale', TimeDomain.WATERMARK)

     def process(self, word, w=beam.DoFn.WindowParam,
                 stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
       logging.info('Process Timer Watermark')
       # Event-time timer at the end of the element's window.
       stale_timer.set(w.end)

     @user_state.on_timer(STALE_TIMER)
     def stale(self):
       logging.info('OK Timer Watermark')
       yield 1


   class TestStatefulCombiningValue(beam.DoFn):
     COUNT_STATE = user_state.CombiningValueStateSpec(
         'count', coders.VarIntCoder(), CountCombineFn())

     def process(self, word, count_state=beam.DoFn.StateParam(COUNT_STATE)):
       logging.info('Process Combining Value: %s', count_state.read())
       count_state.add(1)


   class TestStatefulBag(beam.DoFn):
     BAG_STATE = user_state.BagStateSpec('buffer', coders.VarIntCoder())

     def process(self, word, bag_state=beam.DoFn.StateParam(BAG_STATE)):
       logging.info('Process Bag length: %s', sum(1 for _ in bag_state.read()))
       # After 'SetKey', word is a (key, value) pair; store its integer key.
       bag_state.add(word[0])


   def run(argv=None, save_main_session=True):
     input_file = 'gs://dataflow-samples/shakespeare/kinglear.txt'
     input_topic = 'projects/pubsub-public-data/topics/shakespeare-kinglear'

     parser = argparse.ArgumentParser()
     parser.add_argument(
         '--stream_mode', dest='stream_mode', default='false',
         help='Run in streaming mode')
     parser.add_argument(
         '--timer_realtime', dest='timer_realtime', default='false',
         help='Test Stateful Timer; RealTime Domain')
     parser.add_argument(
         '--timer_watermark', dest='timer_watermark', default='false',
         help='Test Stateful Timer; Watermark Domain')
     parser.add_argument(
         '--combining_value', dest='combining_value', default='false',
         help='Test Stateful Combining Value')
     parser.add_argument(
         '--bag', dest='bag', default='false', help='Test Stateful Bag')
     known_args, pipeline_args = parser.parse_known_args(argv)

     is_streaming = known_args.stream_mode != 'false'

     pipeline_options = PipelineOptions(pipeline_args)
     _, pipeline_options.view_as(GoogleCloudOptions).project = google.auth.default()
     pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
     pipeline_options.view_as(StandardOptions).streaming = is_streaming

     # The pipeline will be run on exiting the with block.
     with beam.Pipeline(options=pipeline_options) as p:
       if is_streaming:
         # Pub/Sub messages arrive as bytes and are keyed as-is (no split).
         words = p | 'Read' >> beam.io.ReadFromPubSub(topic=input_topic)
       else:
         # str replaces past.builtins.unicode on Python 3.
         words = (
             p
             | 'Read' >> ReadFromText(input_file)
             | 'Split' >> beam.ParDo(WordExtractingDoFn()).with_output_types(str))

       # Stateful DoFns need keyed input, so every element gets the key 1.
       # The lambda must return its own argument `word`, not the PCollection `words`.
       words = words | 'SetKey' >> beam.Map(lambda word: (1, word))

       # TESTS
       if known_args.timer_realtime == 'true':
         test_timer_realtime = words | 'Test timer realTime' >> beam.ParDo(
             TestStatefulTimerRealTime())

       if known_args.timer_watermark == 'true':
         test_timer_watermark = (
             words
             | 'window' >> beam.WindowInto(beam.window.FixedWindows(1))
             | 'Test timer watermark' >> beam.ParDo(TestStatefulTimerWatermark()))

       if known_args.combining_value == 'true':
         test_combining_value = words | 'Test combining value' >> beam.ParDo(
             TestStatefulCombiningValue())

       if known_args.bag == 'true':
         test_bag = words | 'Test Bag' >> beam.ParDo(TestStatefulBag())


   if __name__ == '__main__':
     logging.getLogger().setLevel(logging.DEBUG)
     run()
   ```
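
In `TestStatefulTimerWatermark`, each timer is set to `w.end`, the end of the one-second fixed window that contains the element. The timer should therefore fire once the watermark passes that boundary. The sketch below is stdlib-only and intended to mirror the `start = t - (t - offset) % size` arithmetic that `FixedWindows` uses. It makes some assumptions: the helper names are hypothetical, and it uses plain numbers in seconds instead of Beam's `Timestamp` objects. It also includes the constant-key step that precedes every stateful test:

```python
from fractions import Fraction


def fixed_window(timestamp, size, offset=0):
    """Return (start, end) of the fixed window containing `timestamp`.

    Hypothetical helper using plain numbers instead of Beam Timestamps.
    """
    start = timestamp - (timestamp - offset) % size
    return start, start + size


def key_words(words):
    """Give every element the same key, as the 'SetKey' step does."""
    return [(1, word) for word in words]


# Exact rationals avoid float rounding: an element at t = 3.7 s falls in [3, 4).
EXAMPLE_WINDOW = fixed_window(Fraction("3.7"), 1)
```

Since every element shares key 1, all state and timers for a given window live in a single cell. This keeps the test simple but serializes processing onto one key.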
   
   Imported from Jira [BEAM-10786](https://issues.apache.org/jira/browse/BEAM-10786). Original Jira may contain additional context.
   Reported by: dstackowiak.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org