You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/09/08 18:37:55 UTC

incubator-airflow git commit: [AIRFLOW-1579] Adds support for jagged rows in Bigquery hook for BQ load jobs

Repository: incubator-airflow
Updated Branches:
  refs/heads/master c2c51518e -> 5b978b28b


[AIRFLOW-1579] Adds support for jagged rows in Bigquery hook for BQ load jobs

Closes #2582 from DannyLee12/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5b978b28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5b978b28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5b978b28

Branch: refs/heads/master
Commit: 5b978b28bcb0427cc1beac08302b7d3b4020f072
Parents: c2c5151
Author: Daniel Lee <da...@dotmodus.com>
Authored: Fri Sep 8 11:37:35 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Sep 8 11:37:42 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py |  9 +++++++++
 airflow/contrib/operators/gcs_to_bq.py | 10 +++++++++-
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5b978b28/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index dd22671..d2ce2b0 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -390,6 +390,7 @@ class BigQueryBaseCursor(object):
                  max_bad_records=0,
                  quote_character=None,
                  allow_quoted_newlines=False,
+                 allow_jagged_rows=False,
                  schema_update_options=(),
                  src_fmt_configs={}):
         """
@@ -429,6 +430,11 @@ class BigQueryBaseCursor(object):
         :type quote_character: string
         :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false).
         :type allow_quoted_newlines: boolean
+        :param allow_jagged_rows: Accept rows that are missing trailing optional columns.
+            The missing values are treated as nulls. If false, records with missing trailing columns
+            are treated as bad records, and if there are too many bad records, an invalid error is
+            returned in the job result. Only applicable when source_format is CSV.
+        :type allow_jagged_rows: bool
         :param schema_update_options: Allows the schema of the desitination
             table to be updated as a side effect of the load job.
         :type schema_update_options: list
@@ -527,6 +533,9 @@ class BigQueryBaseCursor(object):
                            if k in valid_configs}
         configuration['load'].update(src_fmt_configs)
 
+        if allow_jagged_rows:
+            configuration['load']['allowJaggedRows'] = allow_jagged_rows
+
         return self.run_with_configuration(configuration)
 
     def run_with_configuration(self, configuration):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5b978b28/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 9981cd4..39f0a48 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -46,6 +46,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         max_bad_records=0,
         quote_character=None,
         allow_quoted_newlines=False,
+        allow_jagged_rows=False,
         max_id_key=None,
         bigquery_conn_id='bigquery_default',
         google_cloud_storage_conn_id='google_cloud_storage_default',
@@ -93,6 +94,11 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         :type quote_character: string
         :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false).
         :type allow_quoted_newlines: boolean
+        :param allow_jagged_rows: Accept rows that are missing trailing optional columns.
+            The missing values are treated as nulls. If false, records with missing trailing columns
+            are treated as bad records, and if there are too many bad records, an invalid error is
+            returned in the job result. Only applicable to CSV, ignored for other formats.
+        :type allow_jagged_rows: bool
         :param max_id_key: If set, the name of a column in the BigQuery table
             that's to be loaded. Thsi will be used to select the MAX value from
             BigQuery after the load occurs. The results will be returned by the
@@ -109,7 +115,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
             work, the service account making the request must have domain-wide
             delegation enabled.
         :type delegate_to: string
-        :param schema_update_options: Allows the schema of the desitination 
+        :param schema_update_options: Allows the schema of the destination
             table to be updated as a side effect of the load job.
         :type schema_update_options: list
         :param src_fmt_configs: configure optional fields specific to the source format
@@ -133,6 +139,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         self.max_bad_records = max_bad_records
         self.quote_character = quote_character
         self.allow_quoted_newlines = allow_quoted_newlines
+        self.allow_jagged_rows = allow_jagged_rows
 
         self.max_id_key = max_id_key
         self.bigquery_conn_id = bigquery_conn_id
@@ -173,6 +180,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
             max_bad_records=self.max_bad_records,
             quote_character=self.quote_character,
             allow_quoted_newlines=self.allow_quoted_newlines,
+            allow_jagged_rows=self.allow_jagged_rows,
             schema_update_options=self.schema_update_options,
             src_fmt_configs=self.src_fmt_configs)