You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/26 10:57:30 UTC

incubator-airflow git commit: [AIRFLOW-1943] Add External BigQuery Table feature

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f9ddb36df -> e1bf38942


[AIRFLOW-1943] Add External BigQuery Table feature

Add ability to create a BigQuery External Table.
- Add new method create_external_table() in
BigQueryHook()
- Add parameters to existing
GoogleCloudStorageToBigQueryOperator()

Closes #2948 from kaxil/external_table


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e1bf3894
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e1bf3894
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e1bf3894

Branch: refs/heads/master
Commit: e1bf38942f3fa2fd41f6c4f6824a02b01e92ae38
Parents: f9ddb36
Author: Kaxil Naik <ka...@gmail.com>
Authored: Fri Jan 26 11:57:12 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri Jan 26 11:57:12 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py    | 188 +++++++++++++++++++++++++
 airflow/contrib/operators/gcs_to_bq.py    |  64 ++++++---
 tests/contrib/hooks/test_bigquery_hook.py |  30 +++-
 3 files changed, 262 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e1bf3894/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 4ab4ac0..cd6bf32 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -207,6 +207,194 @@ class BigQueryBaseCursor(LoggingMixin):
         self.use_legacy_sql = use_legacy_sql
         self.running_job_id = None
 
+    def create_external_table(self,
+                              external_project_dataset_table,
+                              schema_fields,
+                              source_uris,
+                              source_format='CSV',
+                              autodetect=False,
+                              compression='NONE',
+                              ignore_unknown_values=False,
+                              max_bad_records=0,
+                              skip_leading_rows=0,
+                              field_delimiter=',',
+                              quote_character=None,
+                              allow_quoted_newlines=False,
+                              allow_jagged_rows=False,
+                              src_fmt_configs={}
+                              ):
+        """
+        Creates a new external table in the dataset with the data in Google
+        Cloud Storage. See here:
+
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
+
+        for more details about these parameters.
+
+        :param external_project_dataset_table:
+            The dotted (<project>.|<project>:)<dataset>.<table>($<partition>) BigQuery
+            table name to create external table.
+            If <project> is not included, project will be the
+            project defined in the connection json.
+        :type external_project_dataset_table: string
+        :param schema_fields: The schema field list as defined here:
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
+        :type schema_fields: list
+        :param source_uris: The source Google Cloud
+            Storage URI (e.g. gs://some-bucket/some-file.txt). A single wild
+            per-object name can be used.
+        :type source_uris: list
+        :param source_format: File format to export.
+        :type source_format: string
+        :param autodetect: Try to detect schema and format options automatically.
+            Any option specified explicitly will be honored.
+        :type autodetect: bool
+        :param compression: [Optional] The compression type of the data source.
+            Possible values include GZIP and NONE.
+            The default value is NONE.
+            This setting is ignored for Google Cloud Bigtable,
+                Google Cloud Datastore backups and Avro formats.
+        :type compression: string
+        :param ignore_unknown_values: [Optional] Indicates if BigQuery should allow
+            extra values that are not represented in the table schema.
+            If true, the extra values are ignored. If false, records with extra columns
+            are treated as bad records, and if there are too many bad records, an
+            invalid error is returned in the job result.
+        :type ignore_unknown_values: bool
+        :param max_bad_records: The maximum number of bad records that BigQuery can
+            ignore when running the job.
+        :type max_bad_records: int
+        :param skip_leading_rows: Number of rows to skip when loading from a CSV.
+        :type skip_leading_rows: int
+        :param field_delimiter: The delimiter to use when loading from a CSV.
+        :type field_delimiter: string
+        :param quote_character: The value that is used to quote data sections in a CSV
+            file.
+        :type quote_character: string
+        :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not
+            (false).
+        :type allow_quoted_newlines: boolean
+        :param allow_jagged_rows: Accept rows that are missing trailing optional columns.
+            The missing values are treated as nulls. If false, records with missing
+            trailing columns are treated as bad records, and if there are too many bad
+            records, an invalid error is returned in the job result. Only applicable when
+            soure_format is CSV.
+        :type allow_jagged_rows: bool
+        :param src_fmt_configs: configure optional fields specific to the source format
+        :type src_fmt_configs: dict
+        """
+
+        project_id, dataset_id, external_table_id = \
+            _split_tablename(table_input=external_project_dataset_table,
+                             default_project_id=self.project_id,
+                             var_name='external_project_dataset_table')
+
+        # bigquery only allows certain source formats
+        # we check to make sure the passed source format is valid
+        # if it's not, we raise a ValueError
+        # Refer to this link for more details:
+        #   https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.sourceFormat
+
+        source_format = source_format.upper()
+        allowed_formats = [
+            "CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS",
+            "DATASTORE_BACKUP"
+        ]
+        if source_format not in allowed_formats:
+            raise ValueError("{0} is not a valid source format. "
+                             "Please use one of the following types: {1}"
+                             .format(source_format, allowed_formats))
+
+        compression = compression.upper()
+        allowed_compressions = ['NONE', 'GZIP']
+        if compression not in allowed_compressions:
+            raise ValueError("{0} is not a valid compression format. "
+                             "Please use one of the following types: {1}"
+                             .format(compression, allowed_compressions))
+
+        table_resource = {
+            'externalDataConfiguration': {
+                'autodetect': autodetect,
+                'sourceFormat': source_format,
+                'sourceUris': source_uris,
+                'compression': compression,
+                'ignoreUnknownValues': ignore_unknown_values
+            },
+            'tableReference': {
+                'projectId': project_id,
+                'datasetId': dataset_id,
+                'tableId': external_table_id,
+            }
+        }
+
+        if schema_fields:
+            table_resource['externalDataConfiguration'].update({
+                'schema': {
+                    'fields': schema_fields
+                }
+            })
+
+        self.log.info('Creating external table: %s', external_project_dataset_table)
+
+        if max_bad_records:
+            table_resource['externalDataConfiguration']['maxBadRecords'] = max_bad_records
+
+        # if following fields are not specified in src_fmt_configs,
+        # honor the top-level params for backward-compatibility
+        if 'skipLeadingRows' not in src_fmt_configs:
+            src_fmt_configs['skipLeadingRows'] = skip_leading_rows
+        if 'fieldDelimiter' not in src_fmt_configs:
+            src_fmt_configs['fieldDelimiter'] = field_delimiter
+        if 'quote_character' not in src_fmt_configs:
+            src_fmt_configs['quote'] = quote_character
+        if 'allowQuotedNewlines' not in src_fmt_configs:
+            src_fmt_configs['allowQuotedNewlines'] = allow_quoted_newlines
+        if 'allowJaggedRows' not in src_fmt_configs:
+            src_fmt_configs['allowJaggedRows'] = allow_jagged_rows
+
+        src_fmt_to_param_mapping = {
+            'CSV': 'csvOptions',
+            'GOOGLE_SHEETS': 'googleSheetsOptions'
+        }
+
+        src_fmt_to_configs_mapping = {
+            'csvOptions': [
+                'allowJaggedRows', 'allowQuotedNewlines',
+                'fieldDelimiter', 'skipLeadingRows',
+                'quote'
+            ],
+            'googleSheetsOptions': ['skipLeadingRows']
+        }
+
+        if source_format in src_fmt_to_param_mapping.keys():
+
+            valid_configs = src_fmt_to_configs_mapping[
+                src_fmt_to_param_mapping[source_format]
+            ]
+
+            src_fmt_configs = {
+                k: v
+                for k, v in src_fmt_configs.items() if k in valid_configs
+            }
+
+            table_resource['externalDataConfiguration'][src_fmt_to_param_mapping[
+                source_format]] = src_fmt_configs
+
+        try:
+            self.service.tables().insert(
+                projectId=project_id,
+                datasetId=dataset_id,
+                body=table_resource
+            ).execute()
+
+            self.log.info('External table created successfully: %s',
+                          external_project_dataset_table)
+
+        except HttpError as err:
+            raise Exception(
+                'BigQuery job failed. Error was: {}'.format(err.content)
+            )
+
     def run_query(self,
                   bql,
                   destination_dataset_table=False,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e1bf3894/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 7625bbe..f68da7f 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -47,6 +47,12 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
     :param schema_object: string
     :param source_format: File format to export.
     :type source_format: string
+    :param compression: [Optional] The compression type of the data source.
+            Possible values include GZIP and NONE.
+            The default value is NONE.
+            This setting is ignored for Google Cloud Bigtable,
+                Google Cloud Datastore backups and Avro formats.
+    :type compression: string
     :param create_disposition: The create disposition if the table doesn't exist.
     :type create_disposition: string
     :param skip_leading_rows: Number of rows to skip when loading from a CSV.
@@ -84,11 +90,14 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         work, the service account making the request must have domain-wide
         delegation enabled.
     :type delegate_to: string
-    :param schema_update_options: Allows the schema of the desitination
+    :param schema_update_options: Allows the schema of the destination
         table to be updated as a side effect of the load job.
     :type schema_update_options: list
     :param src_fmt_configs: configure optional fields specific to the source format
     :type src_fmt_configs: dict
+    :param external_table: Flag to specify if the destination table should be
+        a BigQuery external table. Default Value is False.
+    :type external_table: bool
     :param time_partitioning: configure optional time partitioning fields i.e.
         partition by field, type and  expiration as per API specifications.
         Note that 'field' is not available in concurrency with
@@ -108,6 +117,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
                  schema_fields=None,
                  schema_object=None,
                  source_format='CSV',
+                 compression='NONE',
                  create_disposition='CREATE_IF_NEEDED',
                  skip_leading_rows=0,
                  write_disposition='WRITE_EMPTY',
@@ -122,6 +132,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
                  delegate_to=None,
                  schema_update_options=(),
                  src_fmt_configs={},
+                 external_table=False,
                  time_partitioning={},
                  *args, **kwargs):
 
