You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2021/03/08 23:24:24 UTC

[beam] branch master updated: Pass str rather than TableReference

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d3a354  Pass str rather than TableReference
     new eea867d  Merge pull request #14112 from [BEAM-11884] Pass destination as str rather than TableReference within BigQueryBatchFileLoads
9d3a354 is described below

commit 9d3a354f48032e237bdc67b49761e3ee612fd5d1
Author: Chuck Yang <ch...@getcruise.com>
AuthorDate: Sat Feb 27 00:04:58 2021 -0800

    Pass str rather than TableReference
---
 sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 0449880..298ead6 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -32,7 +32,6 @@ from __future__ import absolute_import
 
 import hashlib
 import logging
-import pickle
 import random
 import time
 import uuid
@@ -256,7 +255,7 @@ class WriteRecordsToFile(beam.DoFn):
       self._destination_to_file_writer.pop(destination)
       yield pvalue.TaggedOutput(
           WriteRecordsToFile.WRITTEN_FILE_TAG,
-          (element[0], (file_path, file_size)))
+          (destination, (file_path, file_size)))
 
   def finish_bundle(self):
     for destination, file_path_writer in \
@@ -287,7 +286,7 @@ class WriteGroupedRecordsToFile(beam.DoFn):
     self.file_format = file_format or bigquery_tools.FileFormat.JSON
 
   def process(self, element, file_prefix, *schema_side_inputs):
-    destination = element[0]
+    destination = bigquery_tools.get_hashable_destination(element[0])
     rows = element[1]
 
     file_path, writer = None, None
@@ -509,7 +508,9 @@ class TriggerLoadJobs(beam.DoFn):
       create_disposition = 'CREATE_IF_NEEDED'
       # For temporary tables, we create a new table with the name with JobId.
       table_reference.tableId = job_name
-      yield pvalue.TaggedOutput(TriggerLoadJobs.TEMP_TABLES, table_reference)
+      yield pvalue.TaggedOutput(
+          TriggerLoadJobs.TEMP_TABLES,
+          bigquery_tools.get_hashable_destination(table_reference))
 
     _LOGGER.info(
         'Triggering job %s to load data to BigQuery table %s.'
@@ -904,13 +905,9 @@ class BigQueryBatchFileLoads(beam.PTransform):
             lambda x,
             deleting_tables: deleting_tables,
             pvalue.AsIter(temp_tables_pc))
-        # TableReference has no deterministic coder, but as this de-duplication
-        # is best-effort, pickling should be good enough.
-        | "RemoveTempTables/AddUselessValue" >>
-        beam.Map(lambda x: (pickle.dumps(x), None))
+        | "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None))
         | "RemoveTempTables/DeduplicateTables" >> beam.GroupByKey()
-        | "RemoveTempTables/GetTableNames" >>
-        beam.MapTuple(lambda k, nones: pickle.loads(k))
+        | "RemoveTempTables/GetTableNames" >> beam.Keys()
         | "RemoveTempTables/Delete" >> beam.ParDo(
             DeleteTablesFn(self.test_client)))