You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/11/21 21:32:20 UTC
incubator-airflow git commit: [AIRFLOW-638] Add schema_update_options to GCP ops
Repository: incubator-airflow
Updated Branches:
refs/heads/master dedc54eea -> beb285205
[AIRFLOW-638] Add schema_update_options to GCP ops
Closes #1891 from
Jalepeno112/feature/gcs_to_bq_schemaUpdateOptions
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/beb28520
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/beb28520
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/beb28520
Branch: refs/heads/master
Commit: beb285205780d230f6862fad56e609bb81996c01
Parents: dedc54e
Author: Giovanni Briggs <gb...@gmail.com>
Authored: Mon Nov 21 13:31:57 2016 -0800
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Mon Nov 21 13:32:08 2016 -0800
----------------------------------------------------------------------
airflow/contrib/hooks/bigquery_hook.py | 39 ++++++++++++++++++++++++++---
airflow/contrib/operators/gcs_to_bq.py | 9 ++++++-
tests/contrib/hooks/bigquery_hook.py | 25 +++++++++++++++++-
3 files changed, 68 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/beb28520/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 700a39e..b76126a 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -341,7 +341,8 @@ class BigQueryBaseCursor(object):
create_disposition='CREATE_IF_NEEDED',
skip_leading_rows=0,
write_disposition='WRITE_EMPTY',
- field_delimiter=','):
+ field_delimiter=',',
+ schema_update_options=[]):
"""
Executes a BigQuery load command to load data from Google Cloud Storage
to BigQuery. See here:
@@ -372,20 +373,38 @@ class BigQueryBaseCursor(object):
:type write_disposition: string
:param field_delimiter: The delimiter to use when loading from a CSV.
:type field_delimiter: string
+ :param schema_update_options: Allows the schema of the destination
+ table to be updated as a side effect of the load job.
+ :type schema_update_options: list
"""
# bigquery only allows certain source formats
# we check to make sure the passed source format is valid
# if it's not, we raise a ValueError
- # Refer to this link for more details:
+ # Refer to this link for more details:
# https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat
source_format = source_format.upper()
allowed_formats = ["CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS"]
if source_format not in allowed_formats:
- raise ValueError("{0} is not a valid source format. "
+ raise ValueError("{0} is not a valid source format. "
"Please use one of the following types: {1}"
.format(source_format, allowed_formats))
+ # bigquery also allows you to define how you want a table's schema to change
+ # as a side effect of a load
+ # for more details:
+ # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions
+ allowed_schema_update_options = [
+ 'ALLOW_FIELD_ADDITION',
+ "ALLOW_FIELD_RELAXATION"
+ ]
+ if not set(allowed_schema_update_options).issuperset(set(schema_update_options)):
+ raise ValueError(
+ "{0} contains invalid schema update options. "
+ "Please only use one or more of the following options: {1}"
+ .format(schema_update_options, allowed_schema_update_options)
+ )
+
destination_project, destination_dataset, destination_table = \
_split_tablename(table_input=destination_project_dataset_table,
default_project_id=self.project_id,
@@ -408,6 +427,20 @@ class BigQueryBaseCursor(object):
}
}
+ if schema_update_options:
+ if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]:
+ raise ValueError(
+ "schema_update_options is only "
+ "allowed if write_disposition is "
+ "'WRITE_APPEND' or 'WRITE_TRUNCATE'."
+ )
+ else:
+ logging.info(
+ "Adding experimental "
+ "'schemaUpdateOptions': {0}".format(schema_update_options)
+ )
+ configuration['load']['schemaUpdateOptions'] = schema_update_options
+
if source_format == 'CSV':
configuration['load']['skipLeadingRows'] = skip_leading_rows
configuration['load']['fieldDelimiter'] = field_delimiter
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/beb28520/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 25b338d..26d996a 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -46,6 +46,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
bigquery_conn_id='bigquery_default',
google_cloud_storage_conn_id='google_cloud_storage_default',
delegate_to=None,
+ schema_update_options=[],
*args,
**kwargs):
"""
@@ -92,6 +93,9 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: string
+ :param schema_update_options: Allows the schema of the destination
+ table to be updated as a side effect of the load job.
+ :type schema_update_options: list
"""
super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs)
@@ -114,6 +118,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.delegate_to = delegate_to
+ self.schema_update_options = schema_update_options
+
def execute(self, context):
gcs_hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
@@ -132,7 +138,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
create_disposition=self.create_disposition,
skip_leading_rows=self.skip_leading_rows,
write_disposition=self.write_disposition,
- field_delimiter=self.field_delimiter)
+ field_delimiter=self.field_delimiter,
+ schema_update_options=self.schema_update_options)
if self.max_id_key:
cursor.execute('SELECT MAX({}) FROM {}'.format(self.max_id_key, self.destination_project_dataset_table))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/beb28520/tests/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/bigquery_hook.py b/tests/contrib/hooks/bigquery_hook.py
index 3a58766..68856f8 100644
--- a/tests/contrib/hooks/bigquery_hook.py
+++ b/tests/contrib/hooks/bigquery_hook.py
@@ -108,9 +108,32 @@ class TestBigQueryHookSourceFormat(unittest.TestCase):
def test_invalid_source_format(self):
with self.assertRaises(Exception) as context:
hook.BigQueryBaseCursor("test", "test").run_load("test.test", "test_schema.json", ["test_data.json"], source_format="json")
-
+
# since we passed 'json' in, and it's not valid, make sure it's present in the error string.
self.assertIn("json", str(context.exception))
+
+class TestBigQueryBaseCursor(unittest.TestCase):
+ def test_invalid_schema_update_options(self):
+ with self.assertRaises(Exception) as context:
+ hook.BigQueryBaseCursor("test", "test").run_load(
+ "test.test",
+ "test_schema.json",
+ ["test_data.json"],
+ schema_update_options=["THIS IS NOT VALID"]
+ )
+ self.assertIn("THIS IS NOT VALID", str(context.exception))
+
+ def test_invalid_schema_update_and_write_disposition(self):
+ with self.assertRaises(Exception) as context:
+ hook.BigQueryBaseCursor("test", "test").run_load(
+ "test.test",
+ "test_schema.json",
+ ["test_data.json"],
+ schema_update_options=['ALLOW_FIELD_ADDITION'],
+ write_disposition='WRITE_EMPTY'
+ )
+ self.assertIn("schema_update_options is only", str(context.exception))
+
if __name__ == '__main__':
unittest.main()