You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/07/26 05:59:14 UTC

[1/2] beam git commit: We shouldn't write to re-created tables for 2 mins

Repository: beam
Updated Branches:
  refs/heads/master d919394c7 -> a9fdc3bc4


We shouldn't write to re-created tables for 2 mins


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/483abc09
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/483abc09
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/483abc09

Branch: refs/heads/master
Commit: 483abc0941f0fb42c506565f6912153296fd94b5
Parents: d919394
Author: Sourabh Bajaj <so...@google.com>
Authored: Mon Jul 24 15:54:02 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Jul 25 22:58:23 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/bigquery.py      | 19 +++++++++++++++----
 sdks/python/apache_beam/io/gcp/bigquery_test.py |  3 ++-
 2 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/483abc09/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 23fd310..db6715a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1002,12 +1002,23 @@ class BigQueryWrapper(object):
     if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
       return found_table
     else:
+      created_table = self._create_table(project_id=project_id,
+                                         dataset_id=dataset_id,
+                                         table_id=table_id,
+                                         schema=schema or found_table.schema)
       # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
       # the table before this point.
-      return self._create_table(project_id=project_id,
-                                dataset_id=dataset_id,
-                                table_id=table_id,
-                                schema=schema or found_table.schema)
+      if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
+        # BigQuery can route inserts to the old (deleted) table for up to 2 mins,
+        # so wait at least that long after re-creating the table before writing.
+        logging.warning('Sleeping for 150 seconds before the write as ' +
+                        'BigQuery inserts can be routed to deleted table ' +
+                        'for 2 mins after the delete and create.')
+        # TODO(BEAM-2673): Remove this sleep by migrating to load api
+        time.sleep(150)
+        return created_table
+      else:
+        return created_table
 
   def run_query(self, project_id, query, use_legacy_sql, flatten_results,
                 dry_run=False):

http://git-wip-us.apache.org/repos/asf/beam/blob/483abc09/sdks/python/apache_beam/io/gcp/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 14247ba..bfd06ac 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -650,7 +650,8 @@ class TestBigQueryWriter(unittest.TestCase):
     self.assertFalse(client.tables.Delete.called)
     self.assertFalse(client.tables.Insert.called)
 
-  def test_table_with_write_disposition_truncate(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_table_with_write_disposition_truncate(self, _patched_sleep):
     client = mock.Mock()
     table = bigquery.Table(
         tableReference=bigquery.TableReference(


[2/2] beam git commit: This closes #3630

Posted by ch...@apache.org.
This closes #3630


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a9fdc3bc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a9fdc3bc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a9fdc3bc

Branch: refs/heads/master
Commit: a9fdc3bc4edf7d083efc163d4aff3f68d59c89b1
Parents: d919394 483abc0
Author: chamikara@google.com <ch...@google.com>
Authored: Tue Jul 25 22:58:55 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Jul 25 22:58:55 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/bigquery.py      | 19 +++++++++++++++----
 sdks/python/apache_beam/io/gcp/bigquery_test.py |  3 ++-
 2 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------