Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/03/16 00:06:00 UTC

[jira] [Work logged] (BEAM-14112) ReadFromBigQuery cannot be used with the interactive runner

     [ https://issues.apache.org/jira/browse/BEAM-14112?focusedWorklogId=741955&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-741955 ]

ASF GitHub Bot logged work on BEAM-14112:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Mar/22 00:05
            Start Date: 16/Mar/22 00:05
    Worklog Time Spent: 10m 
      Work Description: chunyang opened a new pull request #17100:
URL: https://github.com/apache/beam/pull/17100


   Avoid storing a generator in `_CustomBigQuerySource` so that the source can be pickled and therefore used with the Python interactive runner. See details in https://issues.apache.org/jira/browse/BEAM-14112.
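
The tracebacks in BEAM-14112 end in `TypeError: can't pickle generator objects`, raised when `FastPrimitivesCoderImpl` falls back to pickling an element. The same failure, and the general shape of the fix, can be reproduced with the standard library alone. This is a minimal sketch, not Beam's actual `_CustomBigQuerySource` code. `read_split`, `GeneratorHoldingSource`, and `ListHoldingSource` are hypothetical names, and the sketch assumes the cached values are small enough to materialize eagerly. On Python 3.8 and later, the error message reads `cannot pickle 'generator' object` instead.

```python
import pickle


def read_split(n):
    """Hypothetical stand-in for lazily produced results (e.g. split info)."""
    for i in range(n):
        yield i


class GeneratorHoldingSource:
    """Caches a generator on the instance, so pickling the instance fails."""

    def __init__(self, n):
        self._splits = read_split(n)


class ListHoldingSource:
    """Materializes the same values into a list, which pickles fine."""

    def __init__(self, n):
        self._splits = list(read_split(n))


def is_picklable(obj):
    """Return True if `obj` survives pickle.dumps, False on the generator TypeError."""
    try:
        pickle.dumps(obj)
    except TypeError:
        return False
    return True


print(is_picklable(GeneratorHoldingSource(3)))  # False
print(is_picklable(ListHoldingSource(3)))       # True
```

Materializing into a list trades laziness for picklability. An alternative that keeps laziness is to store only the inputs needed to recreate the generator and build it on demand inside the method that consumes it.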
   
   Likely related to #15610
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

            Worklog Id:     (was: 741955)
    Remaining Estimate: 0h
            Time Spent: 10m

> ReadFromBigQuery cannot be used with the interactive runner
> -----------------------------------------------------------
>
>                 Key: BEAM-14112
>                 URL: https://issues.apache.org/jira/browse/BEAM-14112
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-gcp, runner-py-interactive
>    Affects Versions: 2.35.0, 2.36.0, 2.37.0
>            Reporter: Chun Yang
>            Priority: P2
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> A change in Apache Beam 2.35.0 broke ReadFromBigQuery when it is used with the Python interactive runner.
> The error can be reproduced with the following code:
> {code:python}#!/usr/bin/env python
> """Reproduce pickle issue when using RFBQ in interactive runner."""
> import apache_beam as beam
> from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
> import apache_beam.runners.interactive.interactive_beam as ib
>
>
> options = beam.options.pipeline_options.PipelineOptions(
>     project="...",
>     temp_location="...",
> )
>
> pipeline = beam.Pipeline(InteractiveRunner(), options=options)
> pcoll = pipeline | beam.io.ReadFromBigQuery(query="SELECT 1")
> print(ib.collect(pcoll)){code}
> {code:none}Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "repro.py", line 16, in <module>
>     print(ib.collect(pcoll))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py", line 270, in run_within_progress_indicator
>     return func(*args, **kwargs)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py", line 664, in collect
>     recording = recording_manager.record([pcoll], max_n=n, max_duration=duration)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py", line 458, in record
>     self.user_pipeline.options).run()
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py", line 113, in run
>     return self.deduce_fragment().run()
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py", line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py", line 195, in run_pipeline
>     pipeline_to_execute.run(), pipeline_instrument)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py", line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 200, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 210, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 396, in run_stages
>     runner_execution_context, bundle_context_manager)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 667, in _run_stage
>     bundle_manager))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 784, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1094, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
>     response = self.worker.do_instruction(request)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
>     element.data)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1281, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects [while running 'ReadFromBigQuery/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']{code}
> I suspect the error is caused by this change that was first released in 2.35.0: https://github.com/apache/beam/pull/15610



--
This message was sent by Atlassian Jira
(v8.20.1#820001)