Posted to commits@beam.apache.org by pa...@apache.org on 2020/03/06 01:00:32 UTC

[beam] branch master updated: Use Avro format for file loads to BigQuery

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cc5771  Use Avro format for file loads to BigQuery
     new 9a3ba93  Merge pull request #10979 from [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK
0cc5771 is described below

commit 0cc577134b9e98b32020efab2ca07ccd1665f7cb
Author: Chuck Yang <ch...@getcruise.com>
AuthorDate: Mon Feb 24 16:17:31 2020 -0800

    Use Avro format for file loads to BigQuery
    
    This commit adds the ability to write to BigQuery via Avro file loads
    in `WriteToBigQuery` and sets Avro as the default format for file
    loads into BigQuery. A brief usage sketch follows the change list.
    
    Changes include:
    * Add a `temp_file_format` option to `WriteToBigQuery` and
      `BigQueryBatchFileLoads` to select which file format to use for
      loading data.
    * Set the default `temp_file_format` to Avro.
    * Move implementation of `get_table_schema_from_string`,
      `table_schema_to_dict`, and `get_dict_table_schema` to
      the `bigquery_tools` module.
    * Add `bigquery_avro_tools` module with utilities for converting
      BigQuery `TableSchema` to Avro `RecordSchema` (this is a port of
      what's available in the Java SDK, with modified behavior for
      logical types).
    * Modify `WriteRecordsToFile` and `WriteGroupedRecordsToFile` to accept
      `schema` and `file_format`, since in order to be read by BigQuery,
      the Avro files must have schema headers.
    * Parameterize relevant tests to check both JSON and Avro code paths.
    * Add integration test using Avro-based file loads.
    * Introduce `JsonRowWriter` and `AvroRowWriter` classes which implement
      the `io.IOBase` interface and are used by `WriteRecordsToFile`
      and `WriteGroupedRecordsToFile`.
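
    A minimal usage sketch (table name, temp location, and field names are
    hypothetical; the `temp_file_format` argument and the `FileFormat`
    constants are what this commit adds):

        import apache_beam as beam
        from apache_beam.io.gcp import bigquery_tools

        with beam.Pipeline(argv=['--experiments=use_beam_bq_sink']) as p:
          _ = (
              p
              | beam.Create([{'name': 'beam', 'value': 1.0}])
              | beam.io.WriteToBigQuery(
                  'my-project:my_dataset.my_table',
                  schema='name:STRING,value:FLOAT',  # required for Avro loads
                  method='FILE_LOADS',
                  temp_file_format=bigquery_tools.FileFormat.AVRO,
                  custom_gcs_temp_location='gs://my-bucket/tmp',
                  write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                  create_disposition=(
                      beam.io.BigQueryDisposition.CREATE_IF_NEEDED)))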
---
 CHANGES.md                                         |   2 +-
 sdks/python/apache_beam/io/gcp/bigquery.py         | 125 ++++--------
 .../apache_beam/io/gcp/bigquery_avro_tools.py      | 132 +++++++++++++
 .../apache_beam/io/gcp/bigquery_avro_tools_test.py | 181 ++++++++++++++++++
 .../apache_beam/io/gcp/bigquery_file_loads.py      |  91 ++++++---
 .../apache_beam/io/gcp/bigquery_file_loads_test.py | 129 ++++++++-----
 sdks/python/apache_beam/io/gcp/bigquery_test.py    | 125 +++++++++++-
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 211 ++++++++++++++++++++-
 .../apache_beam/io/gcp/bigquery_tools_test.py      |  67 +++++++
 .../apache_beam/io/gcp/bigquery_write_it_test.py   |   7 +-
 sdks/python/apache_beam/io/localfilesystem.py      |   3 +-
 sdks/python/scripts/generate_pydoc.sh              |   1 +
 sdks/python/scripts/run_integration_test.sh        |   1 +
 13 files changed, 902 insertions(+), 173 deletions(-)
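
Before the diff, a short sketch of how the relocated schema helpers and the new
bigquery_avro_tools module fit together (the schema string is an arbitrary
example; the function names and signatures come from this patch):

    import fastavro

    from apache_beam.io.gcp import bigquery_avro_tools
    from apache_beam.io.gcp import bigquery_tools

    # Build a TableSchema from a simple string, convert it to dict form, then
    # translate it into an Avro record schema that fastavro can parse.
    table_schema = bigquery_tools.get_table_schema_from_string(
        'name:STRING,value:FLOAT')
    dict_schema = bigquery_tools.get_dict_table_schema(table_schema)
    avro_schema = bigquery_avro_tools.get_record_schema_from_dict_table_schema(
        'root', dict_schema)
    parsed_schema = fastavro.parse_schema(avro_schema)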

diff --git a/CHANGES.md b/CHANGES.md
index 44fda85..8e37348 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -38,7 +38,7 @@
 * New AnnotateVideo & AnnotateVideoWithContext PTransform's that integrates GCP Video Intelligence functionality. (Python) ([BEAM-9146](https://issues.apache.org/jira/browse/BEAM-9146))
 * New AnnotateImage & AnnotateImageWithContext PTransform's for element-wise & batch image annotation using Google Cloud Vision API. (Python) ([BEAM-9247](https://issues.apache.org/jira/browse/BEAM-9247))
 * Added a PTransform for inspection and deidentification of text using Google Cloud DLP. (Python) ([BEAM-9258](https://issues.apache.org/jira/browse/BEAM-9258))
-* New AnnotateText PTransform that integrates Google Cloud Natural Language functionality (Python) ([BEAM-9248](https://issues.apache.org/jira/browse/BEAM-9248))
+* Added ability to write to BigQuery via Avro file loads (Python)([BEAM-8841](https://issues.apache.org/jira/browse/BEAM-8841))
 
 ### Breaking Changes
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 74c2293..322311c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -720,7 +720,7 @@ class _CustomBigQuerySource(BoundedSource):
     job_ref = bq.perform_extract_job([self.gcs_location],
                                      job_id,
                                      self.table_reference,
-                                     bigquery_tools.ExportFileFormat.JSON,
+                                     bigquery_tools.FileFormat.JSON,
                                      include_header=False)
     bq.wait_for_bq_job(job_ref)
     metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
@@ -1239,7 +1239,8 @@ class WriteToBigQuery(PTransform):
       table_side_inputs=None,
       schema_side_inputs=None,
       triggering_frequency=None,
-      validate=True):
+      validate=True,
+      temp_file_format=None):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -1272,8 +1273,9 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
         (mode will always be set to ``'NULLABLE'``).
         If a callable, then it should receive a destination (in the form of
         a TableReference or a string, and return a str, dict or TableSchema.
-        One may also pass ``SCHEMA_AUTODETECT`` here, and BigQuery will try to
-        infer the schema for the files that are being loaded.
+        One may also pass ``SCHEMA_AUTODETECT`` here when using JSON-based
+        file loads, and BigQuery will try to infer the schema for the files
+        that are being loaded.
       create_disposition (BigQueryDisposition): A string describing what
         happens if the table does not exist. Possible values are:
 
@@ -1338,6 +1340,12 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
         about BigQuery quotas.
       validate: Indicates whether to perform validation checks on
         inputs. This parameter is primarily used for testing.
+      temp_file_format: The format to use for file loads into BigQuery. The
+        options are NEWLINE_DELIMITED_JSON or AVRO, with AVRO being used
+        by default. For advantages and limitations of the two formats, see
+        https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro
+        and
+        https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json.
     """
     self.table_reference = bigquery_tools.parse_table_reference(
         table, dataset, project)
@@ -1348,7 +1356,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
     if schema == SCHEMA_AUTODETECT:
       self.schema = schema
     else:
-      self.schema = WriteToBigQuery.get_dict_table_schema(schema)
+      self.schema = bigquery_tools.get_dict_table_schema(schema)
     self.batch_size = batch_size
     self.kms_key = kms_key
     self.test_client = test_client
@@ -1361,87 +1369,18 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
     self.triggering_frequency = triggering_frequency
     self.insert_retry_strategy = insert_retry_strategy
     self._validate = validate
+    self._temp_file_format = temp_file_format or bigquery_tools.FileFormat.AVRO
 
     self.additional_bq_parameters = additional_bq_parameters or {}
     self.table_side_inputs = table_side_inputs or ()
     self.schema_side_inputs = schema_side_inputs or ()
 
-  @staticmethod
-  def get_table_schema_from_string(schema):
-    """Transform the string table schema into a
-    :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema` instance.
-
-    Args:
-      schema (str): The sting schema to be used if the BigQuery table to write
-        has to be created.
-
-    Returns:
-      ~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema:
-      The schema to be used if the BigQuery table to write has to be created
-      but in the :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema` format.
-    """
-    table_schema = bigquery.TableSchema()
-    schema_list = [s.strip() for s in schema.split(',')]
-    for field_and_type in schema_list:
-      field_name, field_type = field_and_type.split(':')
-      field_schema = bigquery.TableFieldSchema()
-      field_schema.name = field_name
-      field_schema.type = field_type
-      field_schema.mode = 'NULLABLE'
-      table_schema.fields.append(field_schema)
-    return table_schema
-
-  @staticmethod
-  def table_schema_to_dict(table_schema):
-    """Create a dictionary representation of table schema for serialization
-    """
-    def get_table_field(field):
-      """Create a dictionary representation of a table field
-      """
-      result = {}
-      result['name'] = field.name
-      result['type'] = field.type
-      result['mode'] = getattr(field, 'mode', 'NULLABLE')
-      if hasattr(field, 'description') and field.description is not None:
-        result['description'] = field.description
-      if hasattr(field, 'fields') and field.fields:
-        result['fields'] = [get_table_field(f) for f in field.fields]
-      return result
-
-    if not isinstance(table_schema, bigquery.TableSchema):
-      raise ValueError("Table schema must be of the type bigquery.TableSchema")
-    schema = {'fields': []}
-    for field in table_schema.fields:
-      schema['fields'].append(get_table_field(field))
-    return schema
-
-  @staticmethod
-  def get_dict_table_schema(schema):
-    """Transform the table schema into a dictionary instance.
-
-    Args:
-      schema (~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema):
-        The schema to be used if the BigQuery table to write has to be created.
-        This can either be a dict or string or in the TableSchema format.
-
-    Returns:
-      Dict[str, Any]: The schema to be used if the BigQuery table to write has
-      to be created but in the dictionary format.
-    """
-    if (isinstance(schema, (dict, vp.ValueProvider)) or callable(schema) or
-        schema is None):
-      return schema
-    elif isinstance(schema, (str, unicode)):
-      table_schema = WriteToBigQuery.get_table_schema_from_string(schema)
-      return WriteToBigQuery.table_schema_to_dict(table_schema)
-    elif isinstance(schema, bigquery.TableSchema):
-      return WriteToBigQuery.table_schema_to_dict(schema)
-    else:
-      raise TypeError('Unexpected schema argument: %s.' % schema)
+  # Dict/schema methods were moved to bigquery_tools, but keep references
+  # here for backward compatibility.
+  get_table_schema_from_string = \
+      staticmethod(bigquery_tools.get_table_schema_from_string)
+  table_schema_to_dict = staticmethod(bigquery_tools.table_schema_to_dict)
+  get_dict_table_schema = staticmethod(bigquery_tools.get_dict_table_schema)
 
   def _compute_method(self, experiments, is_streaming_pipeline):
     # If the new BQ sink is not activated for experiment flags, then we use
@@ -1469,13 +1408,12 @@ bigquery_v2_messages.TableSchema):
 
     method_to_use = self._compute_method(experiments, is_streaming_pipeline)
 
-    if (method_to_use == WriteToBigQuery.Method.STREAMING_INSERTS and
-        self.schema == SCHEMA_AUTODETECT):
-      raise ValueError(
-          'Schema auto-detection is not supported for streaming '
-          'inserts into BigQuery. Only for File Loads.')
-
     if method_to_use == WriteToBigQuery.Method.STREAMING_INSERTS:
+      if self.schema == SCHEMA_AUTODETECT:
+        raise ValueError(
+            'Schema auto-detection is not supported for streaming '
+            'inserts into BigQuery. Only for File Loads.')
+
       if self.triggering_frequency:
         raise ValueError(
             'triggering_frequency can only be used with '
@@ -1496,13 +1434,26 @@ bigquery_v2_messages.TableSchema):
 
       return {BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS]}
     else:
+      if self._temp_file_format == bigquery_tools.FileFormat.AVRO:
+        if self.schema == SCHEMA_AUTODETECT:
+          raise ValueError(
+              'Schema auto-detection is not supported when using Avro based '
+              'file loads into BigQuery. Please specify a schema or set '
+              'temp_file_format="NEWLINE_DELIMITED_JSON"')
+        if self.schema is None:
+          raise ValueError(
+              'A schema must be provided when writing to BigQuery using '
+              'Avro based file loads')
+
       from apache_beam.io.gcp import bigquery_file_loads
+
       return pcoll | bigquery_file_loads.BigQueryBatchFileLoads(
           destination=self.table_reference,
           schema=self.schema,
           create_disposition=self.create_disposition,
           write_disposition=self.write_disposition,
           triggering_frequency=self.triggering_frequency,
+          temp_file_format=self._temp_file_format,
           max_file_size=self.max_file_size,
           max_files_per_bundle=self.max_files_per_bundle,
           custom_gcs_temp_location=self.custom_gcs_temp_location,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
new file mode 100644
index 0000000..aa11d10
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tools used tool work with Avro files in the context of BigQuery.
+
+Classes, constants and functions in this file are experimental and have no
+backwards compatibility guarantees.
+
+NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+BIG_QUERY_TO_AVRO_TYPES = {
+    "RECORD": "record",
+    "STRING": "string",
+    "BOOLEAN": "boolean",
+    "BYTES": "bytes",
+    "FLOAT": "double",
+    "INTEGER": "long",
+    "TIME": {
+        "type": "long",
+        "logicalType": "time-micros",
+    },
+    "TIMESTAMP": {
+        "type": "long",
+        "logicalType": "timestamp-micros",
+    },
+    "DATE": {
+        "type": "int",
+        "logicalType": "date",
+    },
+    "DATETIME": "string",
+    "NUMERIC": {
+        "type": "bytes",
+        "logicalType": "decimal",
+        # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type
+        "precision": 38,
+        "scale": 9,
+    },
+    "GEOGRAPHY": "string",
+}
+
+
+def get_record_schema_from_dict_table_schema(
+    schema_name, table_schema, namespace="apache_beam.io.gcp.bigquery"):
+  # type: (Text, Dict[Text, Any], Text) -> Dict[Text, Any]
+
+  """Convert a table schema into an Avro schema.
+
+  Args:
+    schema_name (Text): The name of the record.
+    table_schema (Dict[Text, Any]): A BigQuery table schema in dict form.
+    namespace (Text): The namespace of the Avro schema.
+
+  Returns:
+    Dict[Text, Any]: The schema as an Avro RecordSchema.
+  """
+  avro_fields = [
+      table_field_to_avro_field(field, ".".join((namespace, schema_name)))
+      for field in table_schema["fields"]
+  ]
+
+  return {
+      "type": "record",
+      "name": schema_name,
+      "fields": avro_fields,
+      "doc": "Translated Avro Schema for {}".format(schema_name),
+      "namespace": namespace,
+  }
+
+
+def table_field_to_avro_field(table_field, namespace):
+  # type: (Dict[Text, Any], Text) -> Dict[Text, Any]
+
+  """Convert a BigQuery field to an avro field.
+
+  Args:
+    table_field (Dict[Text, Any]): A BigQuery field in dict form.
+
+  Returns:
+    Dict[Text, Any]: An equivalent Avro field in dict form.
+  """
+  assert "type" in table_field, \
+    "Unable to get type for table field {}".format(table_field)
+  assert table_field["type"] in BIG_QUERY_TO_AVRO_TYPES, \
+    "Unable to map BigQuery field type {} to avro type".format(
+      table_field["type"])
+
+  avro_type = BIG_QUERY_TO_AVRO_TYPES[table_field["type"]]
+
+  if avro_type == "record":
+    element_type = get_record_schema_from_dict_table_schema(
+        table_field["name"],
+        table_field,
+        namespace=".".join((namespace, table_field["name"])))
+  else:
+    element_type = avro_type
+
+  field_mode = table_field.get("mode", "NULLABLE")
+
+  if field_mode in (None, "NULLABLE"):
+    field_type = ["null", element_type]
+  elif field_mode == "REQUIRED":
+    field_type = element_type
+  elif field_mode == "REPEATED":
+    field_type = {"type": "array", "items": element_type}
+  else:
+    raise ValueError("Unkown BigQuery field mode: {}".format(field_mode))
+
+  avro_field = {"type": field_type, "name": table_field["name"]}
+
+  doc = table_field.get("description")
+  if doc:
+    avro_field["doc"] = doc
+
+  return avro_field
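
A quick illustration of the field-mode handling in table_field_to_avro_field
above (the field dicts are made-up examples; the expected outputs follow from
the code as written):

    from apache_beam.io.gcp import bigquery_avro_tools

    ns = 'apache_beam.io.gcp.bigquery.root'

    # NULLABLE (and unset) modes become a ["null", ...] union.
    bigquery_avro_tools.table_field_to_avro_field(
        {'name': 'species', 'type': 'STRING', 'mode': 'NULLABLE'}, ns)
    # -> {'type': ['null', 'string'], 'name': 'species'}

    # REQUIRED fields map directly to the underlying Avro type.
    bigquery_avro_tools.table_field_to_avro_field(
        {'name': 'number', 'type': 'INTEGER', 'mode': 'REQUIRED'}, ns)
    # -> {'type': 'long', 'name': 'number'}

    # REPEATED fields become Avro arrays.
    bigquery_avro_tools.table_field_to_avro_field(
        {'name': 'tags', 'type': 'STRING', 'mode': 'REPEATED'}, ns)
    # -> {'type': {'type': 'array', 'items': 'string'}, 'name': 'tags'}
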
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_avro_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools_test.py
new file mode 100644
index 0000000..bfb1de1
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools_test.py
@@ -0,0 +1,181 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+from __future__ import division
+
+import json
+import logging
+import unittest
+
+import avro.schema
+import fastavro
+
+from apache_beam.io.gcp import bigquery_avro_tools
+from apache_beam.io.gcp import bigquery_tools
+from apache_beam.io.gcp.bigquery_test import HttpError
+from apache_beam.io.gcp.internal.clients import bigquery
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestBigQueryToAvroSchema(unittest.TestCase):
+  def test_convert_bigquery_schema_to_avro_schema(self):
+    subfields = [
+        bigquery.TableFieldSchema(
+            name="species", type="STRING", mode="NULLABLE"),
+    ]
+
+    fields = [
+      bigquery.TableFieldSchema(
+        name="number", type="INTEGER", mode="REQUIRED"),
+      bigquery.TableFieldSchema(
+        name="species", type="STRING", mode="NULLABLE"),
+      bigquery.TableFieldSchema(
+        name="quality", type="FLOAT"),  # default to NULLABLE
+      bigquery.TableFieldSchema(
+        name="quantity", type="INTEGER"),  # default to NULLABLE
+      bigquery.TableFieldSchema(
+        name="birthday", type="TIMESTAMP", mode="NULLABLE"),
+      bigquery.TableFieldSchema(
+        name="birthdayMoney", type="NUMERIC", mode="NULLABLE"),
+      bigquery.TableFieldSchema(
+        name="flighted", type="BOOLEAN", mode="NULLABLE"),
+      bigquery.TableFieldSchema(
+        name="sound", type="BYTES", mode="NULLABLE"),
+      bigquery.TableFieldSchema(
+        name="anniversaryDate", type="DATE", mode="NULLABLE"),
+      bigquery.TableFieldSchema(
+        name="anniversaryDatetime", type="DATETIME", mode="NULLABLE"),
+      bigquery.TableFieldSchema(
+        name="anniversaryTime", type="TIME", mode="NULLABLE"),
+      bigquery.TableFieldSchema(
+        name="scion", type="RECORD", mode="NULLABLE", fields=subfields),
+      bigquery.TableFieldSchema(
+        name="associates", type="RECORD", mode="REPEATED", fields=subfields),
+      bigquery.TableFieldSchema(
+        name="geoPositions", type="GEOGRAPHY", mode="NULLABLE"),
+    ]
+
+    table_schema = bigquery.TableSchema(fields=fields)
+    avro_schema = bigquery_avro_tools.get_record_schema_from_dict_table_schema(
+        "root", bigquery_tools.get_dict_table_schema(table_schema))
+
+    # Test that schema can be parsed correctly by fastavro
+    fastavro.parse_schema(avro_schema)
+
+    # Test that schema can be parsed correctly by avro
+    parsed_schema = avro.schema.parse(json.dumps(avro_schema))
+    # Avro RecordSchema provides field_map in py3 and fields_dict in py2
+    field_map = getattr(parsed_schema, "field_map", None) or \
+      getattr(parsed_schema, "fields_dict", None)
+
+    self.assertEqual(
+        field_map["number"].type, avro.schema.parse(json.dumps("long")))
+    self.assertEqual(
+        field_map["species"].type,
+        avro.schema.parse(json.dumps(["null", "string"])))
+    self.assertEqual(
+        field_map["quality"].type,
+        avro.schema.parse(json.dumps(["null", "double"])))
+    self.assertEqual(
+        field_map["quantity"].type,
+        avro.schema.parse(json.dumps(["null", "long"])))
+    self.assertEqual(
+        field_map["birthday"].type,
+        avro.schema.parse(
+            json.dumps(
+                ["null", {
+                    "type": "long", "logicalType": "timestamp-micros"
+                }])))
+    self.assertEqual(
+        field_map["birthdayMoney"].type,
+        avro.schema.parse(
+            json.dumps([
+                "null",
+                {
+                    "type": "bytes",
+                    "logicalType": "decimal",
+                    "precision": 38,
+                    "scale": 9
+                }
+            ])))
+    self.assertEqual(
+        field_map["flighted"].type,
+        avro.schema.parse(json.dumps(["null", "boolean"])))
+    self.assertEqual(
+        field_map["sound"].type,
+        avro.schema.parse(json.dumps(["null", "bytes"])))
+    self.assertEqual(
+        field_map["anniversaryDate"].type,
+        avro.schema.parse(
+            json.dumps(["null", {
+                "type": "int", "logicalType": "date"
+            }])))
+    self.assertEqual(
+        field_map["anniversaryDatetime"].type,
+        avro.schema.parse(json.dumps(["null", "string"])))
+    self.assertEqual(
+        field_map["anniversaryTime"].type,
+        avro.schema.parse(
+            json.dumps(["null", {
+                "type": "long", "logicalType": "time-micros"
+            }])))
+    self.assertEqual(
+        field_map["geoPositions"].type,
+        avro.schema.parse(json.dumps(["null", "string"])))
+
+    self.assertEqual(
+        field_map["scion"].type,
+        avro.schema.parse(
+            json.dumps([
+                "null",
+                {
+                    "type": "record",
+                    "name": "scion",
+                    "fields": [
+                        {
+                            "type": ["null", "string"],
+                            "name": "species",
+                        },
+                    ],
+                    "doc": "Translated Avro Schema for scion",
+                    "namespace": "apache_beam.io.gcp.bigquery.root.scion",
+                },
+            ])))
+
+    self.assertEqual(
+        field_map["associates"].type,
+        avro.schema.parse(
+            json.dumps({
+                "type": "array",
+                "items": {
+                    "type": "record",
+                    "name": "associates",
+                    "fields": [
+                        {
+                            "type": ["null", "string"],
+                            "name": "species",
+                        },
+                    ],
+                    "doc": "Translated Avro Schema for associates",
+                    "namespace": "apache_beam.io.gcp.bigquery.root.associates",
+                }
+            })))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 04a4928..097f42d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -99,7 +99,12 @@ def file_prefix_generator(
   return _generate_file_prefix
 
 
-def _make_new_file_writer(file_prefix, destination):
+def _make_new_file_writer(
+    file_prefix,
+    destination,
+    file_format,
+    schema=None,
+    schema_side_inputs=tuple()):
   destination = bigquery_tools.get_hashable_destination(destination)
 
   # Windows does not allow : on filenames. Replacing with underscore.
@@ -115,7 +120,23 @@ def _make_new_file_writer(file_prefix, destination):
   file_name = str(uuid.uuid4())
   file_path = fs.FileSystems.join(file_prefix, destination, file_name)
 
-  return file_path, fs.FileSystems.create(file_path, 'application/text')
+  if file_format == bigquery_tools.FileFormat.AVRO:
+    if callable(schema):
+      schema = schema(destination, *schema_side_inputs)
+    elif isinstance(schema, vp.ValueProvider):
+      schema = schema.get()
+
+    writer = bigquery_tools.AvroRowWriter(
+        fs.FileSystems.create(file_path, "application/avro"), schema)
+  elif file_format == bigquery_tools.FileFormat.JSON:
+    writer = bigquery_tools.JsonRowWriter(
+        fs.FileSystems.create(file_path, "application/text"))
+  else:
+    raise ValueError((
+        'Only AVRO and JSON are supported as intermediate formats for '
+        'BigQuery WriteRecordsToFile, got: {}.').format(file_format))
+
+  return file_path, writer
 
 
 def _bq_uuid(seed=None):
@@ -166,9 +187,10 @@ class WriteRecordsToFile(beam.DoFn):
 
   def __init__(
       self,
+      schema,
       max_files_per_bundle=_DEFAULT_MAX_WRITERS_PER_BUNDLE,
       max_file_size=_DEFAULT_MAX_FILE_SIZE,
-      coder=None):
+      file_format=None):
     """Initialize a :class:`WriteRecordsToFile`.
 
     Args:
@@ -179,21 +201,22 @@ class WriteRecordsToFile(beam.DoFn):
         an export job.
 
     """
+    self.schema = schema
     self.max_files_per_bundle = max_files_per_bundle
     self.max_file_size = max_file_size
-    self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+    self.file_format = file_format or bigquery_tools.FileFormat.AVRO
 
   def display_data(self):
     return {
         'max_files_per_bundle': self.max_files_per_bundle,
         'max_file_size': str(self.max_file_size),
-        'coder': self.coder.__class__.__name__
+        'file_format': self.file_format,
     }
 
   def start_bundle(self):
     self._destination_to_file_writer = {}
 
-  def process(self, element, file_prefix):
+  def process(self, element, file_prefix, *schema_side_inputs):
     """Take a tuple with (destination, row) and write to file or spill out.
 
     Destination may be a ``TableReference`` or a string, and row is a
@@ -204,7 +227,11 @@ class WriteRecordsToFile(beam.DoFn):
     if destination not in self._destination_to_file_writer:
       if len(self._destination_to_file_writer) < self.max_files_per_bundle:
         self._destination_to_file_writer[destination] = _make_new_file_writer(
-            file_prefix, destination)
+            file_prefix,
+            destination,
+            self.file_format,
+            self.schema,
+            schema_side_inputs)
       else:
         yield pvalue.TaggedOutput(
             WriteRecordsToFile.UNWRITTEN_RECORD_TAG, element)
@@ -213,8 +240,7 @@ class WriteRecordsToFile(beam.DoFn):
     (file_path, writer) = self._destination_to_file_writer[destination]
 
     # TODO(pabloem): Is it possible for this to throw exception?
-    writer.write(self.coder.encode(row))
-    writer.write(b'\n')
+    writer.write(row)
 
     file_size = writer.tell()
     if file_size > self.max_file_size:
@@ -246,11 +272,13 @@ class WriteGroupedRecordsToFile(beam.DoFn):
 
   Experimental; no backwards compatibility guarantees.
   """
-  def __init__(self, max_file_size=_DEFAULT_MAX_FILE_SIZE, coder=None):
+  def __init__(
+      self, schema, max_file_size=_DEFAULT_MAX_FILE_SIZE, file_format=None):
+    self.schema = schema
     self.max_file_size = max_file_size
-    self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+    self.file_format = file_format or bigquery_tools.FileFormat.AVRO
 
-  def process(self, element, file_prefix):
+  def process(self, element, file_prefix, *schema_side_inputs):
     destination = element[0]
     rows = element[1]
 
@@ -258,10 +286,14 @@ class WriteGroupedRecordsToFile(beam.DoFn):
 
     for row in rows:
       if writer is None:
-        (file_path, writer) = _make_new_file_writer(file_prefix, destination)
+        (file_path, writer) = _make_new_file_writer(
+            file_prefix,
+            destination,
+            self.file_format,
+            self.schema,
+            schema_side_inputs)
 
-      writer.write(self.coder.encode(row))
-      writer.write(b'\n')
+      writer.write(row)
 
       file_size = writer.tell()
       if file_size > self.max_file_size:
@@ -375,11 +407,13 @@ class TriggerLoadJobs(beam.DoFn):
       write_disposition=None,
       test_client=None,
       temporary_tables=False,
-      additional_bq_parameters=None):
+      additional_bq_parameters=None,
+      source_format=None):
     self.schema = schema
     self.test_client = test_client
     self.temporary_tables = temporary_tables
     self.additional_bq_parameters = additional_bq_parameters or {}
+    self.source_format = source_format
     if self.temporary_tables:
       # If we are loading into temporary tables, we rely on the default create
       # and write dispositions, which mean that a new table will be created.
@@ -464,7 +498,8 @@ class TriggerLoadJobs(beam.DoFn):
         schema=schema,
         write_disposition=self.write_disposition,
         create_disposition=create_disposition,
-        additional_load_parameters=additional_parameters)
+        additional_load_parameters=additional_parameters,
+        source_format=self.source_format)
     yield (destination, job_reference)
 
 
@@ -578,7 +613,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
       create_disposition=None,
       write_disposition=None,
       triggering_frequency=None,
-      coder=None,
+      temp_file_format=None,
       max_file_size=None,
       max_files_per_bundle=None,
       max_partition_size=None,
@@ -610,7 +645,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
 
     self.test_client = test_client
     self.schema = schema
-    self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+    self._temp_file_format = temp_file_format or bigquery_tools.FileFormat.AVRO
 
     # If we have multiple destinations, then we will have multiple load jobs,
     # thus we will need temporary tables for atomicity.
@@ -674,10 +709,12 @@ class BigQueryBatchFileLoads(beam.PTransform):
         destination_data_kv_pc
         | beam.ParDo(
             WriteRecordsToFile(
+                schema=self.schema,
                 max_files_per_bundle=self.max_files_per_bundle,
                 max_file_size=self.max_file_size,
-                coder=self.coder),
-            file_prefix=file_prefix_pcv).with_outputs(
+                file_format=self._temp_file_format),
+            file_prefix_pcv,
+            *self.schema_side_inputs).with_outputs(
                 WriteRecordsToFile.UNWRITTEN_RECORD_TAG,
                 WriteRecordsToFile.WRITTEN_FILE_TAG))
 
@@ -697,8 +734,10 @@ class BigQueryBatchFileLoads(beam.PTransform):
         | "GroupShardedRows" >> beam.GroupByKey()
         | "DropShardNumber" >> beam.Map(lambda x: (x[0][0], x[1]))
         | "WriteGroupedRecordsToFile" >> beam.ParDo(
-            WriteGroupedRecordsToFile(coder=self.coder),
-            file_prefix=file_prefix_pcv))
+            WriteGroupedRecordsToFile(
+                schema=self.schema, file_format=self._temp_file_format),
+            file_prefix_pcv,
+            *self.schema_side_inputs))
 
     all_destination_file_pairs_pc = (
         (destination_files_kv_pc, more_destination_files_kv_pc)
@@ -748,7 +787,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
                 create_disposition=self.create_disposition,
                 test_client=self.test_client,
                 temporary_tables=True,
-                additional_bq_parameters=self.additional_bq_parameters),
+                additional_bq_parameters=self.additional_bq_parameters,
+                source_format=self._temp_file_format),
             load_job_name_pcv,
             *self.schema_side_inputs).with_outputs(
                 TriggerLoadJobs.TEMP_TABLES, main='main'))
@@ -796,7 +836,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
                 create_disposition=self.create_disposition,
                 test_client=self.test_client,
                 temporary_tables=False,
-                additional_bq_parameters=self.additional_bq_parameters),
+                additional_bq_parameters=self.additional_bq_parameters,
+                source_format=self._temp_file_format),
             load_job_name_pcv,
             *self.schema_side_inputs))
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 7be618d..29fdfa5 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -21,7 +21,6 @@
 
 from __future__ import absolute_import
 
-import json
 import logging
 import os
 import random
@@ -34,9 +33,10 @@ from hamcrest.core import assert_that as hamcrest_assert
 from hamcrest.core.core.allof import all_of
 from hamcrest.core.core.is_ import is_
 from nose.plugins.attrib import attr
+from parameterized import param
+from parameterized import parameterized
 
 import apache_beam as beam
-from apache_beam import coders
 from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
 from apache_beam.io.gcp import bigquery_file_loads as bqfl
 from apache_beam.io.gcp import bigquery
@@ -57,59 +57,68 @@ from apache_beam.typehints.typehints import Tuple
 try:
   from apitools.base.py.exceptions import HttpError
 except ImportError:
-  HttpError = None
+  raise unittest.SkipTest('GCP dependencies are not installed')
 
 _LOGGER = logging.getLogger(__name__)
 
 _DESTINATION_ELEMENT_PAIRS = [
     # DESTINATION 1
-    ('project1:dataset1.table1', '{"name":"beam", "language":"py"}'),
-    ('project1:dataset1.table1', '{"name":"beam", "language":"java"}'),
-    ('project1:dataset1.table1', '{"name":"beam", "language":"go"}'),
-    ('project1:dataset1.table1', '{"name":"flink", "language":"java"}'),
-    ('project1:dataset1.table1', '{"name":"flink", "language":"scala"}'),
+    ('project1:dataset1.table1', {
+        'name': 'beam', 'language': 'py'
+    }),
+    ('project1:dataset1.table1', {
+        'name': 'beam', 'language': 'java'
+    }),
+    ('project1:dataset1.table1', {
+        'name': 'beam', 'language': 'go'
+    }),
+    ('project1:dataset1.table1', {
+        'name': 'flink', 'language': 'java'
+    }),
+    ('project1:dataset1.table1', {
+        'name': 'flink', 'language': 'scala'
+    }),
 
     # DESTINATION 3
-    ('project1:dataset1.table3', '{"name":"spark", "language":"scala"}'),
+    ('project1:dataset1.table3', {
+        'name': 'spark', 'language': 'scala'
+    }),
 
     # DESTINATION 1
-    ('project1:dataset1.table1', '{"name":"spark", "language":"py"}'),
-    ('project1:dataset1.table1', '{"name":"spark", "language":"scala"}'),
+    ('project1:dataset1.table1', {
+        'name': 'spark', 'language': 'py'
+    }),
+    ('project1:dataset1.table1', {
+        'name': 'spark', 'language': 'scala'
+    }),
 
     # DESTINATION 2
-    ('project1:dataset1.table2', '{"name":"beam", "foundation":"apache"}'),
-    ('project1:dataset1.table2', '{"name":"flink", "foundation":"apache"}'),
-    ('project1:dataset1.table2', '{"name":"spark", "foundation":"apache"}'),
-]
-
-_NAME_LANGUAGE_ELEMENTS = [
-    json.loads(elm[1]) for elm in _DESTINATION_ELEMENT_PAIRS
-    if "language" in elm[1]
+    ('project1:dataset1.table2', {
+        'name': 'beam', 'foundation': 'apache'
+    }),
+    ('project1:dataset1.table2', {
+        'name': 'flink', 'foundation': 'apache'
+    }),
+    ('project1:dataset1.table2', {
+        'name': 'spark', 'foundation': 'apache'
+    }),
 ]
 
 _DISTINCT_DESTINATIONS = list(
     set([elm[0] for elm in _DESTINATION_ELEMENT_PAIRS]))
 
-_ELEMENTS = list([json.loads(elm[1]) for elm in _DESTINATION_ELEMENT_PAIRS])
-
+_ELEMENTS = [elm[1] for elm in _DESTINATION_ELEMENT_PAIRS]
 
-class CustomRowCoder(coders.Coder):
-  """
-  Custom row coder that also expects strings as input data when encoding
-  """
-  def __init__(self):
-    self.coder = bigquery_tools.RowAsDictJsonCoder()
+_ELEMENTS_SCHEMA = bigquery.WriteToBigQuery.get_dict_table_schema(
+    bigquery_api.TableSchema(
+        fields=[
+            bigquery_api.TableFieldSchema(
+                name="name", type="STRING", mode="REQUIRED"),
+            bigquery_api.TableFieldSchema(name="language", type="STRING"),
+            bigquery_api.TableFieldSchema(name="foundation", type="STRING"),
+        ]))
 
-  def encode(self, table_row):
-    if type(table_row) == str:
-      table_row = json.loads(table_row)
-    return self.coder.encode(table_row)
 
-  def decode(self, encoded_table_row):
-    return self.coder.decode(encoded_table_row)
-
-
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
   maxDiff = None
 
@@ -127,10 +136,16 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
       checks(output_pcs)
       return output_pcs
 
-  def test_files_created(self):
+  @parameterized.expand([
+      param(file_format=bigquery_tools.FileFormat.AVRO),
+      param(file_format=bigquery_tools.FileFormat.JSON),
+      param(file_format=None),
+  ])
+  def test_files_created(self, file_format):
     """Test that the files are created and written."""
 
-    fn = bqfl.WriteRecordsToFile(coder=CustomRowCoder())
+    fn = bqfl.WriteRecordsToFile(
+        schema=_ELEMENTS_SCHEMA, file_format=file_format)
     self.tmpdir = self._new_tempdir()
 
     def check_files_created(output_pcs):
@@ -161,7 +176,7 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
     file length is very small, so only a couple records fit in each file.
     """
 
-    fn = bqfl.WriteRecordsToFile(max_file_size=50, coder=CustomRowCoder())
+    fn = bqfl.WriteRecordsToFile(schema=_ELEMENTS_SCHEMA, max_file_size=300)
     self.tmpdir = self._new_tempdir()
 
     def check_many_files(output_pcs):
@@ -188,7 +203,11 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
 
     self._consume_input(fn, check_many_files)
 
-  def test_records_are_spilled(self):
+  @parameterized.expand([
+      param(file_format=bigquery_tools.FileFormat.AVRO),
+      param(file_format=bigquery_tools.FileFormat.JSON),
+  ])
+  def test_records_are_spilled(self, file_format):
     """Forces records to be written to many files.
 
     For each destination multiple files are necessary, and at most two files
@@ -196,7 +215,10 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
     processing.
     """
 
-    fn = bqfl.WriteRecordsToFile(max_files_per_bundle=2, coder=CustomRowCoder())
+    fn = bqfl.WriteRecordsToFile(
+        schema=_ELEMENTS_SCHEMA,
+        max_files_per_bundle=2,
+        file_format=file_format)
     self.tmpdir = self._new_tempdir()
 
     def check_many_files(output_pcs):
@@ -231,7 +253,6 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
     self._consume_input(fn, check_many_files)
 
 
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
   def _consume_input(self, fn, input, checks):
     if checks is None:
@@ -247,10 +268,16 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
       checks(res)
       return res
 
-  def test_files_are_created(self):
+  @parameterized.expand([
+      param(file_format=bigquery_tools.FileFormat.AVRO),
+      param(file_format=bigquery_tools.FileFormat.JSON),
+      param(file_format=None),
+  ])
+  def test_files_are_created(self, file_format):
     """Test that the files are created and written."""
 
-    fn = bqfl.WriteGroupedRecordsToFile(coder=CustomRowCoder())
+    fn = bqfl.WriteGroupedRecordsToFile(
+        schema=_ELEMENTS_SCHEMA, file_format=file_format)
     self.tmpdir = self._new_tempdir()
 
     def check_files_created(output_pc):
@@ -279,7 +306,7 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
     file length is very small, so only a couple records fit in each file.
     """
     fn = bqfl.WriteGroupedRecordsToFile(
-        max_file_size=50, coder=CustomRowCoder())
+        schema=_ELEMENTS_SCHEMA, max_file_size=300)
     self.tmpdir = self._new_tempdir()
 
     def check_multiple_files(output_pc):
@@ -302,7 +329,6 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
     self._consume_input(fn, _DESTINATION_ELEMENT_PAIRS, check_multiple_files)
 
 
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestPartitionFiles(unittest.TestCase):
 
   _ELEMENTS = [(
@@ -375,7 +401,6 @@ class TestPartitionFiles(unittest.TestCase):
         label='CheckSinglePartition')
 
 
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
   def test_records_traverse_transform_with_mocks(self):
     destination = 'project1:dataset1.table1'
@@ -401,7 +426,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
         custom_gcs_temp_location=self._new_tempdir(),
         test_client=bq_client,
         validate=False,
-        coder=CustomRowCoder())
+        temp_file_format=bigquery_tools.FileFormat.JSON)
 
     # Need to test this with the DirectRunner to avoid serializing mocks
     with TestPipeline('DirectRunner') as p:
@@ -534,7 +559,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
               custom_gcs_temp_location=self._new_tempdir(),
               test_client=bq_client,
               validate=False,
-              coder=CustomRowCoder(),
+              temp_file_format=bigquery_tools.FileFormat.JSON,
               max_file_size=45,
               max_partition_size=80,
               max_files_per_partition=2))
@@ -580,7 +605,6 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
           label='CheckCopyJobCount')
 
 
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class BigQueryFileLoadsIT(unittest.TestCase):
 
   BIG_QUERY_DATASET_ID = 'python_bq_file_loads_'
@@ -771,6 +795,8 @@ class BigQueryFileLoadsIT(unittest.TestCase):
         experiments='use_beam_bq_sink')
 
     with self.assertRaises(Exception):
+      # The pipeline below fails because neither a schema nor SCHEMA_AUTODETECT
+      # is specified.
       with beam.Pipeline(argv=args) as p:
         input = p | beam.Create(_ELEMENTS)
         input2 = p | "Broken record" >> beam.Create(['language_broken_record'])
@@ -783,7 +809,8 @@ class BigQueryFileLoadsIT(unittest.TestCase):
                 (output_table_1 if 'language' in x else output_table_2),
                 create_disposition=(
                     beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
-                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+                temp_file_format=bigquery_tools.FileFormat.JSON))
 
     hamcrest_assert(p, all_of(*pipeline_verifiers))
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index b716822..b105e5e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -20,6 +20,7 @@
 
 from __future__ import absolute_import
 
+import datetime
 import decimal
 import json
 import logging
@@ -35,6 +36,7 @@ import uuid
 import future.tests.base  # pylint: disable=unused-import
 import hamcrest as hc
 import mock
+import pytz
 from nose.plugins.attrib import attr
 
 import apache_beam as beam
@@ -560,6 +562,28 @@ class TestWriteToBigQuery(unittest.TestCase):
         beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
     self.assertEqual(expected_dict_schema, dict_schema)
 
+  def test_schema_autodetect_not_allowed_with_avro_file_loads(self):
+    with TestPipeline(
+        additional_pipeline_args=["--experiments=use_beam_bq_sink"]) as p:
+      pc = p | beam.Impulse()
+
+      with self.assertRaisesRegex(ValueError, '^A schema must be provided'):
+        _ = (
+            pc
+            | beam.io.gcp.bigquery.WriteToBigQuery(
+                "dataset.table",
+                schema=None,
+                temp_file_format=bigquery_tools.FileFormat.AVRO))
+
+      with self.assertRaisesRegex(ValueError,
+                                  '^Schema auto-detection is not supported'):
+        _ = (
+            pc
+            | beam.io.gcp.bigquery.WriteToBigQuery(
+                "dataset.table",
+                schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
+                temp_file_format=bigquery_tools.FileFormat.AVRO))
+
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class BigQueryStreamingInsertTransformTests(unittest.TestCase):
@@ -816,7 +840,8 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
                   str, '%s:%s' % (self.project, output_table_2)),
               schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
               additional_bq_parameters=lambda _: additional_bq_parameters,
-              method='FILE_LOADS'))
+              method='FILE_LOADS',
+              temp_file_format=bigquery_tools.FileFormat.JSON))
 
   @attr('IT')
   def test_multiple_destinations_transform(self):
@@ -1025,6 +1050,104 @@ class PubSubBigQueryIT(unittest.TestCase):
         WriteToBigQuery.Method.FILE_LOADS, triggering_frequency=20)
 
 
+class BigQueryFileLoadsIntegrationTests(unittest.TestCase):
+  BIG_QUERY_DATASET_ID = 'python_bq_file_loads_'
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+
+    self.dataset_id = '%s%s%s' % (
+        self.BIG_QUERY_DATASET_ID,
+        str(int(time.time())),
+        random.randint(0, 10000))
+    self.bigquery_client = bigquery_tools.BigQueryWrapper()
+    self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
+    self.output_table = '%s.output_table' % (self.dataset_id)
+    self.table_ref = bigquery_tools.parse_table_reference(self.output_table)
+    _LOGGER.info(
+        'Created dataset %s in project %s', self.dataset_id, self.project)
+
+  @attr('IT')
+  def test_avro_file_load(self):
+    # Construct elements such that they can be written via Avro but not via
+    # JSON. See BEAM-8841.
+    elements = [
+        {
+            'name': u'Negative infinity',
+            'value': -float('inf'),
+            'timestamp': datetime.datetime(1970, 1, 1, tzinfo=pytz.utc),
+        },
+        {
+            'name': u'Not a number',
+            'value': float('nan'),
+            'timestamp': datetime.datetime(2930, 12, 9, tzinfo=pytz.utc),
+        },
+    ]
+
+    schema = beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(
+        bigquery.TableSchema(
+            fields=[
+                bigquery.TableFieldSchema(
+                    name='name', type='STRING', mode='REQUIRED'),
+                bigquery.TableFieldSchema(
+                    name='value', type='FLOAT', mode='REQUIRED'),
+                bigquery.TableFieldSchema(
+                    name='timestamp', type='TIMESTAMP', mode='REQUIRED'),
+            ]))
+
+    pipeline_verifiers = [
+        # Some gymnastics here to avoid comparing NaN since NaN is not equal to
+        # anything, including itself.
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT name, value, timestamp FROM {} WHERE value<0".format(
+                self.output_table),
+            data=[(d['name'], d['value'], d['timestamp'])
+                  for d in elements[:1]],
+        ),
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT name, timestamp FROM {}".format(self.output_table),
+            data=[(d['name'], d['timestamp']) for d in elements],
+        ),
+    ]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=hc.all_of(*pipeline_verifiers),
+        experiments='use_beam_bq_sink',
+    )
+
+    with beam.Pipeline(argv=args) as p:
+      input = p | 'CreateInput' >> beam.Create(elements)
+      schema_pc = p | 'CreateSchema' >> beam.Create([schema])
+
+      _ = (
+          input
+          | 'WriteToBigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
+              table='%s:%s' % (self.project, self.output_table),
+              schema=lambda _,
+              schema: schema,
+              schema_side_inputs=(beam.pvalue.AsSingleton(schema_pc), ),
+              method='FILE_LOADS',
+              temp_file_format=bigquery_tools.FileFormat.AVRO,
+          ))
+
+  def tearDown(self):
+    request = bigquery.BigqueryDatasetsDeleteRequest(
+        projectId=self.project, datasetId=self.dataset_id, deleteContents=True)
+    try:
+      _LOGGER.info(
+          "Deleting dataset %s in project %s", self.dataset_id, self.project)
+      self.bigquery_client.client.datasets.Delete(request)
+    except HttpError:
+      _LOGGER.debug(
+          'Failed to clean up dataset %s in project %s',
+          self.dataset_id,
+          self.project)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index f9edfa3..1ae4f7c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -31,6 +31,7 @@ from __future__ import absolute_import
 
 import datetime
 import decimal
+import io
 import json
 import logging
 import re
@@ -39,13 +40,17 @@ import time
 import uuid
 from builtins import object
 
+import fastavro
 from future.utils import iteritems
+from future.utils import raise_with_traceback
+from past.builtins import unicode
 
 from apache_beam import coders
 from apache_beam.internal.gcp import auth
 from apache_beam.internal.gcp.json_value import from_json_value
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp import bigquery_avro_tools
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options import value_provider
 from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -69,7 +74,7 @@ MAX_RETRIES = 3
 JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.'
 
 
-class ExportFileFormat(object):
+class FileFormat(object):
   CSV = 'CSV'
   JSON = 'NEWLINE_DELIMITED_JSON'
   AVRO = 'AVRO'
@@ -344,7 +349,8 @@ class BigQueryWrapper(object):
       schema=None,
       write_disposition=None,
       create_disposition=None,
-      additional_load_parameters=None):
+      additional_load_parameters=None,
+      source_format=None):
     additional_load_parameters = additional_load_parameters or {}
     job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
@@ -358,7 +364,8 @@ class BigQueryWrapper(object):
                     schema=job_schema,
                     writeDisposition=write_disposition,
                     createDisposition=create_disposition,
-                    sourceFormat='NEWLINE_DELIMITED_JSON',
+                    sourceFormat=source_format,
+                    useAvroLogicalTypes=True,
                     autodetect=schema == 'SCHEMA_AUTODETECT',
                     **additional_load_parameters)),
             jobReference=reference,
@@ -656,7 +663,8 @@ class BigQueryWrapper(object):
       schema=None,
       write_disposition=None,
       create_disposition=None,
-      additional_load_parameters=None):
+      additional_load_parameters=None,
+      source_format=None):
     """Starts a job to load data into BigQuery.
 
     Returns:
@@ -670,7 +678,8 @@ class BigQueryWrapper(object):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
-        additional_load_parameters=additional_load_parameters)
+        additional_load_parameters=additional_load_parameters,
+        source_format=source_format)
 
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
@@ -1171,6 +1180,104 @@ class RowAsDictJsonCoder(coders.Coder):
     return json.loads(encoded_table_row.decode('utf-8'))
 
 
+class JsonRowWriter(io.IOBase):
+  """
+  A writer which provides an IOBase-like interface for writing table rows
+  (represented as dicts) as newline-delimited JSON strings.
+  """
+  def __init__(self, file_handle):
+    """Initialize an JsonRowWriter.
+
+    Args:
+      file_handle (io.IOBase): Output stream to write to.
+    """
+    if not file_handle.writable():
+      raise ValueError("Output stream must be writable")
+
+    self._file_handle = file_handle
+    self._coder = RowAsDictJsonCoder()
+
+  def close(self):
+    self._file_handle.close()
+
+  @property
+  def closed(self):
+    return self._file_handle.closed
+
+  def flush(self):
+    self._file_handle.flush()
+
+  def read(self, size=-1):
+    raise io.UnsupportedOperation("JsonRowWriter is not readable")
+
+  def tell(self):
+    return self._file_handle.tell()
+
+  def writable(self):
+    return self._file_handle.writable()
+
+  def write(self, row):
+    return self._file_handle.write(self._coder.encode(row) + b'\n')
+
+
+class AvroRowWriter(io.IOBase):
+  """
+  A writer which provides an IOBase-like interface for writing table rows
+  (represented as dicts) as Avro records.
+  """
+  def __init__(self, file_handle, schema):
+    """Initialize an AvroRowWriter.
+
+    Args:
+      file_handle (io.IOBase): Output stream to write Avro records to.
+      schema (Dict[Text, Any]): BigQuery table schema.
+    """
+    if not file_handle.writable():
+      raise ValueError("Output stream must be writable")
+
+    self._file_handle = file_handle
+    avro_schema = fastavro.parse_schema(
+        get_avro_schema_from_table_schema(schema))
+    self._avro_writer = fastavro.write.Writer(self._file_handle, avro_schema)
+
+  def close(self):
+    if not self._file_handle.closed:
+      self.flush()
+      self._file_handle.close()
+
+  @property
+  def closed(self):
+    return self._file_handle.closed
+
+  def flush(self):
+    if self._file_handle.closed:
+      raise ValueError("flush on closed file")
+
+    self._avro_writer.flush()
+    self._file_handle.flush()
+
+  def read(self, size=-1):
+    raise io.UnsupportedOperation("AvroRowWriter is not readable")
+
+  def tell(self):
+    # Flush the fastavro Writer to the underlying stream, otherwise there isn't
+    # a reliable way to determine how many bytes have been written.
+    self._avro_writer.flush()
+    return self._file_handle.tell()
+
+  def writable(self):
+    return self._file_handle.writable()
+
+  def write(self, row):
+    try:
+      self._avro_writer.write(row)
+    except (TypeError, ValueError) as ex:
+      raise_with_traceback(
+          ex.__class__(
+              "Error writing row to Avro: {}\nSchema: {}\nRow: {}".format(
+                  ex, self._avro_writer.schema, row)))
+
+
 class RetryStrategy(object):
   RETRY_ALWAYS = 'RETRY_ALWAYS'
   RETRY_NEVER = 'RETRY_NEVER'
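
For a quick sense of how the JsonRowWriter and AvroRowWriter added above
behave, here is a minimal usage sketch; it is not part of the patch, and the
rows and two-column schema string are made up for illustration:

    import io

    from apache_beam.io.gcp.bigquery_tools import AvroRowWriter
    from apache_beam.io.gcp.bigquery_tools import JsonRowWriter
    from apache_beam.io.gcp.bigquery_tools import get_dict_table_schema

    rows = [{'name': 'beam', 'count': 1}, {'name': 'avro', 'count': 2}]
    schema = get_dict_table_schema('name:STRING,count:INTEGER')

    # Newline-delimited JSON: one UTF-8 encoded JSON object per line.
    with io.BytesIO() as buf:
        writer = JsonRowWriter(buf)
        for row in rows:
            writer.write(row)
        writer.flush()  # flush (not close) so the buffer stays readable
        json_payload = buf.getvalue()

    # Avro: records are written with a schema header derived from the
    # BigQuery schema, which is what lets BigQuery load the file directly.
    with io.BytesIO() as buf:
        writer = AvroRowWriter(buf, schema)
        for row in rows:
            writer.write(row)
        writer.flush()
        avro_payload = buf.getvalue()
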
@@ -1221,3 +1328,97 @@ class AppendDestinationsFn(DoFn):
 
   def process(self, element, *side_inputs):
     yield (self.destination(element, *side_inputs), element)
+
+
+def get_table_schema_from_string(schema):
+  """Transform the string table schema into a
+  :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` instance.
+
+  Args:
+    schema (str): The string schema to be used if the BigQuery table to write
+      has to be created.
+
+  Returns:
+    ~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema:
+    The schema to be used if the BigQuery table to write has to be created,
+    given as a :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` instance.
+  """
+  table_schema = bigquery.TableSchema()
+  schema_list = [s.strip() for s in schema.split(',')]
+  for field_and_type in schema_list:
+    field_name, field_type = field_and_type.split(':')
+    field_schema = bigquery.TableFieldSchema()
+    field_schema.name = field_name
+    field_schema.type = field_type
+    field_schema.mode = 'NULLABLE'
+    table_schema.fields.append(field_schema)
+  return table_schema
+
+
+def table_schema_to_dict(table_schema):
+  """Create a dictionary representation of table schema for serialization
+  """
+  def get_table_field(field):
+    """Create a dictionary representation of a table field
+    """
+    result = {}
+    result['name'] = field.name
+    result['type'] = field.type
+    result['mode'] = getattr(field, 'mode', 'NULLABLE')
+    if hasattr(field, 'description') and field.description is not None:
+      result['description'] = field.description
+    if hasattr(field, 'fields') and field.fields:
+      result['fields'] = [get_table_field(f) for f in field.fields]
+    return result
+
+  if not isinstance(table_schema, bigquery.TableSchema):
+    raise ValueError("Table schema must be of the type bigquery.TableSchema")
+  schema = {'fields': []}
+  for field in table_schema.fields:
+    schema['fields'].append(get_table_field(field))
+  return schema
+
+
+def get_dict_table_schema(schema):
+  """Transform the table schema into a dictionary instance.
+
+  Args:
+    schema (str, dict, ~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema):
+      The schema to be used if the BigQuery table to write has to be created.
+      This can be a dict, a string, or a TableSchema instance.
+
+  Returns:
+    Dict[str, Any]: The schema to be used if the BigQuery table to write has
+    to be created, as a dictionary.
+  """
+  if (isinstance(schema, (dict, value_provider.ValueProvider)) or
+      callable(schema) or schema is None):
+    return schema
+  elif isinstance(schema, (str, unicode)):
+    table_schema = get_table_schema_from_string(schema)
+    return table_schema_to_dict(table_schema)
+  elif isinstance(schema, bigquery.TableSchema):
+    return table_schema_to_dict(schema)
+  else:
+    raise TypeError('Unexpected schema argument: %s.' % schema)
+
+
+def get_avro_schema_from_table_schema(schema):
+  """Transform the table schema into an Avro schema.
+
+  Args:
+    schema (str, dict, ~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema):
+      The TableSchema to convert to an Avro schema. This can be a dict, a
+      string, or a TableSchema instance.
+
+  Returns:
+    Dict[str, Any]: An Avro schema, which can be used by fastavro.
+  """
+  dict_table_schema = get_dict_table_schema(schema)
+  return bigquery_avro_tools.get_record_schema_from_dict_table_schema(
+      "root", dict_table_schema)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index 5f71434..9c874fa 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -21,15 +21,19 @@ from __future__ import absolute_import
 
 import datetime
 import decimal
+import io
 import json
 import logging
+import math
 import re
 import time
 import unittest
 
+import fastavro
 # patches unittest.TestCase to be python3 compatible
 import future.tests.base  # pylint: disable=unused-import,ungrouped-imports
 import mock
+import pytz
 from future.utils import iteritems
 
 import apache_beam as beam
@@ -37,6 +41,8 @@ from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.io.gcp.bigquery import TableRowJsonCoder
 from apache_beam.io.gcp.bigquery_test import HttpError
 from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
+from apache_beam.io.gcp.bigquery_tools import AvroRowWriter
+from apache_beam.io.gcp.bigquery_tools import JsonRowWriter
 from apache_beam.io.gcp.bigquery_tools import RowAsDictJsonCoder
 from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
 from apache_beam.io.gcp.internal.clients import bigquery
@@ -745,6 +751,67 @@ class TestRowAsDictJsonCoder(unittest.TestCase):
     self.json_compliance_exception(float('-inf'))
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestJsonRowWriter(unittest.TestCase):
+  def test_write_row(self):
+    rows = [
+        {
+            'name': 'beam', 'game': 'dream'
+        },
+        {
+            'name': 'team', 'game': 'cream'
+        },
+    ]
+
+    with io.BytesIO() as buf:
+      # Mock close() so we can access the buffer contents
+      # after JsonRowWriter is closed.
+      with mock.patch.object(buf, 'close') as mock_close:
+        writer = JsonRowWriter(buf)
+        for row in rows:
+          writer.write(row)
+        writer.close()
+
+        mock_close.assert_called_once()
+
+      buf.seek(0)
+      read_rows = [
+          json.loads(row)
+          for row in buf.getvalue().strip().decode('utf-8').split('\n')
+      ]
+
+    self.assertEqual(read_rows, rows)
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestAvroRowWriter(unittest.TestCase):
+  def test_write_row(self):
+    schema = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(name='stamp', type='TIMESTAMP'),
+            bigquery.TableFieldSchema(
+                name='number', type='FLOAT', mode='REQUIRED'),
+        ])
+    stamp = datetime.datetime(2020, 2, 25, 12, 0, 0, tzinfo=pytz.utc)
+
+    with io.BytesIO() as buf:
+      # Mock close() so we can access the buffer contents
+      # after AvroRowWriter is closed.
+      with mock.patch.object(buf, 'close') as mock_close:
+        writer = AvroRowWriter(buf, schema)
+        writer.write({'stamp': stamp, 'number': float('NaN')})
+        writer.close()
+
+        mock_close.assert_called_once()
+
+      buf.seek(0)
+      records = [r for r in fastavro.reader(buf)]
+
+    self.assertEqual(len(records), 1)
+    self.assertTrue(math.isnan(records[0]['number']))
+    self.assertEqual(records[0]['stamp'], stamp)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 14009f7..9ea75ea 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -37,6 +37,7 @@ from nose.plugins.attrib import attr
 
 import apache_beam as beam
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.bigquery_tools import FileFormat
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -208,7 +209,8 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
               method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
               schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-              write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
+              write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
+              temp_file_format=FileFormat.JSON))
 
   @attr('IT')
   def test_big_query_write_new_types(self):
@@ -350,7 +352,8 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
           p | 'create' >> beam.Create(input_data)
           | 'write' >> beam.io.WriteToBigQuery(
               table_id,
-              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+              temp_file_format=FileFormat.JSON))
 
 
 if __name__ == '__main__':
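
For context, a sketch of how the new `temp_file_format` option on
`WriteToBigQuery` is meant to be used; the table spec and rows below are made
up, and the integration tests above remain the authoritative usage:

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery_tools import FileFormat

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([{'name': 'beam'}])
            | beam.io.WriteToBigQuery(
                'my-project:my_dataset.my_table',
                schema='name:STRING',
                method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                # Avro is now the default format for file loads; pass JSON
                # to keep the previous behavior.
                temp_file_format=FileFormat.JSON))
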
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index c248df4..02c96cb 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -21,6 +21,7 @@
 
 from __future__ import absolute_import
 
+import io
 import os
 import shutil
 from builtins import zip
@@ -139,7 +140,7 @@ class LocalFileSystem(FileSystem):
     """Helper functions to open a file in the provided mode.
     """
     compression_type = FileSystem._get_compression_type(path, compression_type)
-    raw_file = open(path, mode)
+    raw_file = io.open(path, mode)
     if compression_type == CompressionTypes.UNCOMPRESSED:
       return raw_file
     else:
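
The localfilesystem change swaps the builtin open() for io.open() so that the
returned handles are io.IOBase objects on Python 2 as well as Python 3;
presumably this is what lets the writable() checks in the new row writers work
against locally opened temp files. A tiny illustrative check, using a
throwaway path:

    import io

    # io.open() returns io.IOBase subclasses (e.g. io.BufferedWriter),
    # which expose writable() on both Python 2 and 3.
    with io.open('throwaway.bin', 'wb') as fh:
        assert isinstance(fh, io.IOBase)
        assert fh.writable()
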
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index f41b30e..6d6df82 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -140,6 +140,7 @@ ignore_identifiers = [
   'Iterable',
   'List',
   'Set',
+  'Text',
   'Tuple',
 
   # Ignore broken built-in type references
diff --git a/sdks/python/scripts/run_integration_test.sh b/sdks/python/scripts/run_integration_test.sh
index cb0ce31..386bb42 100755
--- a/sdks/python/scripts/run_integration_test.sh
+++ b/sdks/python/scripts/run_integration_test.sh
@@ -203,6 +203,7 @@ if [[ -z $PIPELINE_OPTS ]]; then
   # See: https://github.com/hamcrest/PyHamcrest/issues/131.
   echo "pyhamcrest!=1.10.0,<2.0.0" > postcommit_requirements.txt
   echo "mock<3.0.0" >> postcommit_requirements.txt
+  echo "parameterized>=0.7.1,<0.8.0" >> postcommit_requirements.txt
 
   # Options used to run testing pipeline on Cloud Dataflow Service. Also used for
   # running on DirectRunner (some options ignored).