Posted to issues@beam.apache.org by "STACKOWIAK Denis (Jira)" <ji...@apache.org> on 2020/09/09 07:52:00 UTC

[jira] [Updated] (BEAM-10786) Stateful DoFn with Python sdk and DataFlow runner

     [ https://issues.apache.org/jira/browse/BEAM-10786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

STACKOWIAK Denis updated BEAM-10786:
------------------------------------
    Description: 
Hello,

This is related to: https://issues.apache.org/jira/browse/BEAM-9655

We need stateful DoFns for some of our use cases (and for a migration from the Java to the Python SDK), but this feature does not appear to be fully implemented on the Direct and Dataflow runners.

To get a clearer picture, we created a pipeline (based on the wordcount example) that tests every combination of StateSpec type (Bag; CombiningValue; Timer), mode (Batch; Streaming) and runner (Direct; Dataflow).

Results:
||Runner||Mode||StateSpec||Result||Error||JobId||
|Direct|Batch|Timer; TimeDomain.REALTIME|{color:#de350b}KO{color}|#1| |
|Direct|Batch|Timer; TimeDomain.WATERMARK|{color:#00875a}OK{color}| | |
|Direct|Batch|CombiningValue|{color:#00875a}OK{color}| | |
|Direct|Batch|Bag|{color:#00875a}OK{color}| | |
|Direct|Streaming|Timer; TimeDomain.REALTIME|{color:#00875a}OK{color}| | |
|Direct|Streaming|Timer; TimeDomain.WATERMARK|{color:#00875a}OK{color}| | |
|Direct|Streaming|CombiningValue|{color:#00875a}OK{color}| | |
|Direct|Streaming|Bag|{color:#00875a}OK{color}| | |
|Dataflow|Batch|Timer; TimeDomain.REALTIME|{color:#de350b}KO{color}|#2|2020-08-20_08_14_07-5985905092341835149|
|Dataflow|Batch|Timer; TimeDomain.WATERMARK|{color:#de350b}KO{color}|#2|2020-08-20_08_14_51-227797524346310138|
|Dataflow|Batch|CombiningValue|{color:#de350b}KO{color}|#2|2020-08-20_08_15_46-14394222017890152995|
|Dataflow|Batch|Bag|{color:#de350b}KO{color}|#2|2020-08-20_08_17_20-2307047231213658649|
|Dataflow|Streaming|Timer; TimeDomain.REALTIME|{color:#de350b}KO{color}|#3|2020-08-20_08_47_37-6883008099159189108|
|Dataflow|Streaming|Timer; TimeDomain.WATERMARK|{color:#de350b}KO{color}|#3|2020-08-20_08_46_48-7341546514472681857|
|Dataflow|Streaming|CombiningValue|{color:#00875a}OK{color}| | |
|Dataflow|Streaming|Bag|{color:#00875a}OK{color}| | |

Error #1:
{noformat}
Traceback (most recent call last):
  File "test_stateful.py", line 142, in <module>
    run()
  File "test_stateful.py", line 136, in run
    test_bag = words | 'Test Bag' >> beam.ParDo(TestStatefulBag())
  File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/pipeline.py", line 555, in __exit__
    self.run().wait_until_finish()
  File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/pipeline.py", line 534, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 179, in run_via_runner_api
    self._check_requirements(pipeline_proto)
  File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 280, in _check_requirements
    raise NotImplementedError(timer.time_domain)
NotImplementedError: 2{noformat}
Error #2:
{noformat}
Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 638, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 662, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 664, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 665, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 284, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 290, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/common.py", line 943, in apache_beam.runners.common.DoFnRunner.__init__
Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns.
{noformat}
Error #3:
{noformat}
Error message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1004: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 959, in process_bundle
    output_stream = self.timer_data_channel.output_timer_stream(
AttributeError: 'NoneType' object has no attribute 'output_timer_stream'
        java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
        org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1369)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1088)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1004: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 959, in process_bundle
    output_stream = self.timer_data_channel.output_timer_stream(
AttributeError: 'NoneType' object has no attribute 'output_timer_stream'
{noformat}

Pipeline Code:
{code:python}
from __future__ import absolute_import

import argparse
import logging
import re

from time import time

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
import apache_beam.coders as coders
import apache_beam.transforms.userstate as user_state
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.combiners import CountCombineFn
import google.auth

class WordExtractingDoFn(beam.DoFn):
  def process(self, element):
    return re.findall(r'[\w\']+', element, re.UNICODE)

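# Stateful DoFn using a processing-time (REAL_TIME) timer: each element arms
# a timer ~1 second in the future; the callback logs and emits 1.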
class TestStatefulTimerRealTime(beam.DoFn):
  STALE_TIMER = user_state.TimerSpec('stale', TimeDomain.REAL_TIME)

  def process(self, word, stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
    logging.info('Process Timer RealTime')
    stale_timer.set(time()+1)

  @user_state.on_timer(STALE_TIMER)
  def stale(self):
    logging.info('OK Timer RealTime')
    yield 1

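# Stateful DoFn using an event-time (WATERMARK) timer set to the end of the
# element's window; the callback logs and emits 1.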
class TestStatefulTimerWatermark(beam.DoFn):
  STALE_TIMER = user_state.TimerSpec('stale', TimeDomain.WATERMARK)

  def process(self, word, w=beam.DoFn.WindowParam, stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
    logging.info('Process Timer Watermark')
    stale_timer.set(w.end)

  @user_state.on_timer(STALE_TIMER)
  def stale(self):
    logging.info('OK Timer Watermark')
    yield 1

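# Stateful DoFn using CombiningValue state: logs the running per-key count,
# then adds 1 for each element.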
class TestStatefulCombiningValue(beam.DoFn):
  COUNT_STATE = user_state.CombiningValueStateSpec('count', coders.VarIntCoder(), CountCombineFn())

  def process(self, word, count_state=beam.DoFn.StateParam(COUNT_STATE)):
    logging.info('Process Combining Value : %s' % count_state.read())
    count_state.add(1)

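# Stateful DoFn using Bag state: logs the current bag size, then appends the
# element's key (always 1 here) to the bag.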
class TestStatefulBag(beam.DoFn):
  BAG_STATE = user_state.BagStateSpec('buffer', coders.VarIntCoder())

  def process(self, word, bag_state=beam.DoFn.StateParam(BAG_STATE)):
    logging.info('Process Bag length: %s' % sum(1 for _ in bag_state.read()))
    bag_state.add(word[0])

def run(argv=None, save_main_session=True):
  input_file='gs://dataflow-samples/shakespeare/kinglear.txt'
  input_topic='projects/pubsub-public-data/topics/shakespeare-kinglear'

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--stream_mode',
      dest='stream_mode',
      default='false',
      help='run in streaming mode')
  parser.add_argument(
      '--timer_realtime',
      dest='timer_realtime',
      default='false',
      help='Test Stateful Timer; RealTime Domain')
  parser.add_argument(
      '--timer_watermark',
      dest='timer_watermark',
      default='false',
      help='Test Stateful Timer; Watermark Domain')
  parser.add_argument(
      '--combining_value',
      dest='combining_value',
      default='false',
      help='Test Stateful Combining Value')
  parser.add_argument(
      '--bag',
      dest='bag',
      default='false',
      help='Test Stateful Bag')
  known_args, pipeline_args = parser.parse_known_args(argv)

  is_streaming = known_args.stream_mode != 'false'

  pipeline_options = PipelineOptions(pipeline_args)
  _, pipeline_options.view_as(GoogleCloudOptions).project = google.auth.default()
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  pipeline_options.view_as(StandardOptions).streaming = is_streaming

  # The pipeline will be run on exiting the with block.
  with beam.Pipeline(options=pipeline_options) as p:
    if is_streaming:
      words = p | "Read" >> beam.io.ReadFromPubSub(topic=input_topic)
      # words = p | "Read" >> beam.io.ReadFromPubSub(subscription=input_subscription)
    else:
      words = (
        p
        | 'Read' >> ReadFromText(input_file)
        | 'Split' >> beam.ParDo(WordExtractingDoFn()).with_output_types(unicode)
        )

    # Key every element so the stateful DoFns below run with per-key state.
    words = words | 'SetKey' >> beam.Map(lambda word: (1, word))


    # TESTS
    if known_args.timer_realtime == 'true':
      test_timer_realtime = words | 'Test timer realTime' >> beam.ParDo(TestStatefulTimerRealTime())

    if known_args.timer_watermark == 'true':
      test_timer_watermark = (words 
        | "window" >> beam.WindowInto(beam.window.FixedWindows(1)) 
        | 'Test timer watermark' >> beam.ParDo(TestStatefulTimerWatermark()))

    if known_args.combining_value == 'true':
      test_combining_value = words | 'Test combining value' >> beam.ParDo(TestStatefulCombiningValue())

    if known_args.bag == 'true':
      test_bag = words | 'Test Bag' >> beam.ParDo(TestStatefulBag())



if __name__ == '__main__':
  logging.getLogger().setLevel(logging.DEBUG)
  run()
{code}
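For reference, a rough sketch of how each combination can be launched; it assumes the code above is saved as test_stateful.py, and the Dataflow options shown (--region, --temp_location) are placeholders to adapt:
{code:python}
# Sketch only: each matrix combination is launched separately; both calls are
# shown together here just for illustration. All Dataflow options below are
# placeholders.
from test_stateful import run

# Direct runner, batch mode, Bag state (OK in our tests):
run(argv=['--bag', 'true', '--runner', 'DirectRunner'])

# Dataflow runner, streaming mode, watermark timer (fails with error #3):
run(argv=[
    '--stream_mode', 'true',
    '--timer_watermark', 'true',
    '--runner', 'DataflowRunner',
    '--region', 'europe-west1',                # placeholder
    '--temp_location', 'gs://my-bucket/tmp',   # placeholder
])
{code}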

> Stateful DoFn with Python sdk and DataFlow runner
> -------------------------------------------------
>
>                 Key: BEAM-10786
>                 URL: https://issues.apache.org/jira/browse/BEAM-10786
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow, runner-direct, sdk-py-core
>    Affects Versions: 2.23.0
>            Reporter: STACKOWIAK Denis
>            Priority: P2
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)