Posted to commits@airflow.apache.org by cr...@apache.org on 2016/07/05 23:30:48 UTC

incubator-airflow git commit: [AIRFLOW-314] Fix BigQuery cursor run_table_upsert method

Repository: incubator-airflow
Updated Branches:
  refs/heads/master c08b02229 -> 2d7c83085


[AIRFLOW-314] Fix BigQuery cursor run_table_upsert method

Closes #1652 from mtagle/fix_bq_table_upsert

By default, BigQuery returns only 50 tables when you ask for a list of all
the tables in a dataset. If you are trying to upsert a table that exists in
a dataset with more than 50 tables, the run_table_upsert method may conclude
that the table doesn't exist and try to insert it, at which point BigQuery
errors because the table already exists.

This fix checks whether the response contains pagination data (a
nextPageToken) and walks through every page of results, rather than just
the first one, to determine whether the table exists.
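
For reference, a minimal standalone sketch of the pagination pattern the fix
relies on, assuming a googleapiclient BigQuery service object and a
hypothetical table_exists helper (neither is part of this commit; the actual
change is in BigQueryBaseCursor.run_table_upsert in the diff below):

    def table_exists(service, project_id, dataset_id, table_id):
        # Fetch the first page of tables in the dataset (50 per page by default).
        resp = service.tables().list(projectId=project_id,
                                     datasetId=dataset_id).execute()
        while True:
            for table in resp.get('tables', []):
                if table['tableReference']['tableId'] == table_id:
                    return True
            # No nextPageToken means every page has been checked without a match.
            if 'nextPageToken' not in resp:
                return False
            # Otherwise request the next page using the token from the response.
            resp = service.tables().list(projectId=project_id,
                                         datasetId=dataset_id,
                                         pageToken=resp['nextPageToken']).execute()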


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d7c8308
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d7c8308
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d7c8308

Branch: refs/heads/master
Commit: 2d7c830858d3c220888d77120148c14c97f892df
Parents: c08b022
Author: Moira Tagle <mo...@wepay.com>
Authored: Tue Jul 5 16:29:27 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Tue Jul 5 16:29:32 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py | 43 ++++++++++++++++-------------
 1 file changed, 24 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d7c8308/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 735fa15..0a33290 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -513,29 +513,34 @@ class BigQueryBaseCursor(object):
         """
         # check to see if the table exists
         table_id = table_resource['tableReference']['tableId']
-        table_exists = False
         project_id = project_id if project_id is not None else self.project_id
         tables_list_resp = self.service.tables().list(projectId=project_id,
                                                       datasetId=dataset_id).execute()
-        if 'tables' in tables_list_resp:
-            for table in tables_list_resp['tables']:
+        while True:
+            for table in tables_list_resp.get('tables', []):
                 if table['tableReference']['tableId'] == table_id:
-                    table_exists = True
-                    break
-
-        # do update if table exists
-        if table_exists:
-            logging.info('table %s:%s.%s exists, updating.', project_id, dataset_id, table_id)
-            return self.service.tables().update(projectId=project_id,
-                                                datasetId=dataset_id,
-                                                tableId=table_id,
-                                                body=table_resource).execute()
-        # do insert if table does not exist
-        else:
-            logging.info('table %s:%s.%s does not exist. creating.', project_id, dataset_id, table_id)
-            return self.service.tables().insert(projectId=project_id,
-                                                datasetId=dataset_id,
-                                                body=table_resource).execute()
+                    # found the table, do update
+                    logging.info('table %s:%s.%s exists, updating.',
+                                 project_id, dataset_id, table_id)
+                    return self.service.tables().update(projectId=project_id,
+                                                        datasetId=dataset_id,
+                                                        tableId=table_id,
+                                                        body=table_resource).execute()
+            # If there is a next page, we need to check the next page.
+            if 'nextPageToken' in tables_list_resp:
+                tables_list_resp = self.service.tables()\
+                    .list(projectId=project_id,
+                          datasetId=dataset_id,
+                          pageToken=tables_list_resp['nextPageToken'])\
+                    .execute()
+            # If there is no next page, then the table doesn't exist.
+            else:
+                # do insert
+                logging.info('table %s:%s.%s does not exist. creating.',
+                             project_id, dataset_id, table_id)
+                return self.service.tables().insert(projectId=project_id,
+                                                    datasetId=dataset_id,
+                                                    body=table_resource).execute()
 
     def run_grant_dataset_view_access(self,
                                       source_dataset,