Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/26 10:57:30 UTC
incubator-airflow git commit: [AIRFLOW-1943] Add External BigQuery Table feature
Repository: incubator-airflow
Updated Branches:
refs/heads/master f9ddb36df -> e1bf38942
[AIRFLOW-1943] Add External BigQuery Table feature
Add the ability to create a BigQuery external table.
- Add a new method create_external_table() on the BigQuery cursor in bigquery_hook.py
- Add matching parameters to the existing GoogleCloudStorageToBigQueryOperator()
Closes #2948 from kaxil/external_table
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e1bf3894
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e1bf3894
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e1bf3894
Branch: refs/heads/master
Commit: e1bf38942f3fa2fd41f6c4f6824a02b01e92ae38
Parents: f9ddb36
Author: Kaxil Naik <ka...@gmail.com>
Authored: Fri Jan 26 11:57:12 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri Jan 26 11:57:12 2018 +0100
----------------------------------------------------------------------
airflow/contrib/hooks/bigquery_hook.py | 188 +++++++++++++++++++++++++
airflow/contrib/operators/gcs_to_bq.py | 64 ++++++---
tests/contrib/hooks/test_bigquery_hook.py | 30 +++-
3 files changed, 262 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
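
For context, a minimal sketch of how the new parameters might be used from a DAG.
The bucket, object, dataset and table names below are illustrative only; external_table,
compression and source_format correspond to the operator changes in this commit.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

    # Illustrative DAG; bucket, object and table names are hypothetical.
    dag = DAG('example_gcs_to_bq_external',
              start_date=datetime(2018, 1, 1),
              schedule_interval=None)

    load_as_external = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_external',
        bucket='example-bucket',
        source_objects=['data/sales_*.csv'],
        destination_project_dataset_table='example_dataset.sales_external',
        schema_fields=[{'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'}],
        source_format='CSV',
        compression='NONE',        # new parameter
        external_table=True,       # new flag: create an external table instead of loading
        skip_leading_rows=1,
        dag=dag)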
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e1bf3894/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 4ab4ac0..cd6bf32 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -207,6 +207,194 @@ class BigQueryBaseCursor(LoggingMixin):
self.use_legacy_sql = use_legacy_sql
self.running_job_id = None
+ def create_external_table(self,
+ external_project_dataset_table,
+ schema_fields,
+ source_uris,
+ source_format='CSV',
+ autodetect=False,
+ compression='NONE',
+ ignore_unknown_values=False,
+ max_bad_records=0,
+ skip_leading_rows=0,
+ field_delimiter=',',
+ quote_character=None,
+ allow_quoted_newlines=False,
+ allow_jagged_rows=False,
+ src_fmt_configs={}
+ ):
+ """
+ Creates a new external table in the dataset pointing to data in Google
+ Cloud Storage. See here:
+
+ https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
+
+ for more details about these parameters.
+
+ :param external_project_dataset_table:
+ The dotted (<project>.|<project>:)<dataset>.<table>($<partition>) BigQuery
+ table name to create the external table in.
+ If <project> is not included, the project will be the
+ project defined in the connection JSON.
+ :type external_project_dataset_table: string
+ :param schema_fields: The schema field list as defined here:
+ https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
+ :type schema_fields: list
+ :param source_uris: The source Google Cloud
+ Storage URI (e.g. gs://some-bucket/some-file.txt). A single wildcard
+ per object name can be used.
+ :type source_uris: list
+ :param source_format: File format of the data.
+ :type source_format: string
+ :param autodetect: Try to detect schema and format options automatically.
+ Any option specified explicitly will be honored.
+ :type autodetect: bool
+ :param compression: [Optional] The compression type of the data source.
+ Possible values include GZIP and NONE.
+ The default value is NONE.
+ This setting is ignored for Google Cloud Bigtable,
+ Google Cloud Datastore backups and Avro formats.
+ :type compression: string
+ :param ignore_unknown_values: [Optional] Indicates if BigQuery should allow
+ extra values that are not represented in the table schema.
+ If true, the extra values are ignored. If false, records with extra columns
+ are treated as bad records, and if there are too many bad records, an
+ invalid error is returned in the job result.
+ :type ignore_unknown_values: bool
+ :param max_bad_records: The maximum number of bad records that BigQuery can
+ ignore when running the job.
+ :type max_bad_records: int
+ :param skip_leading_rows: Number of rows to skip when loading from a CSV.
+ :type skip_leading_rows: int
+ :param field_delimiter: The delimiter to use when loading from a CSV.
+ :type field_delimiter: string
+ :param quote_character: The value that is used to quote data sections in a CSV
+ file.
+ :type quote_character: string
+ :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not
+ (false).
+ :type allow_quoted_newlines: boolean
+ :param allow_jagged_rows: Accept rows that are missing trailing optional columns.
+ The missing values are treated as nulls. If false, records with missing
+ trailing columns are treated as bad records, and if there are too many bad
+ records, an invalid error is returned in the job result. Only applicable when
+ source_format is CSV.
+ :type allow_jagged_rows: bool
+ :param src_fmt_configs: configure optional fields specific to the source format
+ :type src_fmt_configs: dict
+ """
+
+ project_id, dataset_id, external_table_id = \
+ _split_tablename(table_input=external_project_dataset_table,
+ default_project_id=self.project_id,
+ var_name='external_project_dataset_table')
+
+ # bigquery only allows certain source formats
+ # we check to make sure the passed source format is valid
+ # if it's not, we raise a ValueError
+ # Refer to this link for more details:
+ # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.sourceFormat
+
+ source_format = source_format.upper()
+ allowed_formats = [
+ "CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS",
+ "DATASTORE_BACKUP"
+ ]
+ if source_format not in allowed_formats:
+ raise ValueError("{0} is not a valid source format. "
+ "Please use one of the following types: {1}"
+ .format(source_format, allowed_formats))
+
+ compression = compression.upper()
+ allowed_compressions = ['NONE', 'GZIP']
+ if compression not in allowed_compressions:
+ raise ValueError("{0} is not a valid compression format. "
+ "Please use one of the following types: {1}"
+ .format(compression, allowed_compressions))
+
+ table_resource = {
+ 'externalDataConfiguration': {
+ 'autodetect': autodetect,
+ 'sourceFormat': source_format,
+ 'sourceUris': source_uris,
+ 'compression': compression,
+ 'ignoreUnknownValues': ignore_unknown_values
+ },
+ 'tableReference': {
+ 'projectId': project_id,
+ 'datasetId': dataset_id,
+ 'tableId': external_table_id,
+ }
+ }
+
+ if schema_fields:
+ table_resource['externalDataConfiguration'].update({
+ 'schema': {
+ 'fields': schema_fields
+ }
+ })
+
+ self.log.info('Creating external table: %s', external_project_dataset_table)
+
+ if max_bad_records:
+ table_resource['externalDataConfiguration']['maxBadRecords'] = max_bad_records
+
+ # if following fields are not specified in src_fmt_configs,
+ # honor the top-level params for backward-compatibility
+ if 'skipLeadingRows' not in src_fmt_configs:
+ src_fmt_configs['skipLeadingRows'] = skip_leading_rows
+ if 'fieldDelimiter' not in src_fmt_configs:
+ src_fmt_configs['fieldDelimiter'] = field_delimiter
+ if 'quote' not in src_fmt_configs:
+ src_fmt_configs['quote'] = quote_character
+ if 'allowQuotedNewlines' not in src_fmt_configs:
+ src_fmt_configs['allowQuotedNewlines'] = allow_quoted_newlines
+ if 'allowJaggedRows' not in src_fmt_configs:
+ src_fmt_configs['allowJaggedRows'] = allow_jagged_rows
+
+ src_fmt_to_param_mapping = {
+ 'CSV': 'csvOptions',
+ 'GOOGLE_SHEETS': 'googleSheetsOptions'
+ }
+
+ src_fmt_to_configs_mapping = {
+ 'csvOptions': [
+ 'allowJaggedRows', 'allowQuotedNewlines',
+ 'fieldDelimiter', 'skipLeadingRows',
+ 'quote'
+ ],
+ 'googleSheetsOptions': ['skipLeadingRows']
+ }
+
+ if source_format in src_fmt_to_param_mapping.keys():
+
+ valid_configs = src_fmt_to_configs_mapping[
+ src_fmt_to_param_mapping[source_format]
+ ]
+
+ src_fmt_configs = {
+ k: v
+ for k, v in src_fmt_configs.items() if k in valid_configs
+ }
+
+ table_resource['externalDataConfiguration'][src_fmt_to_param_mapping[
+ source_format]] = src_fmt_configs
+
+ try:
+ self.service.tables().insert(
+ projectId=project_id,
+ datasetId=dataset_id,
+ body=table_resource
+ ).execute()
+
+ self.log.info('External table created successfully: %s',
+ external_project_dataset_table)
+
+ except HttpError as err:
+ raise Exception(
+ 'BigQuery job failed. Error was: {}'.format(err.content)
+ )
+
def run_query(self,
bql,
destination_dataset_table=False,
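
For reference, with the CSV defaults above the method ends up posting a table
resource to tables().insert() shaped roughly as follows; the project, dataset,
table and URI values are illustrative, and the option values are the method defaults.

    # Approximate body built by create_external_table() for a CSV source
    # with default options (illustrative IDs and URIs).
    table_resource = {
        'tableReference': {
            'projectId': 'example-project',
            'datasetId': 'example_dataset',
            'tableId': 'sales_external',
        },
        'externalDataConfiguration': {
            'autodetect': False,
            'sourceFormat': 'CSV',
            'sourceUris': ['gs://example-bucket/data/sales_*.csv'],
            'compression': 'NONE',
            'ignoreUnknownValues': False,
            # 'schema': {'fields': [...]} is added only when schema_fields is passed
            'csvOptions': {
                'skipLeadingRows': 0,
                'fieldDelimiter': ',',
                'quote': None,
                'allowQuotedNewlines': False,
                'allowJaggedRows': False,
            },
        },
    }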
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e1bf3894/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 7625bbe..f68da7f 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -47,6 +47,12 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
:param schema_object: string
:param source_format: File format to export.
:type source_format: string
+ :param compression: [Optional] The compression type of the data source.
+ Possible values include GZIP and NONE.
+ The default value is NONE.
+ This setting is ignored for Google Cloud Bigtable,
+ Google Cloud Datastore backups and Avro formats.
+ :type compression: string
:param create_disposition: The create disposition if the table doesn't exist.
:type create_disposition: string
:param skip_leading_rows: Number of rows to skip when loading from a CSV.
@@ -84,11 +90,14 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: string
- :param schema_update_options: Allows the schema of the desitination
+ :param schema_update_options: Allows the schema of the destination
table to be updated as a side effect of the load job.
:type schema_update_options: list
:param src_fmt_configs: configure optional fields specific to the source format
:type src_fmt_configs: dict
+ :param external_table: Flag to specify if the destination table should be
+ a BigQuery external table. Default value is False.
+ :type external_table: bool
:param time_partitioning: configure optional time partitioning fields i.e.
partition by field, type and expiration as per API specifications.
Note that 'field' is not available in concurrency with
@@ -108,6 +117,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
schema_fields=None,
schema_object=None,
source_format='CSV',
+ compression='NONE',
create_disposition='CREATE_IF_NEEDED',
skip_leading_rows=0,
write_disposition='WRITE_EMPTY',
@@ -122,6 +132,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
delegate_to=None,
schema_update_options=(),
src_fmt_configs={},
+ external_table=False,
time_partitioning={},
*args, **kwargs):
@@ -136,6 +147,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
self.destination_project_dataset_table = destination_project_dataset_table
self.schema_fields = schema_fields
self.source_format = source_format
+ self.compression = compression
self.create_disposition = create_disposition
self.skip_leading_rows = skip_leading_rows
self.write_disposition = write_disposition
@@ -144,6 +156,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
self.quote_character = quote_character
self.allow_quoted_newlines = allow_quoted_newlines
self.allow_jagged_rows = allow_jagged_rows
+ self.external_table = external_table
self.max_id_key = max_id_key
self.bigquery_conn_id = bigquery_conn_id
@@ -173,22 +186,39 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
for source_object in self.source_objects]
conn = bq_hook.get_conn()
cursor = conn.cursor()
- cursor.run_load(
- destination_project_dataset_table=self.destination_project_dataset_table,
- schema_fields=schema_fields,
- source_uris=source_uris,
- source_format=self.source_format,
- create_disposition=self.create_disposition,
- skip_leading_rows=self.skip_leading_rows,
- write_disposition=self.write_disposition,
- field_delimiter=self.field_delimiter,
- max_bad_records=self.max_bad_records,
- quote_character=self.quote_character,
- allow_quoted_newlines=self.allow_quoted_newlines,
- allow_jagged_rows=self.allow_jagged_rows,
- schema_update_options=self.schema_update_options,
- src_fmt_configs=self.src_fmt_configs,
- time_partitioning=self.time_partitioning)
+
+ if self.external_table:
+ cursor.create_external_table(
+ external_project_dataset_table=self.destination_project_dataset_table,
+ schema_fields=schema_fields,
+ source_uris=source_uris,
+ source_format=self.source_format,
+ compression=self.compression,
+ skip_leading_rows=self.skip_leading_rows,
+ field_delimiter=self.field_delimiter,
+ max_bad_records=self.max_bad_records,
+ quote_character=self.quote_character,
+ allow_quoted_newlines=self.allow_quoted_newlines,
+ allow_jagged_rows=self.allow_jagged_rows,
+ src_fmt_configs=self.src_fmt_configs
+ )
+ else:
+ cursor.run_load(
+ destination_project_dataset_table=self.destination_project_dataset_table,
+ schema_fields=schema_fields,
+ source_uris=source_uris,
+ source_format=self.source_format,
+ create_disposition=self.create_disposition,
+ skip_leading_rows=self.skip_leading_rows,
+ write_disposition=self.write_disposition,
+ field_delimiter=self.field_delimiter,
+ max_bad_records=self.max_bad_records,
+ quote_character=self.quote_character,
+ allow_quoted_newlines=self.allow_quoted_newlines,
+ allow_jagged_rows=self.allow_jagged_rows,
+ schema_update_options=self.schema_update_options,
+ src_fmt_configs=self.src_fmt_configs,
+ time_partitioning=self.time_partitioning)
if self.max_id_key:
cursor.execute('SELECT MAX({}) FROM {}'.format(
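
The same call the operator makes above can be issued against the hook's cursor
directly; a minimal sketch, assuming a configured bigquery_default connection and
illustrative dataset, table and URI names.

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    # Illustrative direct use of the new method; connection id, dataset,
    # table and URIs are hypothetical.
    hook = BigQueryHook(bigquery_conn_id='bigquery_default')
    cursor = hook.get_conn().cursor()
    cursor.create_external_table(
        external_project_dataset_table='example_dataset.sales_external',
        schema_fields=[{'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'}],
        source_uris=['gs://example-bucket/data/sales_*.csv'],
        source_format='CSV',
        skip_leading_rows=1,
        field_delimiter=',')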
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e1bf3894/tests/contrib/hooks/test_bigquery_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py
index 4c0eefa..a5dd595 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -157,19 +157,43 @@ class TestBigQueryTableSplitter(unittest.TestCase):
self.assertIn('Format exception for var_x:',
str(context.exception), "")
+
class TestBigQueryHookSourceFormat(unittest.TestCase):
def test_invalid_source_format(self):
with self.assertRaises(Exception) as context:
- hook.BigQueryBaseCursor("test", "test").run_load("test.test", "test_schema.json", ["test_data.json"], source_format="json")
+ hook.BigQueryBaseCursor("test", "test").run_load(
+ "test.test", "test_schema.json", ["test_data.json"], source_format="json"
+ )
+
+ # since we passed 'json' in, and it's not valid, make sure it's present in the
+ # error string.
+ self.assertIn("JSON", str(context.exception))
+
+
+class TestBigQueryExternalTableSourceFormat(unittest.TestCase):
+ def test_invalid_source_format(self):
+ with self.assertRaises(Exception) as context:
+ hook.BigQueryBaseCursor("test", "test").create_external_table(
+ external_project_dataset_table='test.test',
+ schema_fields='test_schema.json',
+ source_uris=['test_data.json'],
+ source_format='json'
+ )
- # since we passed 'json' in, and it's not valid, make sure it's present in the error string.
+ # since we passed 'json' in, and it's not valid, make sure it's present in the
+ # error string.
self.assertIn("JSON", str(context.exception))
-# Helpers to test_cancel_queries that have mock_poll_job_complete returning false, unless mock_job_cancel was called with the same job_id
+
+# Helpers to test_cancel_queries that have mock_poll_job_complete returning false,
+# unless mock_job_cancel was called with the same job_id
mock_canceled_jobs = []
+
+
def mock_poll_job_complete(job_id):
return job_id in mock_canceled_jobs
+
def mock_job_cancel(projectId, jobId):
mock_canceled_jobs.append(jobId)
return mock.Mock()
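
A companion test could cover the new compression validation in the same style;
a sketch only, not part of this commit.

    class TestBigQueryExternalTableCompression(unittest.TestCase):
        def test_invalid_compression(self):
            # An invalid compression type should surface in the error message.
            with self.assertRaises(Exception) as context:
                hook.BigQueryBaseCursor("test", "test").create_external_table(
                    external_project_dataset_table='test.test',
                    schema_fields=[],
                    source_uris=[],
                    compression='LZO'
                )
            self.assertIn("LZO", str(context.exception))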