Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/27 16:02:37 UTC

[beam] branch master updated: Merge pull request #8093 from pabloem/sch-dest-bq

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d08e95  Merge pull request #8093 from pabloem/sch-dest-bq
7d08e95 is described below

commit 7d08e950b7a5aa10ba3eb50de7a3d95225d84501
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Wed Mar 27 09:02:27 2019 -0700

    Merge pull request #8093 from pabloem/sch-dest-bq
    
    [BEAM-6892] Schemas and destinations are provided to WriteToBigQuery separately (#8093)
---
 .../examples/cookbook/bigquery_tornadoes.py        |  8 +--
 .../cookbook/bigquery_tornadoes_it_test.py         |  4 +-
 .../io/gcp/big_query_query_to_table_it_test.py     |  7 +-
 .../io/gcp/big_query_query_to_table_pipeline.py    | 12 +---
 sdks/python/apache_beam/io/gcp/bigquery.py         | 79 ++++++++++++++--------
 .../apache_beam/io/gcp/bigquery_file_loads.py      | 78 ++++++++++++++++-----
 .../apache_beam/io/gcp/bigquery_file_loads_test.py | 10 +--
 sdks/python/apache_beam/io/gcp/bigquery_test.py    | 23 +++++--
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 10 +--
 9 files changed, 138 insertions(+), 93 deletions(-)

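For readers following the change, a minimal sketch of the usage it enables:
destinations and schemas are now passed to WriteToBigQuery as separate
arguments, and the schema may be a callable keyed on the destination. Table,
schema and project names below are placeholders, not taken from this commit:

    import apache_beam as beam

    TABLE_A = 'my-project:my_dataset.table_a'   # hypothetical destinations
    TABLE_B = 'my-project:my_dataset.table_b'
    SCHEMA_A = 'name:STRING,language:STRING'    # hypothetical schemas
    SCHEMA_B = 'name:STRING,foundation:STRING'

    with beam.Pipeline() as p:
      rows = p | beam.Create([
          {'name': 'beam', 'language': 'py'},
          {'name': 'asf', 'foundation': 'apache'}])
      # Route each row to a destination, and resolve the schema per
      # destination instead of bundling it into the table callable.
      _ = rows | beam.io.WriteToBigQuery(
          table=lambda row: TABLE_A if 'language' in row else TABLE_B,
          schema=lambda dest: SCHEMA_A if dest == TABLE_A else SCHEMA_B,
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
          method='STREAMING_INSERTS')
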
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index 9be3f89..c7c837b 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -87,19 +87,13 @@ def run(argv=None):
     rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
     counts = count_tornadoes(rows)
 
-    if 'temp_location' in p.options.get_all_options():
-      location = p.options.get_all_options()['temp_location']
-    else:
-      location = known_args.gcs_location
-
     # Write the output using a "Write" transform that has side effects.
     # pylint: disable=expression-not-assigned
     counts | 'Write' >> beam.io.WriteToBigQuery(
         known_args.output,
         schema='month:INTEGER, tornado_count:INTEGER',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
-        gs_location=location)
+        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
 
     # Run the pipeline (all operations are deferred until run() is called).
 
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
index bf3581c..f7eb93b 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
@@ -60,10 +60,8 @@ class BigqueryTornadoesIT(unittest.TestCase):
                               project=project,
                               query=query,
                               checksum=self.DEFAULT_CHECKSUM)]
-    gs_location = 'gs://temp-storage-for-upload-tests/%s' % table
     extra_opts = {'output': output_table,
-                  'on_success_matcher': all_of(*pipeline_verifiers),
-                  'gcs_location': gs_location}
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
 
     # Register cleanup before pipeline execution.
     # Note that actual execution happens in reverse order.
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 2570fc7..5bdf8a8 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -131,10 +131,8 @@ class BigQueryQueryToTableIT(unittest.TestCase):
         query=verify_query,
         checksum=expected_checksum)]
 
-    gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table
     extra_opts = {'query': LEGACY_QUERY,
                   'output': self.output_table,
-                  'bq_temp_location': gs_location,
                   'output_schema': DIALECT_OUTPUT_SCHEMA,
                   'use_standard_sql': False,
                   'on_success_matcher': all_of(*pipeline_verifiers)}
@@ -149,10 +147,9 @@ class BigQueryQueryToTableIT(unittest.TestCase):
         project=self.project,
         query=verify_query,
         checksum=expected_checksum)]
-    gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table
+
     extra_opts = {'query': STANDARD_QUERY,
                   'output': self.output_table,
-                  'bq_temp_location': gs_location,
                   'output_schema': DIALECT_OUTPUT_SCHEMA,
                   'use_standard_sql': True,
                   'on_success_matcher': all_of(*pipeline_verifiers)}
@@ -197,11 +194,9 @@ class BigQueryQueryToTableIT(unittest.TestCase):
         query=verify_query,
         checksum=expected_checksum)]
     self._setup_new_types_env()
-    gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table
     extra_opts = {
         'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE),
         'output': self.output_table,
-        'bq_temp_location': gs_location,
         'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
         'use_standard_sql': False,
         'on_success_matcher': all_of(*pipeline_verifiers)}
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
index 07b194e..45aa10e 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -50,10 +50,7 @@ def run_bq_pipeline(argv=None):
                       help='Output BQ table to write results to.')
   parser.add_argument('--kms_key', default=None,
                       help='Use this Cloud KMS key with BigQuery.')
-  parser.add_argument('--bq_temp_location',
-                      default=None,
-                      help=('GCS bucket to use to store files for '
-                            'loading data into BigQuery.'))
+
   known_args, pipeline_args = parser.parse_known_args(argv)
 
   table_schema = parse_table_schema_from_json(known_args.output_schema)
@@ -61,10 +58,6 @@ def run_bq_pipeline(argv=None):
 
   p = TestPipeline(options=PipelineOptions(pipeline_args))
 
-  if 'temp_location' in p.options.get_all_options():
-    location = p.options.get_all_options()['temp_location']
-  else:
-    location = known_args.bq_temp_location
   # pylint: disable=expression-not-assigned
   # pylint: disable=bad-continuation
   (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
@@ -74,8 +67,7 @@ def run_bq_pipeline(argv=None):
            known_args.output,
            schema=table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-           write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
-           gs_location=location))
+           write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
 
   result = p.run()
   result.wait_until_finish()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 27ee67a..f94f31b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -552,6 +552,7 @@ class BigQueryWriteFn(DoFn):
   def __init__(
       self,
       batch_size,
+      schema=None,
       create_disposition=None,
       write_disposition=None,
       kms_key=None,
@@ -590,11 +591,13 @@ class BigQueryWriteFn(DoFn):
       retry_strategy: The strategy to use when retrying streaming inserts
         into BigQuery. Options are shown in bigquery_tools.RetryStrategy attrs.
     """
+    self.schema = schema
     self.test_client = test_client
     self.create_disposition = create_disposition
     self.write_disposition = write_disposition
     self._rows_buffer = []
     self._reset_rows_buffer()
+    self._observed_tables = set()
 
     self._total_buffered_rows = 0
     self.kms_key = kms_key
@@ -654,6 +657,11 @@ class BigQueryWriteFn(DoFn):
     if str_table_reference in self._observed_tables:
       return
 
+    if self.create_disposition == BigQueryDisposition.CREATE_NEVER:
+      # If we never want to create the table, we assume it already exists,
+      # and avoid the get-or-create step.
+      return
+
     logging.debug('Creating or getting table %s with schema %s.',
                   table_reference, schema)
 
@@ -671,12 +679,17 @@ class BigQueryWriteFn(DoFn):
 
   def process(self, element, unused_create_fn_output=None):
     destination = element[0]
-    if isinstance(destination, tuple):
-      schema = destination[1]
-      destination = destination[0]
-      self._create_table_if_needed(
-          bigquery_tools.parse_table_reference(destination),
-          schema)
+
+    if callable(self.schema):
+      schema = self.schema(destination)
+    elif isinstance(self.schema, vp.ValueProvider):
+      schema = self.schema.get()
+    else:
+      schema = self.schema
+
+    self._create_table_if_needed(
+        bigquery_tools.parse_table_reference(destination),
+        schema)
 
     row = element[1]
     self._rows_buffer[destination].append(row)
@@ -762,9 +775,10 @@ class WriteToBigQuery(PTransform):
                max_file_size=None,
                max_files_per_bundle=None,
                test_client=None,
-               gs_location=None,
+               custom_gcs_temp_location=None,
                method=None,
-               insert_retry_strategy=None):
+               insert_retry_strategy=None,
+               validate=True):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -784,17 +798,19 @@ class WriteToBigQuery(PTransform):
       project (str): The ID of the project containing this table or
         :data:`None` if the table reference is specified entirely by the table
         argument.
-      schema (str,dict,ValueProvider): The schema to be used if the
+      schema (str,dict,ValueProvider,callable): The schema to be used if the
         BigQuery table to write has to be created. This can be either specified
         as a :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
 bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
-        or a python dictionary, or the string or dictionary itself.
+        or a python dictionary, or the string or dictionary itself,
         or a single string of the form
         ``'field1:type1,field2:type2,field3:type3'`` that defines a comma
         separated list of fields. Here ``'type'`` should specify the BigQuery
         type of the field. Single string based schemas do not support nested
         fields, repeated fields, or specifying a BigQuery mode for fields
         (mode will always be set to ``'NULLABLE'``).
+        If a callable, then it should receive a destination (in the form of
+        a TableReference or a string), and return a str, dict or TableSchema.
       create_disposition (BigQueryDisposition): A string describing what
         happens if the table does not exist. Possible values are:
 
@@ -825,8 +841,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
         written by a worker. The default here is 20. Larger values will allow
         writing to multiple destinations without having to reshard - but they
         increase the memory burden on the workers.
-      gs_location (str): A GCS location to store files to be used for file
-        loads into BigQuery. By default, this will use the pipeline's
+      custom_gcs_temp_location (str): A GCS location to store files to be used
+        for file loads into BigQuery. By default, this will use the pipeline's
         temp_location, but for pipelines whose temp_location is not appropriate
         for BQ File Loads, users should pass a specific one.
       method: The method to use to write to BigQuery. It may be
@@ -836,6 +852,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
         FILE_LOADS on Batch pipelines.
       insert_retry_strategy: The strategy to use when retrying streaming inserts
         into BigQuery. Options are shown in bigquery_tools.RetryStrategy attrs.
+      validate: Indicates whether to perform validation checks on
+        inputs. This parameter is primarily used for testing.
     """
     self.table_reference = bigquery_tools.parse_table_reference(
         table, dataset, project)
@@ -847,11 +865,14 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
     self.batch_size = batch_size
     self.kms_key = kms_key
     self.test_client = test_client
-    self.gs_location = gs_location
+
+    # TODO(pabloem): Consider handling ValueProvider for this location.
+    self.custom_gcs_temp_location = custom_gcs_temp_location
     self.max_file_size = max_file_size
     self.max_files_per_bundle = max_files_per_bundle
     self.method = method or WriteToBigQuery.Method.DEFAULT
     self.insert_retry_strategy = insert_retry_strategy
+    self._validate = validate
 
   @staticmethod
   def get_table_schema_from_string(schema):
@@ -919,17 +940,15 @@ bigquery_v2_messages.TableSchema):
       Dict[str, Any]: The schema to be used if the BigQuery table to write has
       to be created but in the dictionary format.
     """
-    if isinstance(schema, dict):
-      return schema
-    elif schema is None:
+    if (isinstance(schema, (dict, vp.ValueProvider)) or
+        callable(schema) or
+        schema is None):
       return schema
     elif isinstance(schema, (str, unicode)):
       table_schema = WriteToBigQuery.get_table_schema_from_string(schema)
       return WriteToBigQuery.table_schema_to_dict(table_schema)
     elif isinstance(schema, bigquery.TableSchema):
       return WriteToBigQuery.table_schema_to_dict(schema)
-    elif isinstance(schema, vp.ValueProvider):
-      return schema
     else:
       raise TypeError('Unexpected schema argument: %s.' % schema)
 
@@ -948,6 +967,7 @@ bigquery_v2_messages.TableSchema):
         self.method == WriteToBigQuery.Method.STREAMING_INSERTS):
       # TODO: Support load jobs for streaming pipelines.
       bigquery_write_fn = BigQueryWriteFn(
+          schema=self.schema,
           batch_size=self.batch_size,
           create_disposition=self.create_disposition,
           write_disposition=self.write_disposition,
@@ -957,8 +977,7 @@ bigquery_v2_messages.TableSchema):
 
       outputs = (pcoll
                  | 'AppendDestination' >> beam.ParDo(
-                     bigquery_tools.AppendDestinationsFn(
-                         destination=self.table_reference, schema=self.schema))
+                     bigquery_tools.AppendDestinationsFn(self.table_reference))
                  | 'StreamInsertRows' >> ParDo(bigquery_write_fn).with_outputs(
                      BigQueryWriteFn.FAILED_ROWS, main='main'))
 
@@ -969,15 +988,17 @@ bigquery_v2_messages.TableSchema):
             'File Loads to BigQuery are only supported on Batch pipelines.')
 
       from apache_beam.io.gcp import bigquery_file_loads
-      return pcoll | bigquery_file_loads.BigQueryBatchFileLoads(
-          destination=self.table_reference,
-          schema=self.schema,
-          create_disposition=self.create_disposition,
-          write_disposition=self.write_disposition,
-          max_file_size=self.max_file_size,
-          max_files_per_bundle=self.max_files_per_bundle,
-          gs_location=self.gs_location,
-          test_client=self.test_client)
+      return (pcoll
+              | bigquery_file_loads.BigQueryBatchFileLoads(
+                  destination=self.table_reference,
+                  schema=self.schema,
+                  create_disposition=self.create_disposition,
+                  write_disposition=self.write_disposition,
+                  max_file_size=self.max_file_size,
+                  max_files_per_bundle=self.max_files_per_bundle,
+                  custom_gcs_temp_location=self.custom_gcs_temp_location,
+                  test_client=self.test_client,
+                  validate=self._validate))
 
   def display_data(self):
     res = {}
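
A sketch of the renamed parameter in use with the FILE_LOADS path. Bucket,
table and project names are placeholders; by default the pipeline's
temp_location is used, and validate=True checks up front that the chosen
location is a gs:// path:

    import apache_beam as beam

    with beam.Pipeline(argv=['--temp_location', 'gs://my-bucket/tmp']) as p:
      rows = p | beam.Create([{'month': 1, 'tornado_count': 5}])
      _ = rows | beam.io.WriteToBigQuery(
          'my-project:my_dataset.tornadoes',   # hypothetical table
          schema='month:INTEGER, tornado_count:INTEGER',
          method='FILE_LOADS',
          # Only needed when temp_location is not suitable for BigQuery
          # load files; otherwise the pipeline temp_location is used.
          custom_gcs_temp_location='gs://my-staging-bucket/bq_load')
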
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 7395370..f6d5c83 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -44,7 +44,7 @@ from apache_beam.io import filesystems as fs
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api
 from apache_beam.options import value_provider as vp
-from apache_beam.transforms.combiners import Count
+from apache_beam.options.pipeline_options import GoogleCloudOptions
 
 ONE_TERABYTE = (1 << 40)
 
@@ -66,13 +66,27 @@ def _generate_load_job_name():
   return 'beam_load_%s_%s' % (datetime_component, random.randint(0, 100))
 
 
-def _generate_file_prefix(pipeline_gcs_location):
-  # If a gcs location is provided to the pipeline, then we shall use that.
-  # Otherwise, we shall use the temp_location from pipeline options.
-  gcs_base = str(pipeline_gcs_location or
-                 vp.RuntimeValueProvider.get_value('temp_location', str, ''))
-  prefix_uuid = _bq_uuid()
-  return fs.FileSystems.join(gcs_base, 'bq_load', prefix_uuid)
+def file_prefix_generator(with_validation=True):
+  def _generate_file_prefix(pipeline_gcs_location):
+    # If a gcs location is provided to the pipeline, then we shall use that.
+    # Otherwise, we shall use the temp_location from pipeline options.
+    gcs_base = str(pipeline_gcs_location or
+                   vp.RuntimeValueProvider.get_value('temp_location', str, ''))
+
+    # This will fail at pipeline execution time, but will fail early, as this
+    # step doesn't have any dependencies (and thus will be one of the first
+    # stages to be run).
+    if with_validation and (not gcs_base or not gcs_base.startswith('gs://')):
+      raise ValueError('Invalid GCS location.\n'
+                       'Writing to BigQuery with FILE_LOADS method requires a '
+                       'GCS location to be provided to write files to be '
+                       'loaded into BigQuery. Please provide a GCS bucket, or '
+                       'pass method="STREAMING_INSERTS" to WriteToBigQuery.')
+
+    prefix_uuid = _bq_uuid()
+    return fs.FileSystems.join(gcs_base, 'bq_load', prefix_uuid)
+
+  return _generate_file_prefix
 
 
 def _make_new_file_writer(file_prefix, destination):
@@ -276,8 +290,8 @@ class TriggerCopyJobs(beam.DoFn):
 
     copy_to_reference = bigquery_tools.parse_table_reference(destination)
     if copy_to_reference.projectId is None:
-      copy_to_reference.projectId = vp.RuntimeValueProvider.get_value(
-          'project', str, '')
+      copy_to_reference.projectId = vp.RuntimeValueProvider.get_value('project',
+                                                                      str, '')
 
     copy_from_reference = bigquery_tools.parse_table_reference(destination)
     copy_from_reference.tableId = job_reference.jobId
@@ -350,6 +364,13 @@ class TriggerLoadJobs(beam.DoFn):
     destination = element[0]
     files = iter(element[1])
 
+    if callable(self.schema):
+      schema = self.schema(destination)
+    elif isinstance(self.schema, vp.ValueProvider):
+      schema = self.schema.get()
+    else:
+      schema = self.schema
+
     job_count = 0
     batch_of_files = list(itertools.islice(files, _MAXIMUM_SOURCE_URIS))
     while batch_of_files:
@@ -380,7 +401,7 @@ class TriggerLoadJobs(beam.DoFn):
                    job_name, table_reference)
       job_reference = self.bq_wrapper.perform_load_job(
           table_reference, batch_of_files, job_name,
-          schema=self.schema,
+          schema=schema,
           write_disposition=self.write_disposition,
           create_disposition=self.create_disposition)
       yield (destination, job_reference)
@@ -466,20 +487,21 @@ class BigQueryBatchFileLoads(beam.PTransform):
       self,
       destination,
       schema=None,
-      gs_location=None,
+      custom_gcs_temp_location=None,
       create_disposition=None,
       write_disposition=None,
       coder=None,
       max_file_size=None,
       max_files_per_bundle=None,
-      test_client=None):
+      test_client=None,
+      validate=True):
     self.destination = destination
     self.create_disposition = create_disposition
     self.write_disposition = write_disposition
     self.max_file_size = max_file_size or _DEFAULT_MAX_FILE_SIZE
     self.max_files_per_bundle = (max_files_per_bundle or
                                  _DEFAULT_MAX_WRITERS_PER_BUNDLE)
-    self._input_gs_location = gs_location
+    self._custom_gcs_temp_location = custom_gcs_temp_location
     self.test_client = test_client
     self.schema = schema
     self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
@@ -490,9 +512,28 @@ class BigQueryBatchFileLoads(beam.PTransform):
     # job to run - and thus we avoid using temporary tables
     self.temp_tables = True if callable(destination) else False
 
+    self._validate = validate
+    if self._validate:
+      self.verify()
+
+  def verify(self):
+    if (isinstance(self._custom_gcs_temp_location, str) and
+        not self._custom_gcs_temp_location.startswith('gs://')):
+      # Only fail if the custom location is provided, and it is not a GCS
+      # location.
+      raise ValueError('Invalid GCS location.\n'
+                       'Writing to BigQuery with FILE_LOADS method requires a '
+                       'GCS location to be provided to write files to be '
+                       'loaded into BigQuery. Please provide a GCS bucket, or '
+                       'pass method="STREAMING_INSERTS" to WriteToBigQuery.')
+
   def expand(self, pcoll):
     p = pcoll.pipeline
 
+    self._custom_gcs_temp_location = (
+        self._custom_gcs_temp_location
+        or p.options.view_as(GoogleCloudOptions).temp_location)
+
     load_job_name_pcv = pvalue.AsSingleton(
         p
         | "ImpulseJobName" >> beam.Create([None])
@@ -500,8 +541,10 @@ class BigQueryBatchFileLoads(beam.PTransform):
 
     file_prefix_pcv = pvalue.AsSingleton(
         p
-        | "CreateFilePrefixView" >> beam.Create([self._input_gs_location])
-        | "GenerateFilePrefix" >> beam.Map(_generate_file_prefix))
+        | "CreateFilePrefixView" >> beam.Create(
+            [self._custom_gcs_temp_location])
+        | "GenerateFilePrefix" >> beam.Map(
+            file_prefix_generator(self._validate)))
 
     outputs = (
         pcoll
@@ -583,7 +626,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
          | "RemoveTempTables/PassTables" >> beam.FlatMap(
              lambda x, deleting_tables: deleting_tables,
              pvalue.AsIter(temp_tables_pc))
-         | "RemoveTempTables/DeduplicateTables" >> Count.PerElement()
+         | "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None))
+         | "RemoveTempTables/DeduplicateTables" >> beam.GroupByKey()
          | "RemoveTempTables/GetTableNames" >> beam.Map(lambda elm: elm[0])
          | "RemoveTempTables/Delete" >> beam.ParDo(DeleteTablesFn()))
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 499f0c3..9391b66 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -23,7 +23,6 @@ import json
 import logging
 import os
 import random
-import sys
 import time
 import unittest
 
@@ -295,8 +294,9 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
 
     transform = bigquery.WriteToBigQuery(
         destination,
-        gs_location=self._new_tempdir(),
-        test_client=bq_client)
+        custom_gcs_temp_location=self._new_tempdir(),
+        test_client=bq_client,
+        validate=False)
 
     # Need to test this with the DirectRunner to avoid serializing mocks
     with TestPipeline('DirectRunner') as p:
@@ -361,10 +361,6 @@ class BigQueryFileLoadsIT(unittest.TestCase):
     logging.info("Created dataset %s in project %s",
                  self.dataset_id, self.project)
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3'
-                   'TODO: BEAM-6711')
   @attr('IT')
   def test_multiple_destinations_transform(self):
     output_table_1 = '%s%s' % (self.output_table, 1)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 67b2bef..5beb983 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -425,7 +425,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
             projectId='project_id', datasetId='dataset_id', tableId='table_id'))
     client.tabledata.InsertAll.return_value = \
       bigquery.TableDataInsertAllResponse(insertErrors=[])
-    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
 
     fn = beam.io.gcp.bigquery.BigQueryWriteFn(
@@ -439,7 +439,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
 
     # Destination is a tuple of (destination, schema) to ensure the table is
     # created.
-    fn.process((('project_id:dataset_id.table_id', None), {'month': 1}))
+    fn.process(('project_id:dataset_id.table_id', {'month': 1}))
 
     self.assertTrue(client.tables.Get.called)
     # InsertRows not called as batch size is not hit
@@ -522,18 +522,24 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
 
       _ = (input
            | "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery(
-               table=value_provider.StaticValueProvider(str, output_table_1),
+               table=value_provider.StaticValueProvider(
+                   str, '%s:%s' % (self.project, output_table_1)),
                schema=value_provider.StaticValueProvider(dict, schema),
                method='STREAMING_INSERTS'))
       _ = (input
            | "WriteWithMultipleDests2" >> beam.io.gcp.bigquery.WriteToBigQuery(
-               table=value_provider.StaticValueProvider(str, output_table_2),
+               table=value_provider.StaticValueProvider(
+                   str, '%s:%s' % (self.project, output_table_2)),
                method='FILE_LOADS'))
 
   @attr('IT')
   def test_multiple_destinations_transform(self):
     output_table_1 = '%s%s' % (self.output_table, 1)
     output_table_2 = '%s%s' % (self.output_table, 2)
+
+    full_output_table_1 = '%s:%s' % (self.project, output_table_1)
+    full_output_table_2 = '%s:%s' % (self.project, output_table_2)
+
     schema1 = {'fields': [
         {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
         {'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE'}]}
@@ -569,13 +575,16 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
 
       r = (input
            | "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery(
-               table=lambda x: ((output_table_1, schema1)
+               table=lambda x: (full_output_table_1
                                 if 'language' in x
-                                else (output_table_2, schema2)),
+                                else full_output_table_2),
+               schema=lambda dest: (schema1
+                                    if dest == full_output_table_1
+                                    else schema2),
                method='STREAMING_INSERTS'))
 
       assert_that(r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
-                  equal_to([(output_table_1, bad_record)]))
+                  equal_to([(full_output_table_1, bad_record)]))
 
   def tearDown(self):
     request = bigquery.BigqueryDatasetsDeleteRequest(
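
As the assertion above suggests, the FAILED_ROWS output of the streaming-insert
path now carries (destination, row) pairs keyed by the full table string. A
sketch of consuming it, with table and schema as placeholders:

    import logging
    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import BigQueryWriteFn

    with beam.Pipeline() as p:
      rows = p | beam.Create([{'name': 'beam', 'language': 'py'}])
      result = rows | beam.io.WriteToBigQuery(
          'my-project:my_dataset.my_table',   # hypothetical table
          schema='name:STRING,language:STRING',
          method='STREAMING_INSERTS')
      _ = (result[BigQueryWriteFn.FAILED_ROWS]
           | beam.Map(lambda dest_row: logging.warning(
               'Insert failed for %s: %s', dest_row[0], dest_row[1])))
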
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 82ea8ea..ca7216a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -997,8 +997,8 @@ class AppendDestinationsFn(DoFn):
   Experimental; no backwards compatibility guarantees.
   """
 
-  def __init__(self, destination, schema=None):
-    self.destination = AppendDestinationsFn._get_table_fn(destination, schema)
+  def __init__(self, destination):
+    self.destination = AppendDestinationsFn._get_table_fn(destination)
 
   @staticmethod
   def _value_provider_or_static_val(elm):
@@ -1010,13 +1010,9 @@ class AppendDestinationsFn(DoFn):
       return value_provider.StaticValueProvider(lambda x: x, value=elm)
 
   @staticmethod
-  def _get_table_fn(destination, schema=None):
+  def _get_table_fn(destination):
     if callable(destination):
       return destination
-    elif not callable(destination) and schema is not None:
-      return lambda x: (
-          AppendDestinationsFn._value_provider_or_static_val(destination).get(),
-          AppendDestinationsFn._value_provider_or_static_val(schema).get())
     else:
       return lambda x: AppendDestinationsFn._value_provider_or_static_val(
           destination).get()
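
For completeness, a small sketch of the simplified keying. AppendDestinationsFn
is an experimental internal and the table string is a placeholder: rows are now
keyed by destination only, with the schema resolved inside the write DoFn
rather than carried alongside the destination:

    from apache_beam.io.gcp.bigquery_tools import AppendDestinationsFn

    fn = AppendDestinationsFn('my-project:my_dataset.my_table')  # hypothetical

    # Previously the key could be a (destination, schema) tuple; now the
    # destination function returns just the table for every element.
    print(fn.destination({'month': 1}))  # my-project:my_dataset.my_table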