You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/11/21 21:32:20 UTC

incubator-airflow git commit: [AIRFLOW-638] Add schema_update_options to GCP ops

Repository: incubator-airflow
Updated Branches:
  refs/heads/master dedc54eea -> beb285205


[AIRFLOW-638] Add schema_update_options to GCP ops

Closes #1891 from
Jalepeno112/feature/gcs_to_bq_schemaUpdateOptions


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/beb28520
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/beb28520
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/beb28520

Branch: refs/heads/master
Commit: beb285205780d230f6862fad56e609bb81996c01
Parents: dedc54e
Author: Giovanni Briggs <gb...@gmail.com>
Authored: Mon Nov 21 13:31:57 2016 -0800
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Mon Nov 21 13:32:08 2016 -0800

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py | 39 ++++++++++++++++++++++++++---
 airflow/contrib/operators/gcs_to_bq.py |  9 ++++++-
 tests/contrib/hooks/bigquery_hook.py   | 25 +++++++++++++++++-
 3 files changed, 68 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/beb28520/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 700a39e..b76126a 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -341,7 +341,8 @@ class BigQueryBaseCursor(object):
                  create_disposition='CREATE_IF_NEEDED',
                  skip_leading_rows=0,
                  write_disposition='WRITE_EMPTY',
-                 field_delimiter=','):
+                 field_delimiter=',',
+                 schema_update_options=[]):
         """
         Executes a BigQuery load command to load data from Google Cloud Storage
         to BigQuery. See here:
@@ -372,20 +373,38 @@ class BigQueryBaseCursor(object):
         :type write_disposition: string
         :param field_delimiter: The delimiter to use when loading from a CSV.
         :type field_delimiter: string
+        :param schema_update_options: Allows the schema of the desitination
+            table to be updated as a side effect of the load job.
+        :type schema_update_options: list
         """
 
         # bigquery only allows certain source formats
         # we check to make sure the passed source format is valid
         # if it's not, we raise a ValueError
-        # Refer to this link for more details: 
+        # Refer to this link for more details:
         #   https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat
         source_format = source_format.upper()
         allowed_formats = ["CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS"]
         if source_format not in allowed_formats:
-            raise ValueError("{0} is not a valid source format. " 
+            raise ValueError("{0} is not a valid source format. "
                     "Please use one of the following types: {1}"
                     .format(source_format, allowed_formats))
 
+        # bigquery also allows you to define how you want a table's schema to change
+        # as a side effect of a load
+        # for more details:
+        #   https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions
+        allowed_schema_update_options = [
+            'ALLOW_FIELD_ADDITION',
+            "ALLOW_FIELD_RELAXATION"
+        ]
+        if not set(allowed_schema_update_options).issuperset(set(schema_update_options)):
+            raise ValueError(
+                "{0} contains invalid schema update options. "
+                "Please only use one or more of the following options: {1}"
+                .format(schema_update_options, allowed_schema_update_options)
+            )
+
         destination_project, destination_dataset, destination_table = \
             _split_tablename(table_input=destination_project_dataset_table,
                              default_project_id=self.project_id,
@@ -408,6 +427,20 @@ class BigQueryBaseCursor(object):
             }
         }
 
+        if schema_update_options:
+            if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]:
+                raise ValueError(
+                    "schema_update_options is only "
+                    "allowed if write_disposition is "
+                    "'WRITE_APPEND' or 'WRITE_TRUNCATE'."
+                )
+            else:
+                logging.info(
+                    "Adding experimental "
+                    "'schemaUpdateOptions': {0}".format(schema_update_options)
+                )
+                configuration['load']['schemaUpdateOptions'] = schema_update_options
+
         if source_format == 'CSV':
             configuration['load']['skipLeadingRows'] = skip_leading_rows
             configuration['load']['fieldDelimiter'] = field_delimiter

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/beb28520/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 25b338d..26d996a 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -46,6 +46,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         bigquery_conn_id='bigquery_default',
         google_cloud_storage_conn_id='google_cloud_storage_default',
         delegate_to=None,
+        schema_update_options=[],
         *args,
         **kwargs):
         """
@@ -92,6 +93,9 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
             work, the service account making the request must have domain-wide
             delegation enabled.
         :type delegate_to: string
+        :param schema_update_options: Allows the schema of the desitination 
+            table to be updated as a side effect of the load job.
+        :type schema_update_options: list
         """
         super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs)
 
@@ -114,6 +118,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
         self.delegate_to = delegate_to
 
+        self.schema_update_options = schema_update_options
+
     def execute(self, context):
         gcs_hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
                                           delegate_to=self.delegate_to)
@@ -132,7 +138,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
             create_disposition=self.create_disposition,
             skip_leading_rows=self.skip_leading_rows,
             write_disposition=self.write_disposition,
-            field_delimiter=self.field_delimiter)
+            field_delimiter=self.field_delimiter,
+            schema_update_options=self.schema_update_options)
 
         if self.max_id_key:
             cursor.execute('SELECT MAX({}) FROM {}'.format(self.max_id_key, self.destination_project_dataset_table))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/beb28520/tests/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/bigquery_hook.py b/tests/contrib/hooks/bigquery_hook.py
index 3a58766..68856f8 100644
--- a/tests/contrib/hooks/bigquery_hook.py
+++ b/tests/contrib/hooks/bigquery_hook.py
@@ -108,9 +108,32 @@ class TestBigQueryHookSourceFormat(unittest.TestCase):
     def test_invalid_source_format(self):
         with self.assertRaises(Exception) as context:
             hook.BigQueryBaseCursor("test", "test").run_load("test.test", "test_schema.json", ["test_data.json"], source_format="json")
-        
+
         # since we passed 'json' in, and it's not valid, make sure it's present in the error string.
         self.assertIn("json", str(context.exception))
 
+
+class TestBigQueryBaseCursor(unittest.TestCase):
+    def test_invalid_schema_update_options(self):
+        with self.assertRaises(Exception) as context:
+            hook.BigQueryBaseCursor("test", "test").run_load(
+                "test.test",
+                "test_schema.json",
+                ["test_data.json"],
+                schema_update_options=["THIS IS NOT VALID"]
+                )
+        self.assertIn("THIS IS NOT VALID", str(context.exception))
+
+    def test_invalid_schema_update_and_write_disposition(self):
+        with self.assertRaises(Exception) as context:
+            hook.BigQueryBaseCursor("test", "test").run_load(
+                "test.test",
+                "test_schema.json",
+                ["test_data.json"],
+                schema_update_options=['ALLOW_FIELD_ADDITION'],
+                write_disposition='WRITE_EMPTY'
+            )
+        self.assertIn("schema_update_options is only", str(context.exception))
+
 if __name__ == '__main__':
     unittest.main()