Posted to issues@beam.apache.org by "Michael Stauffer (Jira)" <ji...@apache.org> on 2020/10/07 16:09:00 UTC

[jira] [Updated] (BEAM-11037) Python DirectRunner InputStream Issues (RuntimeError: VarLong too long)

     [ https://issues.apache.org/jira/browse/BEAM-11037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Stauffer updated BEAM-11037:
------------------------------------
    Description: 
I have a Beam workflow that runs fine on Dataflow. However, when I run it on my local direct runner (Windows 10, Python SDK), I run into memory (or some other) issues unless I massively reduce the size of the input CSV files to a few KB.

I'm not quite sure whether it's just a memory issue, as the size of the worker node is fixed to 100 MB (in contrast to the Java SDK, this limit seems to be hard-coded so far; see also [here|https://stackoverflow.com/questions/58099163/dataflow-sideinputs-worker-cache-size-in-sdk-2-x]).
{code:java}
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
{code}
Or whether there is an issue in the implementation of InputStream (see also below):

[https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/slow_stream.py#L178]

Any help is highly appreciated.

Exception:
{code:java}
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((WriteInterviewQuestions/Write/WriteImpl/GroupByKey/Read)+(ref_AppliedPTransform_WriteInterviewQuestions/Write/WriteImpl/WriteBundles_51))+(ref_PCollection_PCollection_33/Write)
Traceback (most recent call last):
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\pipeline.py", line 555, in __exit__
    self.result = self.run()
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\pipeline.py", line 534, in run
    return self.runner.run_pipeline(self, self._options)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 119, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 176, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 186, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 344, in run_stages
    bundle_context_manager,
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 523, in _run_stage
    bundle_manager)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 561, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 945, in process_bundle
    timer_inputs)):
  File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 598, in result_iterator
    yield fs.pop().result()
  File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 435, in result
    return self.__get_result()
  File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 384, in __get_result
    raise self._exception
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\utils\thread_pool_executor.py", line 44, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 941, in execute
    dry_run)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 841, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 353, in push
    response = self.worker.do_instruction(request)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 483, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 518, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 983, in process_bundle
    element.data)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 218, in process_encoded
    input_stream, True)
  File "apache_beam\coders\coder_impl.py", line 1246, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 1265, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 858, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 1043, in apache_beam.coders.coder_impl.SequenceCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 1359, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 224, in apache_beam.coders.coder_impl.StreamCoderImpl.decode
  File "apache_beam\coders\coder_impl.py", line 470, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 450, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
  File "apache_beam\coders\stream.pyx", line 193, in apache_beam.coders.stream.InputStream.read_var_int64
RuntimeError: VarLong too long.
{code}
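
For context on the final frame of the traceback: read_var_int64 decodes a base-128 variable-length integer, where each byte carries 7 payload bits and the high bit signals that more bytes follow. The following is a minimal standalone sketch of that encoding, not Beam's actual code; the function names encode_varint64 and decode_varint64 are hypothetical, and the error messages are modeled on the one in the traceback. It illustrates that a "too long" error can be produced by bytes that are simply not a valid varint (for example, when a decoder starts reading at the wrong offset or with a different coder than the one used for writing). Whether that is what happens in this pipeline is not established here.

```python
def encode_varint64(value):
    """Encode a signed 64-bit integer as a base-128 varint."""
    if value < 0:
        value += 1 << 64  # use the two's-complement view for negatives
    out = bytearray()
    while True:
        bits = value & 0x7F
        value >>= 7
        if value:
            out.append(bits | 0x80)  # high bit set: more bytes follow
        else:
            out.append(bits)  # high bit clear: last byte
            return bytes(out)


def decode_varint64(data, pos=0):
    """Decode one varint starting at data[pos]; return (value, next_pos)."""
    shift = 0
    result = 0
    while True:
        if pos >= len(data):
            raise RuntimeError("VarLong not terminated.")
        byte = data[pos]
        pos += 1
        bits = byte & 0x7F
        # 64 bits fit in at most 10 bytes, and the 10th byte may
        # contribute only a single bit (bit 63).
        if shift >= 64 or (shift >= 63 and bits > 1):
            raise RuntimeError("VarLong too long.")
        result |= bits << shift
        shift += 7
        if not byte & 0x80:
            break
    if result >= 1 << 63:
        result -= 1 << 64  # map back to the signed range
    return result, pos


if __name__ == "__main__":
    print(decode_varint64(encode_varint64(300)))
    try:
        # Eleven bytes that all have the continuation bit set can never
        # be a valid 64-bit varint.
        decode_varint64(b"\xff" * 11)
    except RuntimeError as err:
        print(err)
```

In this sketch the error depends only on the byte pattern being decoded, not on how much memory is available, which may help narrow down whether the failure is really memory-related.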
 

 


> Python DirectRunner InputStream Issues (RuntimeError: VarLong too long)
> -----------------------------------------------------------------------
>
>                 Key: BEAM-11037
>                 URL: https://issues.apache.org/jira/browse/BEAM-11037
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-avro, io-py-files, runner-direct
>    Affects Versions: 2.24.0
>         Environment: Windows 10 64bit, Python 3.7.9
>            Reporter: Michael Stauffer
>            Priority: P0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)