Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/09/11 19:28:44 UTC

[GitHub] kaxil closed pull request #3880: [AIRFLOW-461] Support autodetected schemas in BigQuery run_load

kaxil closed pull request #3880: [AIRFLOW-461] Support autodetected schemas in BigQuery run_load
URL: https://github.com/apache/incubator-airflow/pull/3880
 
This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign (forked) pull request is merged, it is
reproduced below for the sake of provenance:

diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 7a1631b53a..a4d91769c6 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -851,8 +851,8 @@ def run_copy(self,
 
     def run_load(self,
                  destination_project_dataset_table,
-                 schema_fields,
                  source_uris,
+                 schema_fields=None,
                  source_format='CSV',
                  create_disposition='CREATE_IF_NEEDED',
                  skip_leading_rows=0,
@@ -866,7 +866,8 @@ def run_load(self,
                  schema_update_options=(),
                  src_fmt_configs=None,
                  time_partitioning=None,
-                 cluster_fields=None):
+                 cluster_fields=None,
+                 autodetect=False):
         """
         Executes a BigQuery load command to load data from Google Cloud Storage
         to BigQuery. See here:
@@ -884,7 +885,11 @@ def run_load(self,
         :type destination_project_dataset_table: str
         :param schema_fields: The schema field list as defined here:
             https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
+            Required if autodetect=False; optional if autodetect=True.
         :type schema_fields: list
+        :param autodetect: Attempt to autodetect the schema for CSV and JSON
+            source files.
+        :type autodetect: bool
         :param source_uris: The source Google Cloud
             Storage URI (e.g. gs://some-bucket/some-file.txt). A single wild
             per-object name can be used.
@@ -941,6 +946,11 @@ def run_load(self,
         # if it's not, we raise a ValueError
         # Refer to this link for more details:
         #   https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat
+
+        if schema_fields is None and not autodetect:
+            raise ValueError(
+                'You must either pass a schema or autodetect=True.')
+
         if src_fmt_configs is None:
             src_fmt_configs = {}
 
@@ -975,6 +985,7 @@ def run_load(self,
 
         configuration = {
             'load': {
+                'autodetect': autodetect,
                 'createDisposition': create_disposition,
                 'destinationTable': {
                     'projectId': destination_project,
@@ -1592,7 +1603,7 @@ def _split_tablename(table_input, default_project_id, var_name=None):
 
     if '.' not in table_input:
         raise ValueError(
-            'Expected deletion_dataset_table name in the format of '
+            'Expected target table name in the format of '
             '<dataset>.<table>. Got: {}'.format(table_input))
 
     if not default_project_id:
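
With this hook change, run_load no longer requires an explicit schema: passing
autodetect=True lets BigQuery infer the schema for CSV and JSON sources, and
omitting both schema_fields and autodetect now raises a ValueError. A minimal
sketch of the new call path through the hook's cursor follows (the connection
id, dataset, table, and bucket names are placeholders, not part of the patch):

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    # Sketch only: assumes a working 'bigquery_default' connection and an
    # existing dataset; table and URI names are placeholders.
    hook = BigQueryHook(bigquery_conn_id='bigquery_default')
    cursor = hook.get_conn().cursor()

    # Let BigQuery infer the schema from the CSV files instead of passing
    # an explicit schema_fields list.
    cursor.run_load(
        destination_project_dataset_table='my_dataset.my_table',
        source_uris=['gs://my-bucket/data/part-*.csv'],
        source_format='CSV',
        skip_leading_rows=1,
        autodetect=True,
    )

    # Omitting both schema_fields and autodetect now fails fast:
    #   ValueError: You must either pass a schema or autodetect=True.
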
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index fec877db05..caed3befed 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -308,7 +308,7 @@ def __init__(self,
                  project_id=None,
                  schema_fields=None,
                  gcs_schema_object=None,
-                 time_partitioning={},
+                 time_partitioning=None,
                  bigquery_conn_id='bigquery_default',
                  google_cloud_storage_conn_id='google_cloud_default',
                  delegate_to=None,
@@ -325,7 +325,7 @@ def __init__(self,
         self.bigquery_conn_id = bigquery_conn_id
         self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
         self.delegate_to = delegate_to
-        self.time_partitioning = time_partitioning
+        self.time_partitioning = {} if time_partitioning is None else time_partitioning
         self.labels = labels
 
     def execute(self, context):
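
The bigquery_operator.py hunk is unrelated to autodetection: it replaces a
mutable default argument (time_partitioning={}) with None, because a default
dict is created once at function definition time and shared across every
operator instance. A small, self-contained illustration of the pitfall and of
the idiom the diff adopts (function names are illustrative only):

    # Illustrative only: why the default was changed from {} to None.
    def broken(partitioning={}):
        # The same dict object is reused on every call, so mutations leak
        # between callers that relied on the "empty" default.
        partitioning.setdefault('type', 'DAY')
        return partitioning

    def fixed(partitioning=None):
        # The idiom used in the diff: create a fresh dict per call.
        partitioning = {} if partitioning is None else partitioning
        partitioning.setdefault('type', 'DAY')
        return partitioning

    print(broken() is broken())  # True  -- one shared dict
    print(fixed() is fixed())    # False -- a new dict on each call
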
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 39dff21606..a98e15a8d6 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -152,6 +152,7 @@ def __init__(self,
                  external_table=False,
                  time_partitioning=None,
                  cluster_fields=None,
+                 autodetect=False,
                  *args, **kwargs):
 
         super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs)
@@ -190,20 +191,24 @@ def __init__(self,
         self.src_fmt_configs = src_fmt_configs
         self.time_partitioning = time_partitioning
         self.cluster_fields = cluster_fields
+        self.autodetect = autodetect
 
     def execute(self, context):
         bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                                delegate_to=self.delegate_to)
 
-        if not self.schema_fields and \
-                self.schema_object and \
-                self.source_format != 'DATASTORE_BACKUP':
-            gcs_hook = GoogleCloudStorageHook(
-                google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
-                delegate_to=self.delegate_to)
-            schema_fields = json.loads(gcs_hook.download(
-                self.bucket,
-                self.schema_object).decode("utf-8"))
+        if not self.schema_fields:
+            if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
+                gcs_hook = GoogleCloudStorageHook(
+                    google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+                    delegate_to=self.delegate_to)
+                schema_fields = json.loads(gcs_hook.download(
+                    self.bucket,
+                    self.schema_object).decode("utf-8"))
+            elif self.schema_object is None and self.autodetect is False:
+                raise ValueError('At least one of `schema_fields`, `schema_object`, '
+                                 'or `autodetect` must be passed.')
+
         else:
             schema_fields = self.schema_fields
 
@@ -234,6 +239,7 @@ def execute(self, context):
                 schema_fields=schema_fields,
                 source_uris=source_uris,
                 source_format=self.source_format,
+                autodetect=self.autodetect,
                 create_disposition=self.create_disposition,
                 skip_leading_rows=self.skip_leading_rows,
                 write_disposition=self.write_disposition,
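
At the operator level, the same flag is threaded through
GoogleCloudStorageToBigQueryOperator into run_load, so a GCS-to-BigQuery
transfer can be declared without schema_fields or a schema_object. A minimal
DAG sketch of the intended usage (bucket, object, and table names are
placeholders, and the connection ids are assumed to be the Airflow defaults):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcs_to_bq import (
        GoogleCloudStorageToBigQueryOperator
    )

    dag = DAG(
        dag_id='gcs_to_bq_autodetect_example',
        start_date=datetime(2018, 9, 1),
        schedule_interval=None,
    )

    # No schema_fields and no schema_object: the load job relies on
    # BigQuery's schema autodetection for the CSV input.
    load_events = GoogleCloudStorageToBigQueryOperator(
        task_id='load_events',
        bucket='my-bucket',
        source_objects=['events/2018-09-11/*.csv'],
        destination_project_dataset_table='my_dataset.events',
        source_format='CSV',
        autodetect=True,
        write_disposition='WRITE_TRUNCATE',
        dag=dag,
    )
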
diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py
index e1379dde79..9a46212fb3 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -358,6 +358,14 @@ def run_with_config(config):
 
         mocked_rwc.assert_called_once()
 
+    @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
+    def test_run_with_auto_detect(self, run_with_config):
+        destination_project_dataset_table = "autodetect.table"
+        cursor = hook.BigQueryBaseCursor(mock.Mock(), "project_id")
+        cursor.run_load(destination_project_dataset_table, [], [], autodetect=True)
+        args, kwargs = run_with_config.call_args
+        self.assertIs(args[0]['load']['autodetect'], True)
+
     @mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin")
     @mock.patch("airflow.contrib.hooks.bigquery_hook.time")
     @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services