You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Michael Stauffer (Jira)" <ji...@apache.org> on 2020/10/07 16:09:00 UTC
[jira] [Updated] (BEAM-11037) Python DirectRunner InputStream Issues (RuntimeError: VarLong too long)
[ https://issues.apache.org/jira/browse/BEAM-11037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Stauffer updated BEAM-11037:
------------------------------------
Description:
I have a Beam workflow that runs fine on Dataflow. However, when I run it on my local direct runner (Windows 10, Python SDK), I run into memory (or some other) issues unless I massively reduce the size of the input CSV files to a few KB.
I'm not quite sure if it's just a memory issue, as the size of the worker node is fixed to 100 MB (in contrast to the Java SDK, it seems that this limit is hard coded so far, see also [here|https://stackoverflow.com/questions/58099163/dataflow-sideinputs-worker-cache-size-in-sdk-2-x]).
{code:java}
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
{code}
Or if there is an issue in the implementation of InputStream (see also below):
[https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/slow_stream.py#L178]
Any help is highly appreciated.
Exception:
{code:java}
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((WriteInterviewQuestions/Write/WriteImpl/GroupByKey/Read)+(ref_AppliedPTransform_WriteInterviewQuestions/Write/WriteImpl/WriteBundles_51))+(ref_PCollection_PCollection_33/Write)
Traceback (most recent call last):
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\pipeline.py", line 555, in __exit__
self.result = self.run()
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\pipeline.py", line 534, in run
return self.runner.run_pipeline(self, self._options)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 119, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 176, in run_pipeline
pipeline.to_runner_api(default_environment=self._default_environment))
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 186, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 344, in run_stages
bundle_context_manager,
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 523, in _run_stage
bundle_manager)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 561, in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 945, in process_bundle
timer_inputs)):
File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 598, in result_iterator
yield fs.pop().result()
File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 435, in result
return self.__get_result()
File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 384, in __get_result
raise self._exception
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\utils\thread_pool_executor.py", line 44, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 941, in execute
dry_run)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 841, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 353, in push
response = self.worker.do_instruction(request)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 483, in do_instruction
getattr(request, request_type), request.instruction_id)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 518, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 983, in process_bundle
element.data)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 218, in process_encoded
input_stream, True)
File "apache_beam\coders\coder_impl.py", line 1246, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
File "apache_beam\coders\coder_impl.py", line 1265, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
File "apache_beam\coders\coder_impl.py", line 858, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.decode_from_stream
File "apache_beam\coders\coder_impl.py", line 1043, in apache_beam.coders.coder_impl.SequenceCoderImpl.decode_from_stream
File "apache_beam\coders\coder_impl.py", line 1359, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.decode_from_stream
File "apache_beam\coders\coder_impl.py", line 224, in apache_beam.coders.coder_impl.StreamCoderImpl.decode
File "apache_beam\coders\coder_impl.py", line 470, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
File "apache_beam\coders\coder_impl.py", line 450, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
File "apache_beam\coders\stream.pyx", line 193, in apache_beam.coders.stream.InputStream.read_var_int64
RuntimeError: VarLong too long.
{code}
was:
I have a Beam workflow that runs fine on Dataflow. However, when I run it on my local direct runner (Windows 10, Python SDK), I run into memory (or some other) issues unless I massively reduce the size of the input CSV files to a few KB.
I'm not quite sure if it's just a memory issue, as the size of the worker node is fixed to 100 MB (in contrast to the Java SDK, it seems that this limit is hard coded so far, see also [here|https://stackoverflow.com/questions/58099163/dataflow-sideinputs-worker-cache-size-in-sdk-2-x]).
{code:java}
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
{code}
Or if there is an issue in the implementation of InputStream (see also below):
[https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/slow_stream.py#L178]
Any help is highly appreciated.
Exception:
{code:java}
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((WriteInterviewQuestions/Write/WriteImpl/GroupByKey/Read)+(ref_AppliedPTransform_WriteInterviewQuestions/Write/WriteImpl/WriteBundles_51))+(ref_PCollection_PCollection_33/Write)
Traceback (most recent call last):
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\pipeline.py", line 555, in __exit__
self.result = self.run()
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\pipeline.py", line 534, in run
return self.runner.run_pipeline(self, self._options)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 119, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 176, in run_pipeline
pipeline.to_runner_api(default_environment=self._default_environment))
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 186, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 344, in run_stages
bundle_context_manager,
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 523, in _run_stage
bundle_manager)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 561, in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 945, in process_bundle
timer_inputs)):
File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 598, in result_iterator
yield fs.pop().result()
File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 435, in result
return self.__get_result()
File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 384, in __get_result
raise self._exception
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\utils\thread_pool_executor.py", line 44, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 941, in execute
dry_run)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 841, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 353, in push
response = self.worker.do_instruction(request)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 483, in do_instruction
getattr(request, request_type), request.instruction_id)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 518, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 983, in process_bundle
element.data)
File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 218, in process_encoded
input_stream, True)
File "apache_beam\coders\coder_impl.py", line 1246, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
File "apache_beam\coders\coder_impl.py", line 1265, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
File "apache_beam\coders\coder_impl.py", line 858, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.decode_from_stream
File "apache_beam\coders\coder_impl.py", line 1043, in apache_beam.coders.coder_impl.SequenceCoderImpl.decode_from_stream
File "apache_beam\coders\coder_impl.py", line 1359, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.decode_from_stream
File "apache_beam\coders\coder_impl.py", line 224, in apache_beam.coders.coder_impl.StreamCoderImpl.decode
File "apache_beam\coders\coder_impl.py", line 470, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
File "apache_beam\coders\coder_impl.py", line 450, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
File "apache_beam\coders\stream.pyx", line 193, in apache_beam.coders.stream.InputStream.read_var_int64
RuntimeError: VarLong too long.
{code}
> Python DirectRunner InputStream Issues (RuntimeError: VarLong too long)
> -----------------------------------------------------------------------
>
> Key: BEAM-11037
> URL: https://issues.apache.org/jira/browse/BEAM-11037
> Project: Beam
> Issue Type: Bug
> Components: io-py-avro, io-py-files, runner-direct
> Affects Versions: 2.24.0
> Environment: Windows 10 64bit, Python 3.7.9
> Reporter: Michael Stauffer
> Priority: P0
>
> I have a Beam workflow that runs fine on Dataflow. However, when I run it on my local direct runner (Windows 10, Python SDK), I run into memory (or some other) issues unless I massively reduce the size of the input CSV files to a few KB.
> I'm not quite sure if it's just a memory issue, as the size of the worker node is fixed to 100 MB (in contrast to the Java SDK, it seems that this limit is hard coded so far, see also [here|https://stackoverflow.com/questions/58099163/dataflow-sideinputs-worker-cache-size-in-sdk-2-x]).
> {code:java}
> INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
> {code}
> Or if there is an issue in the implementation of InputStream (see also below):
> [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/slow_stream.py#L178]
> Any help is highly appreciated.
> Exception:
> {code:java}
> INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((WriteInterviewQuestions/Write/WriteImpl/GroupByKey/Read)+(ref_AppliedPTransform_WriteInterviewQuestions/Write/WriteImpl/WriteBundles_51))+(ref_PCollection_PCollection_33/Write)
> Traceback (most recent call last):
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\pipeline.py", line 555, in __exit__
> self.result = self.run()
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\pipeline.py", line 534, in run
> return self.runner.run_pipeline(self, self._options)
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 119, in run_pipeline
> return runner.run_pipeline(pipeline, options)
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 176, in run_pipeline
> pipeline.to_runner_api(default_environment=self._default_environment))
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 186, in run_via_runner_api
> return self.run_stages(stage_context, stages)
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 344, in run_stages
> bundle_context_manager,
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 523, in _run_stage
> bundle_manager)
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 561, in _run_bundle
> data_input, data_output, input_timers, expected_timer_output)
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 945, in process_bundle
> timer_inputs)):
> File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 598, in result_iterator
> yield fs.pop().result()
> File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 435, in result
> return self.__get_result()
> File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 384, in __get_result
> raise self._exception
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\utils\thread_pool_executor.py", line 44, in run
> self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 941, in execute
> dry_run)
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 841, in process_bundle
> result_future = self._worker_handler.control_conn.push(process_bundle_req)
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 353, in push
> response = self.worker.do_instruction(request)
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 483, in do_instruction
> getattr(request, request_type), request.instruction_id)
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 518, in process_bundle
> bundle_processor.process_bundle(instruction_id))
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 983, in process_bundle
> element.data)
> File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 218, in process_encoded
> input_stream, True)
> File "apache_beam\coders\coder_impl.py", line 1246, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
> File "apache_beam\coders\coder_impl.py", line 1265, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
> File "apache_beam\coders\coder_impl.py", line 858, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.decode_from_stream
> File "apache_beam\coders\coder_impl.py", line 1043, in apache_beam.coders.coder_impl.SequenceCoderImpl.decode_from_stream
> File "apache_beam\coders\coder_impl.py", line 1359, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.decode_from_stream
> File "apache_beam\coders\coder_impl.py", line 224, in apache_beam.coders.coder_impl.StreamCoderImpl.decode
> File "apache_beam\coders\coder_impl.py", line 470, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
> File "apache_beam\coders\coder_impl.py", line 450, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
> File "apache_beam\coders\stream.pyx", line 193, in apache_beam.coders.stream.InputStream.read_var_int64
> RuntimeError: VarLong too long.
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)