You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/24 20:27:43 UTC

[3/4] beam git commit: [BEAM-1258] Improve logging in BigQueryIO.verifyTableEmpty().

[BEAM-1258] Improve logging in BigQueryIO.verifyTableEmpty().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5b6dd91d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5b6dd91d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5b6dd91d

Branch: refs/heads/master
Commit: 5b6dd91d27ce73fa66db4d445b0ceb88f09971d8
Parents: cb6e0a8
Author: Pei He <pe...@google.com>
Authored: Mon Jan 23 14:52:30 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 24 12:25:22 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 32 +++++++++++---------
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  2 ++
 2 files changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5b6dd91d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 701374d..aff199a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1863,25 +1863,27 @@ public class BigQueryIO {
             writeDisposition, validate, testServices);
       }
 
-      private static void verifyTableEmpty(
+      private static void verifyTableNotExistOrEmpty(
           DatasetService datasetService,
-          TableReference table) {
+          TableReference tableRef) {
         try {
-          boolean isEmpty = datasetService.isTableEmpty(
-              table.getProjectId(), table.getDatasetId(), table.getTableId());
-          if (!isEmpty) {
-            throw new IllegalArgumentException(
-                "BigQuery table is not empty: " + BigQueryIO.toTableSpec(table));
+          if (datasetService.getTable(
+              tableRef.getProjectId(),
+              tableRef.getDatasetId(),
+              tableRef.getTableId()) != null) {
+            checkState(
+                datasetService.isTableEmpty(
+                    tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
+                "BigQuery table is not empty: %s.",
+                BigQueryIO.toTableSpec(tableRef));
           }
         } catch (IOException | InterruptedException e) {
-          ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-          if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) {
-            // Nothing to do. If the table does not exist, it is considered empty.
-          } else {
-            throw new RuntimeException(
-                "unable to confirm BigQuery table emptiness for table "
-                    + BigQueryIO.toTableSpec(table), e);
+          if (e instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
           }
+          throw new RuntimeException(
+              "unable to confirm BigQuery table emptiness for table "
+                  + BigQueryIO.toTableSpec(tableRef), e);
         }
       }
 
@@ -1917,7 +1919,7 @@ public class BigQueryIO {
             verifyTablePresence(datasetService, table);
           }
           if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
-            verifyTableEmpty(datasetService, table);
+            verifyTableNotExistOrEmpty(datasetService, table);
           }
         }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5b6dd91d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 7173996..32cf46d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -136,6 +136,8 @@ interface BigQueryServices extends Serializable {
 
     /**
      * Returns true if the table is empty.
+     *
+     * @throws IOException if the table is not found.
      */
     boolean isTableEmpty(String projectId, String datasetId, String tableId)
         throws IOException, InterruptedException;