Posted to commits@beam.apache.org by ch...@apache.org on 2022/03/21 17:23:05 UTC

[beam] branch revert-17100-cyang/rfbq-interactive-fix created (now 3ccd12e)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a change to branch revert-17100-cyang/rfbq-interactive-fix
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 3ccd12e  Revert "[BEAM-14112] Avoid storing a generator in _CustomBigQuerySource (#17100)"

This branch includes the following new commits:

     new 3ccd12e  Revert "[BEAM-14112] Avoid storing a generator in _CustomBigQuerySource (#17100)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[beam] 01/01: Revert "[BEAM-14112] Avoid storing a generator in _CustomBigQuerySource (#17100)"

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch revert-17100-cyang/rfbq-interactive-fix
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3ccd12e69bda2816929a54cf3639738c82733782
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Mon Mar 21 10:20:13 2022 -0700

    Revert "[BEAM-14112] Avoid storing a generator in _CustomBigQuerySource (#17100)"
    
    This reverts commit 62a661071b7db15e71d236abe68e15582e8997c9.
---
 sdks/python/apache_beam/io/gcp/bigquery.py         | 38 +++++++++++++---------
 .../apache_beam/io/gcp/bigquery_read_it_test.py    | 12 -------
 2 files changed, 22 insertions(+), 28 deletions(-)
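
A note on what is being restored: PR #17100 (the commit reverted here)
replaced a generator cached on the source object with eager, per-file
source creation, and this revert brings the generator back as
self.split_result. The email does not spell out the original motivation,
but a plausible reading (an assumption, suggested by the BEAM-14112 title
and the revert-17100-cyang/rfbq-interactive-fix branch name) is that
Python generators cannot be pickled, so any runner that serializes the
source after split() has populated split_result will fail. A minimal
standalone sketch, using a hypothetical SourceWithGenerator class that
is not Beam code:

    import pickle

    class SourceWithGenerator:
        """Toy stand-in for a source that caches a generator attribute."""
        def __init__(self, paths):
            # Mirrors the pattern restored by this revert: a generator
            # expression stored on the object (cf. self.split_result).
            self.split_result = (path.upper() for path in paths)

    src = SourceWithGenerator(['gs://bucket/out-0', 'gs://bucket/out-1'])
    try:
        pickle.dumps(src)
    except TypeError as err:
        # Generators are not picklable in CPython, so serializing the
        # object fails once split_result holds one.
        print(err)  # e.g. "cannot pickle 'generator' object"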

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index fc0f4b7..0503de8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -705,7 +705,7 @@ class _CustomBigQuerySource(BoundedSource):
     self.flatten_results = flatten_results
     self.coder = coder or _JsonToDictCoder
     self.kms_key = kms_key
-    self.export_result = None
+    self.split_result = None
     self.options = pipeline_options
     self.bq_io_metadata = None  # Populate in setup, as it may make an RPC
     self.bigquery_job_labels = bigquery_job_labels or {}
@@ -789,26 +789,19 @@ class _CustomBigQuerySource(BoundedSource):
       project = self.project
     return project
 
-  def _create_source(self, path, bq):
+  def _create_source(self, path, schema):
     if not self.use_json_exports:
       return create_avro_source(path)
     else:
-      if isinstance(self.table_reference, vp.ValueProvider):
-        table_ref = bigquery_tools.parse_table_reference(
-            self.table_reference.get(), project=self.project)
-      else:
-        table_ref = self.table_reference
-      table = bq.get_table(
-          table_ref.projectId, table_ref.datasetId, table_ref.tableId)
       return TextSource(
           path,
           min_bundle_size=0,
           compression_type=CompressionTypes.UNCOMPRESSED,
           strip_trailing_newlines=True,
-          coder=self.coder(table.schema))
+          coder=self.coder(schema))
 
   def split(self, desired_bundle_size, start_position=None, stop_position=None):
-    if self.export_result is None:
+    if self.split_result is None:
       bq = bigquery_tools.BigQueryWrapper(
           temp_dataset_id=(
               self.temp_dataset.datasetId if self.temp_dataset else None))
@@ -820,13 +813,16 @@ class _CustomBigQuerySource(BoundedSource):
       if not self.table_reference.projectId:
         self.table_reference.projectId = self._get_project()
 
-      self.export_result = self._export_files(bq)
+      schema, metadata_list = self._export_files(bq)
+      # Sources to be created lazily within a generator as they're output.
+      self.split_result = (
+          self._create_source(metadata.path, schema)
+          for metadata in metadata_list)
 
       if self.query is not None:
         bq.clean_up_temporary_dataset(self._get_project())
 
-    for metadata in self.export_result:
-      source = self._create_source(metadata.path, bq)
+    for source in self.split_result:
       yield SourceBundle(
           weight=1.0, source=source, start_position=None, stop_position=None)
 
@@ -878,7 +874,7 @@ class _CustomBigQuerySource(BoundedSource):
     """Runs a BigQuery export job.
 
     Returns:
-      a list of FileMetadata instances
+      bigquery.TableSchema instance, a list of FileMetadata instances
     """
     job_labels = self._get_bq_metadata().add_additional_bq_job_labels(
         self.bigquery_job_labels)
@@ -908,7 +904,17 @@ class _CustomBigQuerySource(BoundedSource):
                                        job_labels=job_labels,
                                        use_avro_logical_types=True)
     bq.wait_for_bq_job(job_ref)
-    return FileSystems.match([gcs_location])[0].metadata_list
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    if isinstance(self.table_reference, vp.ValueProvider):
+      table_ref = bigquery_tools.parse_table_reference(
+          self.table_reference.get(), project=self.project)
+    else:
+      table_ref = self.table_reference
+    table = bq.get_table(
+        table_ref.projectId, table_ref.datasetId, table_ref.tableId)
+
+    return table.schema, metadata_list
 
 
 class _CustomBigQueryStorageSource(BoundedSource):
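
Beyond the rename, the restored bigquery.py code has two structural
properties worth noting. First, _export_files now returns the table
schema together with the file metadata, so bq.get_table runs once per
export rather than once per file inside _create_source. Second (a
general property of the pattern, not something stated in this email),
caching a generator in split_result means the bundles can be enumerated
only once: a second call to split() finds split_result non-None and
iterates an already-exhausted generator. A toy sketch with hypothetical
names (export_files, Source) illustrating both points:

    # Hypothetical stand-ins, not Beam code.
    def export_files():
        # One schema lookup for the whole export, as in the restored
        # _export_files, which returns (table.schema, metadata_list).
        schema = {'fields': ['name', 'value']}
        metadata_list = ['gs://tmp/out-0.json', 'gs://tmp/out-1.json']
        return schema, metadata_list

    class Source:
        def __init__(self):
            self.split_result = None

        def split(self):
            if self.split_result is None:
                schema, metadata_list = export_files()
                # Sources are created lazily, one per exported file.
                self.split_result = (
                    (path, schema) for path in metadata_list)
            for source in self.split_result:
                yield source

    s = Source()
    print(len(list(s.split())))  # 2
    print(len(list(s.split())))  # 0 -- the cached generator is exhausted
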
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index e47754d..9101039 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -37,8 +37,6 @@ from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options.value_provider import StaticValueProvider
-from apache_beam.runners.interactive import interactive_beam
-from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
@@ -675,16 +673,6 @@ class ReadAllBQTests(BigQueryReadIntegrationTests):
           equal_to(self.TABLE_DATA_1 + self.TABLE_DATA_2 + self.TABLE_DATA_3))
 
 
-class ReadInteractiveRunnerTests(BigQueryReadIntegrationTests):
-  @skip(['PortableRunner', 'FlinkRunner'])
-  @pytest.mark.it_postcommit
-  def test_read_in_interactive_runner(self):
-    p = beam.Pipeline(InteractiveRunner(), argv=self.args)
-    pcoll = p | beam.io.ReadFromBigQuery(query="SELECT 1")
-    result = interactive_beam.collect(pcoll)
-    assert result.iloc[0, 0] == 1
-
-
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()