@@ -136,6 +147,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         self.destination_project_dataset_table = destination_project_dataset_table
         self.schema_fields = schema_fields
         self.source_format = source_format
+        self.compression = compression
         self.create_disposition = create_disposition
         self.skip_leading_rows = skip_leading_rows
         self.write_disposition = write_disposition
@@ -144,6 +156,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         self.quote_character = quote_character
         self.allow_quoted_newlines = allow_quoted_newlines
         self.allow_jagged_rows = allow_jagged_rows
+        self.external_table = external_table
 
         self.max_id_key = max_id_key
         self.bigquery_conn_id = bigquery_conn_id
@@ -173,22 +186,39 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
                        for source_object in self.source_objects]
         conn = bq_hook.get_conn()
         cursor = conn.cursor()
-        cursor.run_load(
-            destination_project_dataset_table=self.destination_project_dataset_table,
-            schema_fields=schema_fields,
-            source_uris=source_uris,
-            source_format=self.source_format,
-            create_disposition=self.create_disposition,
-            skip_leading_rows=self.skip_leading_rows,
-            write_disposition=self.write_disposition,
-            field_delimiter=self.field_delimiter,
-            max_bad_records=self.max_bad_records,
-            quote_character=self.quote_character,
-            allow_quoted_newlines=self.allow_quoted_newlines,
-            allow_jagged_rows=self.allow_jagged_rows,
-            schema_update_options=self.schema_update_options,
-            src_fmt_configs=self.src_fmt_configs,
-            time_partitioning=self.time_partitioning)
+
+        if self.external_table:
+            cursor.create_external_table(
+                external_project_dataset_table=self.destination_project_dataset_table,
+                schema_fields=schema_fields,
+                source_uris=source_uris,
+                source_format=self.source_format,
+                compression=self.compression,
+                skip_leading_rows=self.skip_leading_rows,
+                field_delimiter=self.field_delimiter,
+                max_bad_records=self.max_bad_records,
+                quote_character=self.quote_character,
+                allow_quoted_newlines=self.allow_quoted_newlines,
+                allow_jagged_rows=self.allow_jagged_rows,
+                src_fmt_configs=self.src_fmt_configs
+            )
+        else:
+            cursor.run_load(
+                destination_project_dataset_table=self.destination_project_dataset_table,
+                schema_fields=schema_fields,
+                source_uris=source_uris,
+                source_format=self.source_format,
+                create_disposition=self.create_disposition,
+                skip_leading_rows=self.skip_leading_rows,
+                write_disposition=self.write_disposition,
+                field_delimiter=self.field_delimiter,
+                max_bad_records=self.max_bad_records,
+                quote_character=self.quote_character,
+                allow_quoted_newlines=self.allow_quoted_newlines,
+                allow_jagged_rows=self.allow_jagged_rows,
+                schema_update_options=self.schema_update_options,
+                src_fmt_configs=self.src_fmt_configs,
+                time_partitioning=self.time_partitioning)
 
         if self.max_id_key:
             cursor.execute('SELECT MAX({}) FROM {}'.format(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e1bf3894/tests/contrib/hooks/test_bigquery_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py
index 4c0eefa..a5dd595 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -157,19 +157,43 @@ class TestBigQueryTableSplitter(unittest.TestCase):
         self.assertIn('Format exception for var_x:',
                       str(context.exception), "")
 
+
 class TestBigQueryHookSourceFormat(unittest.TestCase):
     def test_invalid_source_format(self):
         with self.assertRaises(Exception) as context:
-            hook.BigQueryBaseCursor("test", "test").run_load("test.test", "test_schema.json", ["test_data.json"], source_format="json")
+            hook.BigQueryBaseCursor("test", "test").run_load(
+                "test.test", "test_schema.json", ["test_data.json"], source_format="json"
+            )
+
+        # since we passed 'json' in, and it's not valid, make sure it's present in the
+        # error string.
+        self.assertIn("JSON", str(context.exception))
+
+
+class TestBigQueryExternalTableSourceFormat(unittest.TestCase):
+    def test_invalid_source_format(self):
+        with self.assertRaises(Exception) as context:
+            hook.BigQueryBaseCursor("test", "test").create_external_table(
+                external_project_dataset_table='test.test',
+                schema_fields='test_schema.json',
+                source_uris=['test_data.json'],
+                source_format='json'
+            )
 
-        # since we passed 'json' in, and it's not valid, make sure it's present in the error string.
+        # since we passed 'csv' in, and it's not valid, make sure it's present in the
+        # error string.
         self.assertIn("JSON", str(context.exception))
 
-# Helpers to test_cancel_queries that have mock_poll_job_complete returning false, unless mock_job_cancel was called with the same job_id
+
+# Helpers to test_cancel_queries that have mock_poll_job_complete returning false,
+# unless mock_job_cancel was called with the same job_id
 mock_canceled_jobs = []
+
+
 def mock_poll_job_complete(job_id):
     return job_id in mock_canceled_jobs
 
+
 def mock_job_cancel(projectId, jobId):
     mock_canceled_jobs.append(jobId)
     return mock.Mock()