Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2020/05/15 23:28:00 UTC

[jira] [Commented] (BEAM-9417) Unable to Read from BigQuery and File system in same pipeline

    [ https://issues.apache.org/jira/browse/BEAM-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108748#comment-17108748 ] 

Kenneth Knowles commented on BEAM-9417:
---------------------------------------

[~chamikara] [~pabloem] [~robertwb]

> Unable to Read from BigQuery and File system in same pipeline
> -------------------------------------------------------------
>
>                 Key: BEAM-9417
>                 URL: https://issues.apache.org/jira/browse/BEAM-9417
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>         Environment: MacBook Pro (macOS Catalina), python3.7, apache-beam[gcp]===2.19.0
>            Reporter: Deepak Verma
>            Priority: Critical
>              Labels: bigquery, multiplexing
>
> I am trying to read from BigQuery and the local file system in the same apache-beam[gcp] pipeline.
> {code:java}
> pipeline_options = PipelineOptions()
> options = pipeline_options.view_as(PreProcessOptions)
> options.view_as(SetupOptions).save_main_session = True
> p = beam.Pipeline(options=options)
> apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
>  .format(bq_project=options.bq_project, customer=options.customer)
> file_path = "mycsv.csv.gz"
> apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
> preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
> {code}
>  
> When I run this job, I get the following error:
>  
> {code:java}
> Traceback (most recent call last):
>  File "/etl/dataflow/etlTXLPreprocessor.py", line 125, in <module>
>  run()
>  File "/etl/dataflow/etlTXLPreprocessor.py", line 120, in run
>  p.run().wait_until_finish()
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 461, in run
>  self._options).run(False)
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
>  return self.runner.run_pipeline(self, self._options)
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 182, in run_pipeline
>  return runner.run_pipeline(pipeline, options)
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 413, in run_pipeline
>  pipeline.replace_all(_get_transform_overrides(options))
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 443, in replace_all
>  self._replace(override)
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 340, in _replace
>  self.visit(TransformUpdater(self))
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in visit
>  self._root_transform().visit(visitor, self, visited)
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
>  part.visit(visitor, pipeline, visited)
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
>  part.visit(visitor, pipeline, visited)
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
>  part.visit(visitor, pipeline, visited)
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 942, in visit
>  visitor.visit_transform(self)
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 338, in visit_transform
>  self._replace_if_needed(transform_node)
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 301, in _replace_if_needed
>  new_output = replacement_transform.expand(input_node)
>  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 87, in expand
>  invoker = DoFnInvoker.create_invoker(signature, process_invocation=False)
>  File "apache_beam/runners/common.py", line 360, in apache_beam.runners.common.DoFnInvoker.create_invoker
> TypeError: create_invoker() takes at least 2 positional arguments (1 given){code}
>  
> But if I run my code like this:
> {code:java}
>  
> pipeline_options = PipelineOptions()
> options = pipeline_options.view_as(PreProcessOptions)
> options.view_as(SetupOptions).save_main_session = True
> p = beam.Pipeline(options=options)
> file_path = "mycsv.csv.gz"
> preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
> {code}
>  
> or like this
> {code:java}
>  
> pipeline_options = PipelineOptions()
> options = pipeline_options.view_as(PreProcessOptions)
> options.view_as(SetupOptions).save_main_session = True
> p = beam.Pipeline(options=options)
> apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
>  .format(bq_project=options.bq_project, customer=options.customer)
> apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
> {code}
>  
> or even like this 
> {code:java}
> pipeline_options = PipelineOptions()
> options = pipeline_options.view_as(PreProcessOptions)
> options.view_as(SetupOptions).save_main_session = True
> p = beam.Pipeline(options=options)
> apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
>  .format(bq_project=options.bq_project, customer=options.customer)
> apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
> apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True)){code}
> the code works fine.
>  
> Is this a limitation of Apache Beam when reading from different sources in the same pipeline?
> If so, can we add this feature? 
>  
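The last frame of the traceback quoted above shows `create_invoker` being called with one positional argument (`signature`) plus the keyword `process_invocation=False`, while the callee reports that it needs at least two positional arguments. The sketch below reproduces that class of failure in isolation. The function here is a hypothetical stand-in, not Beam's real `DoFnInvoker.create_invoker`, and the second parameter name `output_processor` is an assumption made for illustration only; the exact error text from a pure-Python function also differs from the compiled one in the report.

```python
import inspect


def create_invoker(signature, output_processor, process_invocation=True):
    """Hypothetical stand-in with two required positional parameters.

    This is NOT Beam's implementation; the parameter names are assumptions.
    """
    return (signature, output_processor, process_invocation)


def try_call(*args, **kwargs):
    """Return None if the call is accepted, else the TypeError message."""
    try:
        create_invoker(*args, **kwargs)
    except TypeError as exc:
        return str(exc)
    return None


def required_positional(func):
    """List the positional parameters of func that have no default value."""
    params = inspect.signature(func).parameters.values()
    return [
        p.name
        for p in params
        if p.default is inspect.Parameter.empty
        and p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
    ]


# Same call shape as the failing frame: one positional argument plus a keyword.
failure = try_call("signature", process_invocation=False)

# Supplying the second positional argument makes the call succeed.
success = try_call("signature", "output_processor", process_invocation=False)

print("rejected call:", failure)
print("accepted call:", success)
print("required positionals:", required_positional(create_invoker))
```

Running `required_positional` against the installed `create_invoker` may help confirm which arguments that version expects, although `inspect.signature` is not guaranteed to work on compiled (Cython) callables.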


