Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/24 09:39:19 UTC

[GitHub] [beam] DLumi opened a new issue, #22039: [Bug]: unable to set up multiprocessing

DLumi opened a new issue, #22039:
URL: https://github.com/apache/beam/issues/22039

   ### What happened?
   
   Beam - 2.39.0
   Python - 3.8.13
   Win10
   Runner - Direct
   
   I was following this page to set up some parallelism (threading in this case):
   https://beam.apache.org/documentation/runners/direct/
   
   However, regardless of how I pass the required parameters to PipelineOptions instance, I get the error below. This error is really obscure, and I have no idea how to fix it (as you might've guessed, googling didn't help much).
   Mind you, I have no issues when those parameters are set to defaults.
   
   ```
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function annotate_downstream_side_inputs at 0x000001B6F91B8AF0> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function fix_side_input_pcoll_coders at 0x000001B6F91B8C10> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x000001B6F91B9160> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x000001B6F91B91F0> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_sdf at 0x000001B6F91B93A0> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_gbk at 0x000001B6F91B9430> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sink_flattens at 0x000001B6F91B9550> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function greedily_fuse at 0x000001B6F91B95E0> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function read_to_impulse at 0x000001B6F91B9670> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function impulse_to_input at 0x000001B6F91B9700> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x000001B6F91B9940> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function add_impulse_to_dangling_transforms at 0x000001B6F91B9A60> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function setup_timer_mapping at 0x000001B6F91B98B0> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function populate_data_channel_coders at 0x000001B6F91B99D0> ====================
   INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting control server on port 49986
   INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting data server on port 49987
   INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting state server on port 49988
   INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting logging server on port 49989
   INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedGrpcWorkerHandler object at 0x000001B6FB280820> for environment ref_Environment_default_environment_1 (beam:env:embedded_python_grpc:v1, b'{}')
   INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
   INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:49986.
   INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
   INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
   INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedGrpcWorkerHandler object at 0x000001B6FBCE0D90> for environment ref_Environment_default_environment_1 (beam:env:embedded_python_grpc:v1, b'{}')
   INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
   INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:49986.
   INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
   INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
   INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:49988.
   INFO:apache_beam.runners.worker.sdk_worker:State channel established.
   INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:49987
   INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:49988.
   INFO:apache_beam.runners.worker.sdk_worker:State channel established.
   INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:49987
   INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
   INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
   INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
   INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
   INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
   INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
   INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
   INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
   INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
   INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
   Traceback (most recent call last):
     File "T:/nvme1/akutergin/OCR/beam_pipelines/main.py", line 132, in <module>
       ocrd = p_markup.run(decoded, models_and_libs, templates)
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\pipeline.py", line 596, in __exit__
       self.result = self.run()
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\pipeline.py", line 573, in run
       return self.runner.run_pipeline(self, self._options)
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 131, in run_pipeline
       return runner.run_pipeline(pipeline, options)
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 199, in run_pipeline
       self._latest_run_result = self.run_via_runner_api(
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 208, in run_via_runner_api
       return self.run_stages(stage_context, stages)
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 408, in run_stages
       bundle_results = self._execute_bundle(
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 736, in _execute_bundle
       self._run_bundle(
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 965, in _run_bundle
       result, splits = bundle_manager.process_bundle(
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 1388, in process_bundle
       process_bundle=beam_fn_api_pb2.ProcessBundleResponse(
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\metrics\monitoring_infos.py", line 421, in consolidate
       yield reduce(merge, values)
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\metrics\monitoring_infos.py", line 419, in merge
       payload=combiner(a.payload, b.payload))
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\metrics\monitoring_infos.py", line 384, in distribution_payload_combiner
       (count_a, sum_a, min_a, max_a) = _decode_distribution(coder, payload_a)
     File "C:\Anaconda3\envs\ML-5\lib\site-packages\apache_beam\metrics\monitoring_infos.py", line 451, in _decode_distribution
       value_coder.decode_from_stream(stream, True),
     File "apache_beam\coders\coder_impl.py", line 914, in apache_beam.coders.coder_impl.VarIntCoderImpl.decode_from_stream
     File "apache_beam\coders\coder_impl.py", line 916, in apache_beam.coders.coder_impl.VarIntCoderImpl.decode_from_stream
     File "apache_beam\coders\stream.pyx", line 195, in apache_beam.coders.stream.InputStream.read_var_int64
   RuntimeError: VarLong too long.
   ```
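
The last frames of the traceback show the error being raised while a distribution metric payload is decoded as a variable-length integer. As a rough illustration of what "VarLong too long" means, here is a minimal pure-Python sketch of base-128 varint encoding and decoding for signed 64-bit values. It is not Beam's Cython implementation; `encode_varint` and `decode_varint` are hypothetical names used only for this example, and the sketch does not explain why the payload in this issue was malformed.

```python
def encode_varint(value: int) -> bytes:
    """Encode a signed 64-bit integer as a base-128 varint (illustrative only)."""
    if not -(1 << 63) <= value < (1 << 63):
        raise ValueError("value out of int64 range")
    value &= (1 << 64) - 1  # two's-complement view of negative numbers
    out = bytearray()
    while True:
        bits = value & 0x7F
        value >>= 7
        if value:
            out.append(bits | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(bits)  # last byte has the continuation bit cleared
            return bytes(out)


def decode_varint(data: bytes) -> int:
    """Decode one varint from the start of data, rejecting over-long input."""
    shift = 0
    result = 0
    for byte in data:
        bits = byte & 0x7F
        # A 64-bit value needs at most 10 bytes, and the 10th byte
        # may only contribute a single bit.
        if shift >= 64 or (shift >= 63 and bits > 1):
            raise RuntimeError("VarLong too long.")
        result |= bits << shift
        shift += 7
        if not byte & 0x80:
            break
    else:
        # The input ended before a byte without the continuation bit was seen.
        raise RuntimeError("VarInt not terminated.")
    if result >= 1 << 63:
        result -= 1 << 64  # map back to a negative number
    return result
```

With this sketch, a well-formed payload round-trips, while a run of bytes that all carry the continuation bit is rejected once the decoder has consumed more bits than a 64-bit integer can hold. In other words, the error indicates that the bytes being decoded were not a valid varint.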
   
   
   
   ### Issue Priority
   
   Priority: 0
   
   ### Issue Component
   
   Component: runner-direct


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] DLumi commented on issue #22039: [Bug]: unable to set up multiprocessing

Posted by GitBox <gi...@apache.org>.
DLumi commented on issue #22039:
URL: https://github.com/apache/beam/issues/22039#issuecomment-1165401653

   The Exception triggers on this snippet:
   ```
   import apache_beam as beam
   from apache_beam.runners.direct.direct_runner import DirectRunner
   from apache_beam.options.pipeline_options import DirectOptions, PipelineOptions
   
   
   if __name__ == '__main__':
       pipeline_options = PipelineOptions()
   
       runner = DirectRunner()
       
       # Tried passing those to PipelineOptions as well, same result
       pipeline_options.view_as(DirectOptions).direct_running_mode = 'multi_threading'
       pipeline_options.view_as(DirectOptions).direct_num_workers = 0
   
       with beam.Pipeline(runner=runner, options=pipeline_options) as p:
           inputs = p | 'Create' >> beam.Create([{'foo': 'bar'}, {'spam': 'eggs'}])
   ```
   




[GitHub] [beam] DLumi commented on issue #22039: [Bug]: unable to set up multiprocessing

Posted by GitBox <gi...@apache.org>.
DLumi commented on issue #22039:
URL: https://github.com/apache/beam/issues/22039#issuecomment-1168113739

   > (see https://beam.apache.org/contribute/issue-priorities/ for P0 vs P2 meaning)
   
   Well, I'm sorry for mislabeling, and thank you for the correction, but honestly, I'd rather have an actual answer to my question.




[GitHub] [beam] Abacn commented on issue #22039: [Bug]: unable to set up multiprocessing

Posted by GitBox <gi...@apache.org>.
Abacn commented on issue #22039:
URL: https://github.com/apache/beam/issues/22039#issuecomment-1170061896

   I can reproduce this on Windows 10 using `pip install apache_beam==2.40.0`. However, when installing from the source code (e.g. `pip install .[gcp,test]` in beam\sdks\python), the problem does not occur.




[GitHub] [beam] tvalentyn closed issue #22039: [Bug]: unable to set up multiprocessing

Posted by GitBox <gi...@apache.org>.
tvalentyn closed issue #22039: [Bug]: unable to set up multiprocessing
URL: https://github.com/apache/beam/issues/22039




[GitHub] [beam] Abacn commented on issue #22039: [Bug]: unable to set up multiprocessing

Posted by GitBox <gi...@apache.org>.
Abacn commented on issue #22039:
URL: https://github.com/apache/beam/issues/22039#issuecomment-1214586504

   I tested that `--direct_running_mode=multi_threading` now works with Beam v2.41.0rc1, but `--direct_running_mode=multi_processing` still does not work:
   ```
   Exception in thread run_worker:
   Traceback (most recent call last):
     File "C:\Python38\lib\threading.py", line 932, in _bootstrap_inner
       self.run()
     File "C:\Python38\lib\threading.py", line 870, in run
       self._target(*self._args, **self._kwargs)
     File "F:\Py\beampy38\lib\site-packages\apache_beam\runners\portability\local_job_service.py", line 220, in run
       p = subprocess.Popen(self._worker_command_line, shell=True, env=env_dict)
     File "C:\Python38\lib\subprocess.py", line 858, in __init__
       self._execute_child(args, executable, preexec_fn, close_fds,
     File "C:\Python38\lib\subprocess.py", line 1243, in _execute_child
       raise TypeError('bytes args is not allowed on Windows')
   TypeError: bytes args is not allowed on Windows
   ```
   Keep the issue open.
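
The `TypeError` above comes from the standard library: on Windows, `subprocess.Popen` rejects a command given as `bytes`. A minimal sketch of one way to avoid that, assuming the command may arrive as `bytes`, `str`, or a sequence of either, is to decode it to text before launching the process. The helper name `to_text_command` is hypothetical, and this is not necessarily how the Beam code was or should be changed.

```python
import os
import subprocess
import sys


def to_text_command(command):
    """Return the command with any bytes parts decoded to str.

    Accepts a single str/bytes command line or a sequence of str/bytes
    arguments. os.fsdecode leaves str values unchanged.
    """
    if isinstance(command, (bytes, str)):
        return os.fsdecode(command)
    return [os.fsdecode(part) for part in command]


if __name__ == "__main__":
    # A bytes argument list like this is rejected by Popen on Windows,
    # so it is decoded before being passed on.
    raw = [os.fsencode(sys.executable), b"-c", b"print('worker started')"]
    completed = subprocess.run(
        to_text_command(raw), capture_output=True, text=True, check=True
    )
    print(completed.stdout.strip())
```

Decoding at the boundary keeps the rest of the code free to build the command however it likes, while the call into `subprocess` always receives text on every platform.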




[GitHub] [beam] kennknowles commented on issue #22039: [Bug]: unable to set up multiprocessing

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #22039:
URL: https://github.com/apache/beam/issues/22039#issuecomment-1167732167

   (see https://beam.apache.org/contribute/issue-priorities/ for P0 vs P2 meaning)




[GitHub] [beam] Abacn commented on issue #22039: [Bug]: unable to set up multiprocessing

Posted by GitBox <gi...@apache.org>.
Abacn commented on issue #22039:
URL: https://github.com/apache/beam/issues/22039#issuecomment-1170075363

   duplicates #20633




[GitHub] [beam] Abacn commented on issue #22039: [Bug]: unable to set up multiprocessing

Posted by GitBox <gi...@apache.org>.
Abacn commented on issue #22039:
URL: https://github.com/apache/beam/issues/22039#issuecomment-1170537744

   The PyPI releases come with cythonized modules, which is why the problem shows up with a PyPI install. The issue is fixed in #22107, and the fix should be available in the next release.
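
Since the behaviour differs between the cythonized PyPI build and other installs, it can help to check which variant of a module an environment would load. The following is a small sketch using only the standard library; `is_compiled_module` is a hypothetical helper name, and `apache_beam.coders.stream` is taken from the traceback earlier in this thread.

```python
import importlib.machinery
import importlib.util
from typing import Optional


def is_compiled_module(name: str) -> Optional[bool]:
    """Report whether a module would be loaded from a compiled extension file.

    Returns True for extension modules (.pyd/.so), False for other modules
    (including interpreter built-ins), and None if the module cannot be found.
    """
    try:
        spec = importlib.util.find_spec(name)
    except (ImportError, ValueError):
        return None  # parent package missing or module in a broken state
    if spec is None or spec.origin is None:
        return None
    suffixes = tuple(importlib.machinery.EXTENSION_SUFFIXES)
    return spec.origin.endswith(suffixes)


if __name__ == "__main__":
    # Prints True for a cythonized build, or None if Beam is not installed.
    print(is_compiled_module("apache_beam.coders.stream"))
```

Note that looking up a dotted name imports its parent package, so the check should be run in the same environment as the failing pipeline.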

