Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/30 19:41:10 UTC

[GitHub] [beam] Ardagan opened a new pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Ardagan opened a new pull request #11582:
URL: https://github.com/apache/beam/pull/11582


   [BEAM-9650](https://issues.apache.org/jira/browse/BEAM-9650)
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r420456908



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected "
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:

Review comment:
       I worry that this is a little clunky - but I appreciate that it provides validation, and even type checking if necessary. Perhaps give it a shorter name so it's 'easy' to create. 
   
   cc: @robertwb thoughts on a Pythonic PCollection element that describes a BigQuery Read action (read from table or read from query)?
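
A minimal sketch of what a shorter-named, validated request element could look like. The class name `BigQueryReadRequest` is hypothetical and not part of this PR; the fields mirror the attributes that `_ExtractBQData.process` reads from each element (`table`, `query`, `use_legacy_sql`, `flatten_results`), and validation is written as an instance method so that it inspects the values set in the constructor.

```python
class BigQueryReadRequest(object):
  """Hypothetical element describing one BigQuery read (table or query)."""

  def __init__(
      self,
      query=None,
      table=None,
      use_legacy_sql=False,
      flatten_results=False):
    self.query = query
    self.table = table
    self.use_legacy_sql = use_legacy_sql
    self.flatten_results = flatten_results
    # Fail early, at construction time, rather than inside the pipeline.
    self.validate()

  def validate(self):
    # Exactly one of table or query must be provided.
    if self.table is not None and self.query is not None:
      raise ValueError(
          'Both a BigQuery table and a query were specified.'
          ' Please specify only one of these.')
    if self.table is None and self.query is None:
      raise ValueError('A BigQuery table or a query must be specified')


# Usage: one request per read action, suitable as a PCollection element.
requests = [
    BigQueryReadRequest(table='PROJECT:DATASET.TABLE'),
    BigQueryReadRequest(query='SELECT 1'),
]
```

Keeping the element a plain class with keyword arguments leaves room for type checking later without changing how callers construct it.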







[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-649743827


   Run Python PreCommit





[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-649697558


   Run PythonFormatter PreCommit





[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-622510185


   > This is looking good. Something I think we should do is change the underlying implementation of ReadFromBigQuery
   
   This would be a bigger change than what I'd like to put in this PR. Can we do it in a separate PR?





[GitHub] [beam] aaltay commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-694465408


   Is this stale? Should we keep it open?





[GitHub] [beam] pabloem commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r431365700



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()

Review comment:
       thanks. I see this has been added.







[GitHub] [beam] pabloem commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r431421613



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,316 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None,
+      bq_client=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+    self.bq_client = bq_client
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper(self.bq_client)
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected "
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`. If set to :data:`False`,
+      the query will use BigQuery's updated SQL dialect with improved standards
+      compliance.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`False`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  def validate(self):
+    if self.table is not None and self.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif self.table is None and self.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to clean up snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema expected for the rows being read; a ValueError is
+      raised if BigQuery reports a different one. This can be either
+      specified as a 'bigquery.TableSchema' object
+      or a single string  of the form 'field1:type1,field2:type2,field3:type3'
+      that defines a comma separated list of fields. Here 'type' should
+      specify the BigQuery type of the field. Single string based schemas do
+      not support nested fields, repeated fields, or specifying a BigQuery
+      mode for fields (mode will always be set to 'NULLABLE').
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new temporary tables.
+   """
+  def __init__(
+      self,
+      gcs_location=None,
+      project=None,
+      validate=False,
+      coder=None,
+      schema=None,
+      flatten_results=True,
+      kms_key=None):
+    if gcs_location:
+      if not isinstance(gcs_location, (str, unicode, ValueProvider)):
+        raise TypeError(
+            '%s: gcs_location must be of type string'
+            ' or ValueProvider; got %r instead' %
+            (self.__class__.__name__, type(gcs_location)))
+
+      if isinstance(gcs_location, (str, unicode)):
+        gcs_location = StaticValueProvider(str, gcs_location)
+
+    if schema is None:
+      raise ValueError("Should provide schema.")
+
+    self.coder = coder
+    self.gcs_location = gcs_location
+    self.project = project
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or _JsonToDictCoder
+    self.schema = bigquery_tools.get_dict_table_schema(schema)
+    self.kms_key = kms_key
+
+  def _get_destination_uri(self, temp_location):
+    """Returns the fully qualified Google Cloud Storage URI pattern where the
+    extracted table should be written to.
+    """
+    file_pattern = '{}/bigquery-table-dump-*.json'
+
+    if self.gcs_location is not None:
+      gcs_base = self.gcs_location.get()
+    elif temp_location is not None:
+      gcs_base = temp_location
+      logging.debug("gcs_location is empty, using temp_location instead")
+    else:
+      raise ValueError(
+          '{} requires a GCS location to be provided'.format(
+              self.__class__.__name__))
+    if self.validate:
+      self._validate_gcs_location(gcs_base)
+
+    job_id = uuid.uuid4().hex
+    return FileSystems.join(gcs_base, job_id, file_pattern)
+
+  @staticmethod
+  def _validate_gcs_location(gcs_location):
+    if not gcs_location.startswith('gs://'):
+      raise ValueError('Invalid GCS location: {}'.format(gcs_location))
+
+  def expand(self, pcoll):
+    class RemoveJsonFiles(beam.DoFn):
+      def process(self, unused_element, gcs_locations, cleanup_signal):
+        for location in gcs_locations:
+          match_result = FileSystems.match([location])[0].metadata_list
+          logging.debug(
+              "%s: matched %s files",
+              self.__class__.__name__,
+              len(match_result))
+          paths = [x.path for x in match_result]
+          FileSystems.delete(paths)
+
+    schema = bigquery_tools.get_dict_table_schema(self.schema)
+    schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))
+
+    temp_location = pcoll.pipeline.options.view_as(
+        GoogleCloudOptions).temp_location
+    dump_location_pattern = self._get_destination_uri(temp_location)
+
+    files_to_read, cleanup_locations = (
+        pcoll
+        | beam.ParDo(
+      _ExtractBQData(dump_location_pattern, self.project, self.coder,
+                     self.schema, self.kms_key)).with_outputs(

Review comment:
       (meaning that it can be passed by users using ReadAllFromBQ)
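
A rough sketch of the forwarding this comment seems to ask for, assuming "it" refers to the `bq_client` argument that `_ExtractBQData` accepts in this revision. All three classes below are hypothetical plain-Python stand-ins (no Beam or BigQuery imports) for `BigQueryWrapper`, `_ExtractBQData`, and `ReadAllFromBigQuery`; they only illustrate passing an optional client from the user-facing transform down to the step that builds the wrapper.

```python
class FakeWrapper(object):
  """Stand-in for BigQueryWrapper: remembers which client it was given."""

  def __init__(self, client=None):
    # Fall back to a placeholder when the caller supplies no client.
    self.client = client if client is not None else 'default-client'


class ExtractStep(object):
  """Stand-in for _ExtractBQData: stores the client, builds the wrapper."""

  def __init__(self, bq_client=None):
    self.bq_client = bq_client

  def make_wrapper(self):
    return FakeWrapper(self.bq_client)


class ReadAllStep(object):
  """Stand-in for ReadAllFromBigQuery: exposes bq_client to users."""

  def __init__(self, schema, bq_client=None):
    self.schema = schema
    self.bq_client = bq_client

  def build_extract_step(self):
    # Forward the user-supplied client instead of dropping it here.
    return ExtractStep(bq_client=self.bq_client)


# Usage: a test can inject its own client object through the outer step.
test_client = object()
wrapper = ReadAllStep('f:STRING', bq_client=test_client) \
    .build_extract_step().make_wrapper()
```

Exposing the argument on the outer transform keeps the default behaviour unchanged while letting tests substitute a mock client.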







[GitHub] [beam] pabloem commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r431365155



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`. If set to :data:`True`,
+      the query will use BigQuery's updated SQL dialect with improved standards
+      compliance.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`True`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  @classmethod
+  def validate(cls):
+    if cls.table is not None and cls.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif cls.table is None and cls.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to cleanup snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be
+      created. This can be either specified as a 'bigquery.TableSchema' object
+      or a single string  of the form 'field1:type1,field2:type2,field3:type3'
+      that defines a comma separated list of fields. Here 'type' should
+      specify the BigQuery type of the field. Single string based schemas do
+      not support nested fields, repeated fields, or specifying a BigQuery
+      mode for fields (mode will always be set to 'NULLABLE').
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new temporary tables.
+   """
+  def __init__(
+      self,
+      gcs_location=None,
+      project=None,
+      validate=False,
+      coder=None,
+      schema=None,
+      flatten_results=True,
+      kms_key=None):
+    if gcs_location:
+      if not isinstance(gcs_location, (str, unicode, ValueProvider)):
+        raise TypeError(
+            '%s: gcs_location must be of type string'
+            ' or ValueProvider; got %r instead' %
+            (self.__class__.__name__, type(gcs_location)))
+
+      if isinstance(gcs_location, (str, unicode)):
+        gcs_location = StaticValueProvider(str, gcs_location)
+
+    if schema is None:
+      raise ValueError("Should provide schema.")
+
+    self.coder = coder
+    self.gcs_location = gcs_location
+    self.project = project
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or _JsonToDictCoder
+    self.schema = bigquery_tools.get_dict_table_schema(schema)
+    self.kms_key = kms_key
+
+  def _get_destination_uri(self, temp_location):
+    """Returns the fully qualified Google Cloud Storage URI pattern where the
+    extracted table should be written to.
+    """
+    file_pattern = '{}/bigquery-table-dump-*.json'
+
+    if self.gcs_location is not None:

Review comment:
       You're calling `gcs_location.get` because `gcs_location` is a ValueProvider, which we use for templates.
   
   Templates let you create a template file that represents an expanded pipeline and submit it to Dataflow along with runtime parameters, which get passed to RuntimeValueProviders.
   
   RuntimeValueProviders are filled in only at pipeline runtime (i.e. in `process`, `start_bundle`, `finish_bundle`), not at pipeline construction (i.e. in `expand`).
   
   This code will work for parameters passed at construction time, but it will not support templates or RuntimeValueProviders. If you want to support RuntimeValueProviders, you should call `get` on them only in runtime functions (`process`, `start_bundle`, `finish_bundle`, and others).
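
   A minimal sketch of the construction-time versus runtime distinction, assuming nothing about Beam's internals. `StaticProvider`, `RuntimeProvider`, `set_at_runtime`, `ExtractFn` and the `gs://bucket/tmp` path are hypothetical stand-ins invented for this illustration; they only mimic the `get()` contract of a value provider and are not the Beam classes.

```python
# Stand-in classes for illustration only; they mimic the get() contract of
# a value provider but are NOT Beam's implementations.
class StaticProvider:
    """Value that is already known at pipeline construction time."""
    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


class RuntimeProvider:
    """Value that is only filled in once the job is running."""
    def __init__(self):
        self._value = None
        self._ready = False

    def set_at_runtime(self, value):  # hypothetical hook for this sketch
        self._value = value
        self._ready = True

    def get(self):
        if not self._ready:
            raise RuntimeError('value is not available at construction time')
        return self._value


class ExtractFn:
    """Hypothetical DoFn-like class: stores the provider, resolves it late."""
    def __init__(self, gcs_location):
        # Construction time: keep the provider itself, do not call get().
        self._gcs_location = gcs_location

    def process(self, element):
        # Runtime: it is now safe to resolve the provider.
        base = self._gcs_location.get()
        return '{}/bigquery-table-dump-{}.json'.format(base, element)


provider = RuntimeProvider()
fn = ExtractFn(provider)  # works even though no value exists yet
provider.set_at_runtime('gs://bucket/tmp')  # simulates launching the template
result = fn.process('abc')
```

   Calling `provider.get()` inside `__init__` (or inside `expand`) would raise here, which is the failure mode described above; deferring the call to `process` works for both kinds of provider.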







[GitHub] [beam] aaltay commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r419808099



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -283,6 +284,8 @@ def compute_table_name(row):
     'BigQuerySink',
     'WriteToBigQuery',
     'ReadFromBigQuery',
+    'ReadAllFromBigQueryRequest',

Review comment:
       Does the Java API use a similar concept?







[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-650218921


   Run PythonFormatter PreCommit





[GitHub] [beam] rezarokni commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
rezarokni commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r419183634



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`. If set to :data:`True`,
+      the query will use BigQuery's updated SQL dialect with improved standards
+      compliance.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`True`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  @classmethod
+  def validate(cls):
+    if cls.table is not None and cls.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif cls.table is None and cls.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on

Review comment:
       It would be great to add more info on this: for example, how this transform can be used with a fixed window in a streaming pipeline to support a slowly updating cache, and what the snapshots are.







[GitHub] [beam] Ardagan commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r422903477



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:

Review comment:
       We need the schema to parse the files generated by the BQ export. I also use it to verify that all queries return the same data format.
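
   A rough sketch of that kind of check, written against plain dicts rather than `TableSchema` objects. `check_schema` and the normalization of type case and default mode are assumptions made for this illustration; the code in the diff compares the parsed schema objects directly with `==`.

```python
def check_schema(expected_fields, actual_fields):
    """Raises ValueError when field names, types or modes differ.

    Both arguments are lists of dicts with 'name', 'type' and an optional
    'mode' key (hypothetical shape chosen for this sketch).
    """
    def normalize(fields):
        # Assumption: compare types case-insensitively and treat a missing
        # mode as NULLABLE.
        return [(
            f['name'],
            f['type'].upper(),
            f.get('mode', 'NULLABLE').upper()) for f in fields]

    if normalize(expected_fields) != normalize(actual_fields):
        raise ValueError((
            "Schema generated by reading from BQ doesn't match expected "
            "schema.\nExpected: {}\nActual: {}").format(
                expected_fields, actual_fields))


expected = [{'name': 'id', 'type': 'INTEGER', 'mode': 'NULLABLE'}]

# Same schema, written slightly differently: passes silently.
check_schema(expected, [{'name': 'id', 'type': 'integer'}])

# A query returning a different column type is rejected.
try:
    check_schema(expected, [{'name': 'id', 'type': 'STRING'}])
    mismatch_detected = False
except ValueError:
    mismatch_detected = True
```

   Running the same check on every request is what guarantees that all exported files can be decoded with one schema.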







[GitHub] [beam] pabloem commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-644919326


   Run Python PreCommit





[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-658338739


   Run Python 2 PostCommit





[GitHub] [beam] pabloem commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-645607827


   Run Python 3.7 PostCommit





[GitHub] [beam] pabloem commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r420386613



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:

Review comment:
       Why do you need a `target_schema`?

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()

Review comment:
       It would be great if the BQ wrapper could be passed a client as an argument, so that a mocked-out BQ client could be used.
   
   See https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L993 and https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1047-L1048
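
   A minimal sketch of the idea, using only `unittest.mock` from the standard library. `QueryRunner`, `_make_client`, the `test_client` argument as used here, and the project, dataset and table names are hypothetical and invented for this illustration; they are not the signatures at the links above.

```python
from unittest import mock


class QueryRunner:
    """Hypothetical stand-in for a DoFn that talks to BigQuery."""
    def __init__(self, project, test_client=None):
        self.project = project
        # None in production; tests inject a mocked-out client here.
        self._test_client = test_client

    def _make_client(self):
        # Hypothetical factory: a real implementation would build the
        # actual client here. This sketch has none.
        raise NotImplementedError('no real client in this sketch')

    def process(self, table_id):
        if self._test_client is not None:
            client = self._test_client
        else:
            client = self._make_client()
        return client.get_table(self.project, 'dataset', table_id).schema


# In a test, no network access is needed: the mock records the calls and
# returns whatever the test configured.
fake_client = mock.Mock()
fake_client.get_table.return_value.schema = {'fields': []}

runner = QueryRunner('my-project', test_client=fake_client)
schema = runner.process('my_table')
fake_client.get_table.assert_called_once_with(
    'my-project', 'dataset', 'my_table')
```

   Constructing the client inside `process` with no way to override it, as the diff does, leaves tests with nothing to inject, which is why the argument is worth threading through.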

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`. If set to :data:`False`,
+      the query will use BigQuery's updated SQL dialect with improved standards
+      compliance.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`True`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  # Instance method: reads the table and query set in __init__.
+  def validate(self):
+    if self.table is not None and self.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif self.table is None and self.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to cleanup snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be
+      created. This can be either specified as a 'bigquery.TableSchema' object
+      or a single string  of the form 'field1:type1,field2:type2,field3:type3'
+      that defines a comma separated list of fields. Here 'type' should
+      specify the BigQuery type of the field. Single string based schemas do
+      not support nested fields, repeated fields, or specifying a BigQuery
+      mode for fields (mode will always be set to 'NULLABLE').
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new temporary tables.
+   """
+  def __init__(
+      self,
+      gcs_location=None,
+      project=None,
+      validate=False,
+      coder=None,
+      schema=None,
+      flatten_results=True,
+      kms_key=None):
+    if gcs_location:
+      if not isinstance(gcs_location, (str, unicode, ValueProvider)):
+        raise TypeError(
+            '%s: gcs_location must be of type string'
+            ' or ValueProvider; got %r instead' %
+            (self.__class__.__name__, type(gcs_location)))
+
+      if isinstance(gcs_location, (str, unicode)):
+        gcs_location = StaticValueProvider(str, gcs_location)
+
+    if schema is None:
+      raise ValueError("Should provide schema.")
+
+    self.coder = coder
+    self.gcs_location = gcs_location
+    self.project = project
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or _JsonToDictCoder
+    self.schema = bigquery_tools.get_dict_table_schema(schema)
+    self.kms_key = kms_key
+
+  def _get_destination_uri(self, temp_location):
+    """Returns the fully qualified Google Cloud Storage URI pattern where the
+    extracted table should be written to.
+    """
+    file_pattern = '{}/bigquery-table-dump-*.json'
+
+    if self.gcs_location is not None:

Review comment:
       Note that this function runs at construction time, before RuntimeValueProviders receive their values. It will work fine for string-type arguments, but it won't work for runtime ValueProviders.
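
       A minimal sketch of the distinction, assuming a hypothetical `DeferredValue` stand-in rather than Beam's own ValueProvider classes. `ExportStep` is also illustrative only. It shows the provider being stored at construction time and resolved only when elements are processed:

```python
class DeferredValue(object):
  """Hypothetical stand-in for a provider whose value arrives at run time."""
  def __init__(self):
    self._value = None
    self._is_set = False

  def set(self, value):
    # In a real pipeline the runner supplies the value; here we set it by hand.
    self._value = value
    self._is_set = True

  def get(self):
    if not self._is_set:
      raise RuntimeError('value is not available at construction time')
    return self._value


class ExportStep(object):
  """Hypothetical step that keeps the provider and resolves it lazily."""
  def __init__(self, location):
    # Store the provider itself; calling location.get() here would fail.
    self.location = location

  def process(self, element):
    # Resolve the value only when elements are processed.
    return '{}/{}'.format(self.location.get(), element)
```

       The same idea would apply here if the `.get()` call were moved out of `_get_destination_uri` and into the DoFn's `process`.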

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected "
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`. If set to :data:`False`,
+      the query will use BigQuery's updated SQL dialect with improved standards
+      compliance.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`True`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  # Instance method: reads the table and query set in __init__.
+  def validate(self):
+    if self.table is not None and self.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif self.table is None and self.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to cleanup snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be

Review comment:
       I don't think you need a schema?

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected "
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`. If set to :data:`False`,
+      the query will use BigQuery's updated SQL dialect with improved standards
+      compliance.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`True`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  # Instance method: reads the table and query set in __init__.
+  def validate(self):
+    if self.table is not None and self.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif self.table is None and self.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to cleanup snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be
+      created. This can be either specified as a 'bigquery.TableSchema' object
+      or a single string  of the form 'field1:type1,field2:type2,field3:type3'
+      that defines a comma separated list of fields. Here 'type' should
+      specify the BigQuery type of the field. Single string based schemas do
+      not support nested fields, repeated fields, or specifying a BigQuery
+      mode for fields (mode will always be set to 'NULLABLE').
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new temporary tables.
+   """
+  def __init__(
+      self,
+      gcs_location=None,
+      project=None,
+      validate=False,
+      coder=None,
+      schema=None,
+      flatten_results=True,
+      kms_key=None):
+    if gcs_location:
+      if not isinstance(gcs_location, (str, unicode, ValueProvider)):
+        raise TypeError(
+            '%s: gcs_location must be of type string'
+            ' or ValueProvider; got %r instead' %
+            (self.__class__.__name__, type(gcs_location)))
+
+      if isinstance(gcs_location, (str, unicode)):
+        gcs_location = StaticValueProvider(str, gcs_location)
+
+    if schema is None:
+      raise ValueError("Should provide schema.")
+
+    self.coder = coder
+    self.gcs_location = gcs_location
+    self.project = project
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or _JsonToDictCoder
+    self.schema = bigquery_tools.get_dict_table_schema(schema)
+    self.kms_key = kms_key
+
+  def _get_destination_uri(self, temp_location):
+    """Returns the fully qualified Google Cloud Storage URI pattern where the
+    extracted table should be written to.
+    """
+    file_pattern = '{}/bigquery-table-dump-*.json'

Review comment:
       FWIW it may be good to make this a constant somewhere
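
       One possible shape for that, as a sketch only. The constant name `_BQ_EXPORT_FILE_PATTERN` and the helper `get_destination_pattern` are hypothetical, and `posixpath.join` is used here in place of whatever path helper the module actually uses:

```python
import posixpath

# Hypothetical module-level constant; the name is illustrative only.
# The '{}' placeholder is left unfilled so a per-request ID can be added later.
_BQ_EXPORT_FILE_PATTERN = '{}/bigquery-table-dump-*.json'


def get_destination_pattern(gcs_base):
  """Joins a GCS base directory with the shared export file pattern."""
  return posixpath.join(gcs_base, _BQ_EXPORT_FILE_PATTERN)
```

       For example, `get_destination_pattern('gs://bucket/tmp')` gives `gs://bucket/tmp/{}/bigquery-table-dump-*.json`.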

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)

Review comment:
       I believe this location looks something like `gs://bucket/dir/hexstring1/hexstring2*.json`, right? Is that the appropriate URI format [expected by BQ](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationextract)?
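
       A rough way to check what the formatted location looks like, assuming the pattern ends up as a base directory followed by `{}/bigquery-table-dump-*.json`. The helper `build_export_uri` is hypothetical, and the single-wildcard check is an assumption about what the extract job accepts, not something taken from this PR:

```python
import uuid


def build_export_uri(gcs_location_pattern):
  """Fills the '{}' placeholder with a fresh hex ID and sanity-checks the URI."""
  uri = gcs_location_pattern.format(uuid.uuid4().hex)
  if not uri.startswith('gs://'):
    raise ValueError('expected a gs:// URI, got %r' % uri)
  if uri.count('*') > 1:
    # Assumption: the extract job accepts a single wildcard per URI.
    raise ValueError('expected at most one wildcard, got %r' % uri)
  return uri
```

       Under that assumption the result has the form `gs://bucket/dir/<32 hex chars>/bigquery-table-dump-*.json`, with the hex ID as a directory and the wildcard only in the file name.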




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-651247098


   Run PythonFormatter PreCommit





[GitHub] [beam] pabloem commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r431366057



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,316 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None,
+      bq_client=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+    self.bq_client = bq_client
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper(self.bq_client)
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected "
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`. If set to :data:`False`,
+      the query will use BigQuery's updated SQL dialect with improved standards
+      compliance.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`True`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  # Instance method: reads the table and query set in __init__.
+  def validate(self):
+    if self.table is not None and self.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif self.table is None and self.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to cleanup snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be
+      created. This can be either specified as a 'bigquery.TableSchema' object
+      or a single string  of the form 'field1:type1,field2:type2,field3:type3'
+      that defines a comma separated list of fields. Here 'type' should
+      specify the BigQuery type of the field. Single string based schemas do
+      not support nested fields, repeated fields, or specifying a BigQuery
+      mode for fields (mode will always be set to 'NULLABLE').
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new temporary tables.
+   """
+  def __init__(
+      self,
+      gcs_location=None,
+      project=None,
+      validate=False,
+      coder=None,
+      schema=None,
+      flatten_results=True,
+      kms_key=None):
+    if gcs_location:
+      if not isinstance(gcs_location, (str, unicode, ValueProvider)):
+        raise TypeError(
+            '%s: gcs_location must be of type string'
+            ' or ValueProvider; got %r instead' %
+            (self.__class__.__name__, type(gcs_location)))
+
+      if isinstance(gcs_location, (str, unicode)):
+        gcs_location = StaticValueProvider(str, gcs_location)
+
+    if schema is None:
+      raise ValueError("Should provide schema.")
+
+    self.coder = coder
+    self.gcs_location = gcs_location
+    self.project = project
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or _JsonToDictCoder
+    self.schema = bigquery_tools.get_dict_table_schema(schema)
+    self.kms_key = kms_key
+
+  def _get_destination_uri(self, temp_location):
+    """Returns the fully qualified Google Cloud Storage URI pattern where the
+    extracted table should be written to.
+    """
+    file_pattern = '{}/bigquery-table-dump-*.json'
+
+    if self.gcs_location is not None:
+      gcs_base = self.gcs_location.get()
+    elif temp_location is not None:
+      gcs_base = temp_location
+      logging.debug("gcs_location is empty, using temp_location instead")
+    else:
+      raise ValueError(
+          '{} requires a GCS location to be provided'.format(
+              self.__class__.__name__))
+    if self.validate:
+      self._validate_gcs_location(gcs_base)
+
+    job_id = uuid.uuid4().hex
+    return FileSystems.join(gcs_base, job_id, file_pattern)
+
+  @staticmethod
+  def _validate_gcs_location(gcs_location):
+    if not gcs_location.startswith('gs://'):
+      raise ValueError('Invalid GCS location: {}'.format(gcs_location))
+
+  def expand(self, pcoll):
+    class RemoveJsonFiles(beam.DoFn):
+      def process(self, unused_element, gcs_locations, cleanup_signal):
+        for location in gcs_locations:
+          match_result = FileSystems.match([location])[0].metadata_list
+          logging.debug(
+              "%s: matched %s files",
+              self.__class__.__name__,
+              len(match_result))
+          paths = [x.path for x in match_result]
+          FileSystems.delete(paths)
+
+    schema = bigquery_tools.get_dict_table_schema(self.schema)
+    schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))
+
+    temp_location = pcoll.pipeline.options.view_as(
+        GoogleCloudOptions).temp_location
+    dump_location_pattern = self._get_destination_uri(temp_location)
+
+    files_to_read, cleanup_locations = (
+        pcoll
+        | beam.ParDo(
+      _ExtractBQData(dump_location_pattern, self.project, self.coder,
+                     self.schema, self.kms_key)).with_outputs(

Review comment:
       can you pass the mock BQ client here as well?
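
A minimal sketch of what passing a mock client could look like, assuming a hypothetical `test_client` constructor argument that is not part of the diff above. `ExtractStep` and the `get_table` call shape are illustrative stand-ins rather than the Beam API; the only point is that the client is injected instead of being constructed inside `process`.

```python
from unittest import mock


class ExtractStep:
    """Hypothetical stand-in for a DoFn whose BigQuery client can be injected."""

    def __init__(self, project, test_client=None):
        self.project = project
        # `test_client` is an assumed parameter name, intended for tests only.
        self._test_client = test_client

    def _make_client(self):
        # Prefer the injected client; a real client would be built otherwise.
        if self._test_client is not None:
            return self._test_client
        raise RuntimeError('a real BigQuery client would be constructed here')

    def process(self, table_id):
        client = self._make_client()
        table = client.get_table(self.project, 'dataset', table_id)
        return table.schema


# The mock records calls and returns a canned table object.
fake_client = mock.Mock()
fake_client.get_table.return_value = mock.Mock(schema=['field1:STRING'])

step = ExtractStep('my-project', test_client=fake_client)
result = step.process('my_table')
```

With the client injected this way, a test can assert on both the returned schema and the exact arguments of the lookup call without touching the network.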




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r421825128



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            " schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:

Review comment:
       Checked with Robert, and he thought the class was fine.







[GitHub] [beam] Ardagan closed pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan closed pull request #11582:
URL: https://github.com/apache/beam/pull/11582


   





[GitHub] [beam] stale[bot] commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-691533545









[GitHub] [beam] pabloem commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r431365429



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            " schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`, in which case the query uses
+      BigQuery's standard SQL dialect with improved standards compliance;
+      set to :data:`True` to use the legacy dialect.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`False`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  # Instance method: table and query are set per instance in __init__.
+  def validate(self):
+    if self.table is not None and self.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif self.table is None and self.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to cleanup snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be
+      created. This can be either specified as a 'bigquery.TableSchema' object
+      or a single string  of the form 'field1:type1,field2:type2,field3:type3'
+      that defines a comma separated list of fields. Here 'type' should
+      specify the BigQuery type of the field. Single string based schemas do
+      not support nested fields, repeated fields, or specifying a BigQuery
+      mode for fields (mode will always be set to 'NULLABLE').
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new temporary tables.
+   """
+  def __init__(
+      self,
+      gcs_location=None,
+      project=None,
+      validate=False,
+      coder=None,
+      schema=None,
+      flatten_results=True,
+      kms_key=None):
+    if gcs_location:
+      if not isinstance(gcs_location, (str, unicode, ValueProvider)):
+        raise TypeError(
+            '%s: gcs_location must be of type string'
+            ' or ValueProvider; got %r instead' %
+            (self.__class__.__name__, type(gcs_location)))
+
+      if isinstance(gcs_location, (str, unicode)):
+        gcs_location = StaticValueProvider(str, gcs_location)
+
+    if schema is None:
+      raise ValueError("Should provide schema.")
+
+    self.coder = coder
+    self.gcs_location = gcs_location
+    self.project = project
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or _JsonToDictCoder
+    self.schema = bigquery_tools.get_dict_table_schema(schema)
+    self.kms_key = kms_key
+
+  def _get_destination_uri(self, temp_location):
+    """Returns the fully qualified Google Cloud Storage URI pattern where the
+    extracted table should be written to.
+    """
+    file_pattern = '{}/bigquery-table-dump-*.json'

Review comment:
       sounds good

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)

Review comment:
       makes sense







[GitHub] [beam] rezarokni commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
rezarokni commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-623216435


   > @rezarokni FYI
   
   V v nice :)





[GitHub] [beam] Ardagan commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r422903621



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            " schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`, in which case the query uses
+      BigQuery's standard SQL dialect with improved standards compliance;
+      set to :data:`True` to use the legacy dialect.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`False`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  # Instance method: table and query are set per instance in __init__.
+  def validate(self):
+    if self.table is not None and self.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif self.table is None and self.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to cleanup snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be

Review comment:
       See response above.










[GitHub] [beam] pabloem commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r431363173



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:

Review comment:
       In ReadFromBigQuery, the schema is fetched by querying BigQuery: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L700
   
   ReadFromBigQuery does not expect a user-provided schema. I think we should keep the schema internal rather than user-provided. This also allows the same transform to receive different tables or queries with different schemas in the same pipeline.
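
To illustrate the suggestion, here is a minimal, Beam-free sketch in which each request gets its schema from a lookup instead of being compared against one user-provided schema. `FAKE_CATALOG`, `lookup_schema`, and `plan_reads` are hypothetical names used only for illustration; they are not Beam or BigQuery APIs.

```python
# Hypothetical stand-in for a BigQuery metadata lookup; a real transform
# would ask the service for the schema of the table being exported.
FAKE_CATALOG = {
    'proj:ds.users': ['id:INTEGER', 'name:STRING'],
    'proj:ds.events': ['ts:TIMESTAMP', 'kind:STRING'],
}


def lookup_schema(table):
  """Returns the schema for `table` from the fake catalog."""
  return FAKE_CATALOG[table]


def plan_reads(tables):
  """Pairs each requested table with its own, internally resolved schema.

  No user-supplied schema is compared against the table schema, so
  requests with different schemas can share one pipeline step.
  """
  return [(table, lookup_schema(table)) for table in tables]


plans = plan_reads(['proj:ds.users', 'proj:ds.events'])
```

Because the schema is resolved per request, nothing ties the whole step to a single table layout.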







[GitHub] [beam] Ardagan removed a comment on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan removed a comment on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-649697558


   Run PythonFormatter PreCommit





[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-651246985


   Run Python PreCommit





[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-694974066


   I moved this to another branch. Let me close this PR.





[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-645032400


   Run Python PreCommit





[GitHub] [beam] stale[bot] commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-691533545


   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
   





[GitHub] [beam] pabloem commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-651401099


   Run Python 3.7 PostCommit





[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-624210475


   Run Python PreCommit





[GitHub] [beam] Ardagan commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r420286996



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -283,6 +284,8 @@ def compute_table_name(row):
     'BigQuerySink',
     'WriteToBigQuery',
     'ReadFromBigQuery',
+    'ReadAllFromBigQueryRequest',

Review comment:
       I'm working on adding a similar ReadAll to the Java API.







[GitHub] [beam] pabloem commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-644418471


   FYI, the Python precommit for 2.7 has only just been fixed, so the failure seems unrelated.





[GitHub] [beam] pabloem commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r443832544



##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1529,3 +1530,180 @@ def display_data(self):
         'source': DisplayDataItem(self.source.__class__, label='Read Source'),
         'source_dd': self.source
     }
+
+
+class SDFBoundedSourceReader(PTransform):
+  """A ``PTransform`` that uses SDF to read from each ``BoundedSource`` in a
+  PCollection.
+
+  NOTE: This transform can only be used with beam_fn_api enabled.
+  """
+  class _SDFBoundedSourceRestriction(object):

Review comment:
       This significantly duplicates functionality from `_SDFBoundedSourceWrapper` above - do you think you could have a single implementation instead of duplicating a lot of this functionality?







[GitHub] [beam] Ardagan commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r422904343



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:

Review comment:
       Thank you for checking on it.
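
For context, the request class this comment is attached to is meant to accept exactly one of `table` or `query`. A minimal standalone sketch of that check follows. It is written as an instance method so that it reads the values stored on the object. The names `ReadRequest` and `is_valid` are hypothetical, and this is not the code from the PR.

```python
class ReadRequest(object):
  """Hypothetical request object: exactly one of query/table must be set."""
  def __init__(self, query=None, table=None):
    self.query = query
    self.table = table
    self.validate()

  def validate(self):
    # Instance method: checks the attributes set on this object.
    if self.table is not None and self.query is not None:
      raise ValueError(
          'Both a BigQuery table and a query were specified.'
          ' Please specify only one of these.')
    if self.table is None and self.query is None:
      raise ValueError('A BigQuery table or a query must be specified')


def is_valid(**kwargs):
  """Returns True if a ReadRequest can be built from kwargs."""
  try:
    ReadRequest(**kwargs)
    return True
  except ValueError:
    return False
```

Validating in the constructor means a malformed request fails where it is built rather than later inside the pipeline.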







[GitHub] [beam] Ardagan commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-622514637


   @rezarokni FYI





[GitHub] [beam] pabloem commented on pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11582:
URL: https://github.com/apache/beam/pull/11582#issuecomment-622092973


   This is looking good. Something I think we should do is change the underlying implementation of ReadFromBigQuery so that it delegates to ReadAllFromBigQuery:
   
   ```
   class ReadFromBQ:
     def expand(self):
       return Create([self.input_element]) | ReadAllFromBigQuery()
   ```
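
The idea can be sketched without Beam: if the read-all step maps every request to its rows, a single read is just the one-element case. The function names and the in-memory `TABLES` dict are hypothetical and only illustrate the delegation, not the actual Beam transforms.

```python
# Hypothetical in-memory "tables" standing in for BigQuery.
TABLES = {
    't1': [{'x': 1}, {'x': 2}],
    't2': [{'x': 3}],
}


def read_all(requests):
  """Yields the rows of every requested table, in request order."""
  for table in requests:
    for row in TABLES[table]:
      yield row


def read_one(table):
  """Single read expressed as read_all over a one-element input."""
  return read_all([table])


rows_one = list(read_one('t1'))
rows_all = list(read_all(['t1', 't2']))
```

With this shape, export and cleanup logic lives in one place and the single-source read inherits it.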





[GitHub] [beam] Ardagan commented on a change in pull request #11582: [BEAM-9650] Add ReadAllFromBigQuery PTransform

Posted by GitBox <gi...@apache.org>.
Ardagan commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r430612821



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`. If set to :data:`True`,
+      the query will use BigQuery's updated SQL dialect with improved standards
+      compliance.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`True`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  @classmethod
+  def validate(cls):
+    if cls.table is not None and cls.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif cls.table is None and cls.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to cleanup snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be
+      created. This can be either specified as a 'bigquery.TableSchema' object
+      or a single string  of the form 'field1:type1,field2:type2,field3:type3'
+      that defines a comma separated list of fields. Here 'type' should
+      specify the BigQuery type of the field. Single string based schemas do
+      not support nested fields, repeated fields, or specifying a BigQuery
+      mode for fields (mode will always be set to 'NULLABLE').
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new temporary tables.
+   """
+  def __init__(
+      self,
+      gcs_location=None,
+      project=None,
+      validate=False,
+      coder=None,
+      schema=None,
+      flatten_results=True,
+      kms_key=None):
+    if gcs_location:
+      if not isinstance(gcs_location, (str, unicode, ValueProvider)):
+        raise TypeError(
+            '%s: gcs_location must be of type string'
+            ' or ValueProvider; got %r instead' %
+            (self.__class__.__name__, type(gcs_location)))
+
+      if isinstance(gcs_location, (str, unicode)):
+        gcs_location = StaticValueProvider(str, gcs_location)
+
+    if schema is None:
+      raise ValueError("Should provide schema.")
+
+    self.coder = coder
+    self.gcs_location = gcs_location
+    self.project = project
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or _JsonToDictCoder
+    self.schema = bigquery_tools.get_dict_table_schema(schema)
+    self.kms_key = kms_key
+
+  def _get_destination_uri(self, temp_location):
+    """Returns the fully qualified Google Cloud Storage URI pattern where the
+    extracted table should be written to.
+    """
+    file_pattern = '{}/bigquery-table-dump-*.json'

Review comment:
       This is the only place where this string is used so far. Not sure it is worth moving it to a separate location as of now.

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)

Review comment:
       I'm using the same approach as in [ReadFromBigQuery](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1624). It may be worth keeping a similar approach for now and refactoring both places in a separate PR.
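
For reference, the approach in question builds one export location per element by filling a unique ID into a pattern. A minimal sketch, assuming a pattern with a single `{}` placeholder; the bucket name, path layout, and the helper name `make_export_location` are hypothetical.

```python
import uuid


def make_export_location(pattern):
  """Fills a fresh random hex ID into the pattern's `{}` placeholder."""
  return pattern.format(uuid.uuid4().hex)


# Hypothetical pattern: the trailing `*` is left in place so that the
# export job can expand it into one or more sharded file names.
pattern = 'gs://example-bucket/tmp/{}/bigquery-table-dump-*.json'

first = make_export_location(pattern)
second = make_export_location(pattern)
```

Each call yields a distinct location, so exports for different elements do not overwrite each other and can be cleaned up independently.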

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+        PassThrough()).with_outputs(
+            'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`, in which case the query uses
+      BigQuery's standard SQL dialect with improved standards compliance.
+      If set to :data:`True`, the query uses the legacy SQL dialect.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`False`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  def validate(self):
+    # Instance method: table and query are set on the instance in __init__.
+    if self.table is not None and self.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif self.table is None and self.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is not recommended to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to clean up snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be
+      created. This can be either specified as a 'bigquery.TableSchema' object
+      or a single string  of the form 'field1:type1,field2:type2,field3:type3'
+      that defines a comma separated list of fields. Here 'type' should
+      specify the BigQuery type of the field. Single string based schemas do
+      not support nested fields, repeated fields, or specifying a BigQuery
+      mode for fields (mode will always be set to 'NULLABLE').
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new temporary tables.
+   """
+  def __init__(
+      self,
+      gcs_location=None,
+      project=None,
+      validate=False,
+      coder=None,
+      schema=None,
+      flatten_results=True,
+      kms_key=None):
+    if gcs_location:
+      if not isinstance(gcs_location, (str, unicode, ValueProvider)):
+        raise TypeError(
+            '%s: gcs_location must be of type string'
+            ' or ValueProvider; got %r instead' %
+            (self.__class__.__name__, type(gcs_location)))
+
+      if isinstance(gcs_location, (str, unicode)):
+        gcs_location = StaticValueProvider(str, gcs_location)
+
+    if schema is None:
+      raise ValueError("Should provide schema.")
+
+    self.gcs_location = gcs_location
+    self.project = project
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or _JsonToDictCoder
+    self.schema = bigquery_tools.get_dict_table_schema(schema)
+    self.kms_key = kms_key
+
+  def _get_destination_uri(self, temp_location):
+    """Returns the fully qualified Google Cloud Storage URI pattern where the
+    extracted table should be written to.
+    """
+    file_pattern = '{}/bigquery-table-dump-*.json'
+
+    if self.gcs_location is not None:

Review comment:
       Can you elaborate on this one please?
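
For context, the branch in question appears to pick the GCS base path for the export: the docstring says that when gcs_location is None, temp_location is used instead. A minimal sketch of that fallback, assuming a hypothetical standalone helper named resolve_destination_uri, plain strings rather than ValueProviders, and a per-job subdirectory (the diff is cut off before showing how the '{}' placeholder is filled, so that part is a guess):

```python
import posixpath
import uuid


def resolve_destination_uri(gcs_location, temp_location):
    """Hypothetical helper, not the PR's method: choose a GCS base path and
    build the export file pattern under it."""
    file_pattern = 'bigquery-table-dump-*.json'

    if gcs_location is not None:
        # An explicitly configured location takes precedence.
        gcs_base = gcs_location
    elif temp_location is not None:
        # Fall back to the pipeline's temp_location, as the docstring states.
        gcs_base = temp_location
    else:
        raise ValueError(
            'Either gcs_location or temp_location must be specified.')

    # Assumption: each export goes into its own uniquely named subdirectory.
    job_id = uuid.uuid4().hex
    return posixpath.join(gcs_base, job_id, file_pattern)
```

Under these assumptions, resolve_destination_uri(None, 'gs://bucket/tmp') yields a pattern under gs://bucket/tmp/, and passing None for both arguments raises ValueError.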



