Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/13 03:38:00 UTC
[jira] [Updated] (BEAM-9417) Unable to Read from BigQuery and File system in same pipeline
[ https://issues.apache.org/jira/browse/BEAM-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-9417:
----------------------------------
Priority: P2 (was: P1)
> Unable to Read from BigQuery and File system in same pipeline
> -------------------------------------------------------------
>
> Key: BEAM-9417
> URL: https://issues.apache.org/jira/browse/BEAM-9417
> Project: Beam
> Issue Type: Bug
> Components: io-py-gcp
> Environment: MacBook Pro (macOS Catalina), python3.7, apache-beam[gcp]==2.19.0
> Reporter: Deepak Verma
> Priority: P2
> Labels: bigquery, multiplexing
>
> I am trying to read from BigQuery and from the local file system in my apache-beam[gcp] pipeline:
> {code:python}
> pipeline_options = PipelineOptions()
> options = pipeline_options.view_as(PreProcessOptions)
> options.view_as(SetupOptions).save_main_session = True
> p = beam.Pipeline(options=options)
> apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
> .format(bq_project=options.bq_project, customer=options.customer)
> file_path = "mycsv.csv.gz"
> apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
> preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
> {code}
>
> When I run this job, I get the following error:
>
> {code:java}
> Traceback (most recent call last):
> File "/etl/dataflow/etlTXLPreprocessor.py", line 125, in <module>
> run()
> File "/etl/dataflow/etlTXLPreprocessor.py", line 120, in run
> p.run().wait_until_finish()
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 461, in run
> self._options).run(False)
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
> return self.runner.run_pipeline(self, self._options)
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 182, in run_pipeline
> return runner.run_pipeline(pipeline, options)
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 413, in run_pipeline
> pipeline.replace_all(_get_transform_overrides(options))
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 443, in replace_all
> self._replace(override)
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 340, in _replace
> self.visit(TransformUpdater(self))
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in visit
> self._root_transform().visit(visitor, self, visited)
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
> part.visit(visitor, pipeline, visited)
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
> part.visit(visitor, pipeline, visited)
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
> part.visit(visitor, pipeline, visited)
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 942, in visit
> visitor.visit_transform(self)
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 338, in visit_transform
> self._replace_if_needed(transform_node)
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 301, in _replace_if_needed
> new_output = replacement_transform.expand(input_node)
> File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 87, in expand
> invoker = DoFnInvoker.create_invoker(signature, process_invocation=False)
> File "apache_beam/runners/common.py", line 360, in apache_beam.runners.common.DoFnInvoker.create_invoker
> TypeError: create_invoker() takes at least 2 positional arguments (1 given){code}
>
> But if I run my code like this (file system only):
> {code:python}
> pipeline_options = PipelineOptions()
> options = pipeline_options.view_as(PreProcessOptions)
> options.view_as(SetupOptions).save_main_session = True
> p = beam.Pipeline(options=options)
> file_path = "mycsv.csv.gz"
> preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
> {code}
>
> or like this (BigQuery only):
> {code:python}
> pipeline_options = PipelineOptions()
> options = pipeline_options.view_as(PreProcessOptions)
> options.view_as(SetupOptions).save_main_session = True
> p = beam.Pipeline(options=options)
> apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
> .format(bq_project=options.bq_project, customer=options.customer)
> apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
> {code}
>
> or even like this (reading from BigQuery twice):
> {code:python}
> pipeline_options = PipelineOptions()
> options = pipeline_options.view_as(PreProcessOptions)
> options.view_as(SetupOptions).save_main_session = True
> p = beam.Pipeline(options=options)
> apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
> .format(bq_project=options.bq_project, customer=options.customer)
> apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
> apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True)){code}
> the code works fine in each case.
>
> Is it a limitation of Apache Beam that a single pipeline cannot read from both BigQuery and the file system?
> If so, can we add this feature?
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)