You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/29 16:22:24 UTC

[38/50] beam git commit: [BEAM-1235] BigQueryIO.Write: log failed load/copy jobs.

[BEAM-1235] BigQueryIO.Write: log failed load/copy jobs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6531545e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6531545e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6531545e

Branch: refs/heads/python-sdk
Commit: 6531545e647f98870a69bd46fabbbadb727969e5
Parents: 2cbc08b
Author: Pei He <pe...@google.com>
Authored: Mon Jan 23 16:25:43 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jan 26 17:22:52 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 63 ++++++++++++-------
 .../io/gcp/bigquery/BigQueryServicesImpl.java   |  1 +
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 64 +++++++++++++-------
 .../gcp/bigquery/BigQueryServicesImplTest.java  |  2 +
 4 files changed, 87 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6531545e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index b6f9fb0..4ace985 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1155,7 +1155,8 @@ public class BigQueryIO {
       jobService.startQueryJob(jobRef, queryConfig);
       Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
       if (parseStatus(job) != Status.SUCCEEDED) {
-        throw new IOException("Query job failed: " + jobId);
+        throw new IOException(String.format(
+            "Query job %s failed, status: %s.", jobId, statusToPrettyString(job.getStatus())));
       }
     }
 
@@ -1260,8 +1261,8 @@ public class BigQueryIO {
           jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
       if (parseStatus(extractJob) != Status.SUCCEEDED) {
         throw new IOException(String.format(
-            "Extract job %s failed, status: %s",
-            extractJob.getJobReference().getJobId(), extractJob.getStatus()));
+            "Extract job %s failed, status: %s.",
+            extractJob.getJobReference().getJobId(), statusToPrettyString(extractJob.getStatus())));
       }
 
       List<String> tempFiles = getExtractFilePaths(extractDestinationDir, extractJob);
@@ -2361,30 +2362,36 @@ public class BigQueryIO {
             .setSourceFormat("NEWLINE_DELIMITED_JSON");
 
         String projectId = ref.getProjectId();
+        Job lastFailedLoadJob = null;
         for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
           String jobId = jobIdPrefix + "-" + i;
-          LOG.info("Starting BigQuery load job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS);
           JobReference jobRef = new JobReference()
               .setProjectId(projectId)
               .setJobId(jobId);
           jobService.startLoadJob(jobRef, loadConfig);
-          Status jobStatus =
-              parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES));
+          Job loadJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
+          Status jobStatus = parseStatus(loadJob);
           switch (jobStatus) {
             case SUCCEEDED:
               return;
             case UNKNOWN:
-              throw new RuntimeException("Failed to poll the load job status of job " + jobId);
+              throw new RuntimeException(String.format(
+                  "UNKNOWN status of load job [%s]: %s.", jobId, jobToPrettyString(loadJob)));
             case FAILED:
-              LOG.info("BigQuery load job failed: {}", jobId);
+              lastFailedLoadJob = loadJob;
               continue;
             default:
-              throw new IllegalStateException(String.format("Unexpected job status: %s of job %s",
-                  jobStatus, jobId));
+              throw new IllegalStateException(String.format(
+                  "Unexpected status [%s] of load job: %s.",
+                  jobStatus, jobToPrettyString(loadJob)));
           }
         }
-        throw new RuntimeException(String.format("Failed to create the load job %s, reached max "
-            + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS));
+        throw new RuntimeException(String.format(
+            "Failed to create load job with id prefix %s, "
+                + "reached max retries: %d, last failed load job: %s.",
+            jobIdPrefix,
+            Bound.MAX_RETRY_JOBS,
+            jobToPrettyString(lastFailedLoadJob)));
       }
 
       static void removeTemporaryFiles(
@@ -2491,30 +2498,36 @@ public class BigQueryIO {
             .setCreateDisposition(createDisposition.name());
 
         String projectId = ref.getProjectId();
+        Job lastFailedCopyJob = null;
         for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
           String jobId = jobIdPrefix + "-" + i;
-          LOG.info("Starting BigQuery copy job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS);
           JobReference jobRef = new JobReference()
               .setProjectId(projectId)
               .setJobId(jobId);
           jobService.startCopyJob(jobRef, copyConfig);
-          Status jobStatus =
-              parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES));
+          Job copyJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
+          Status jobStatus = parseStatus(copyJob);
           switch (jobStatus) {
             case SUCCEEDED:
               return;
             case UNKNOWN:
-              throw new RuntimeException("Failed to poll the copy job status of job " + jobId);
+              throw new RuntimeException(String.format(
+                  "UNKNOWN status of copy job [%s]: %s.", jobId, jobToPrettyString(copyJob)));
             case FAILED:
-              LOG.info("BigQuery copy job failed: {}", jobId);
+              lastFailedCopyJob = copyJob;
               continue;
             default:
-              throw new IllegalStateException(String.format("Unexpected job status: %s of job %s",
-                  jobStatus, jobId));
+              throw new IllegalStateException(String.format(
+                  "Unexpected status [%s] of copy job: %s.",
+                  jobStatus, jobToPrettyString(copyJob)));
           }
         }
-        throw new RuntimeException(String.format("Failed to create the copy job %s, reached max "
-            + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS));
+        throw new RuntimeException(String.format(
+            "Failed to create copy job with id prefix %s, "
+                + "reached max retries: %d, last failed copy job: %s.",
+            jobIdPrefix,
+            Bound.MAX_RETRY_JOBS,
+            jobToPrettyString(lastFailedCopyJob)));
       }
 
       static void removeTemporaryTables(DatasetService tableService,
@@ -2549,6 +2562,14 @@ public class BigQueryIO {
     private Write() {}
   }
 
+  private static String jobToPrettyString(@Nullable Job job) throws IOException {
+    return job == null ? "null" : job.toPrettyString();
+  }
+
+  private static String statusToPrettyString(@Nullable JobStatus status) throws IOException {
+    return status == null ? "Unknown status: null." : status.toPrettyString();
+  }
+
   private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
     try {
       datasetService.getDataset(table.getProjectId(), table.getDatasetId());

http://git-wip-us.apache.org/repos/asf/beam/blob/6531545e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 75796ab..7c3edbe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -214,6 +214,7 @@ class BigQueryServicesImpl implements BigQueryServices {
       do {
         try {
           client.jobs().insert(jobRef.getProjectId(), job).execute();
+          LOG.info("Started BigQuery job: {}.", jobRef);
           return; // SUCCEEDED
         } catch (GoogleJsonResponseException e) {
           if (errorExtractor.itemAlreadyExists(e)) {

http://git-wip-us.apache.org/repos/asf/beam/blob/6531545e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 0b8d60d..bbfc2ce 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -988,12 +988,6 @@ public class BigQueryIOTest implements Serializable {
         .withoutValidation());
     p.run();
 
-    logged.verifyInfo("Starting BigQuery load job");
-    logged.verifyInfo("BigQuery load job failed");
-    logged.verifyInfo("try 0/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
-    logged.verifyInfo("try 1/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
-    logged.verifyInfo("try 2/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
-    logged.verifyNotLogged("try 3/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
     File tempDir = new File(bqOptions.getTempLocation());
     testNumFiles(tempDir, 0);
   }
@@ -1232,11 +1226,49 @@ public class BigQueryIOTest implements Serializable {
         .withoutValidation());
 
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Failed to poll the load job status");
-    p.run();
+    thrown.expectMessage("UNKNOWN status of load job");
+    try {
+      p.run();
+    } finally {
+      File tempDir = new File(bqOptions.getTempLocation());
+      testNumFiles(tempDir, 0);
+    }
+  }
 
-    File tempDir = new File(bqOptions.getTempLocation());
-    testNumFiles(tempDir, 0);
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteFailedJobs() throws Exception {
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService()
+            .startJobReturns("done", "done", "done")
+            .pollJobReturns(Status.FAILED, Status.FAILED, Status.FAILED));
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    p.apply(Create.of(
+        new TableRow().set("name", "a").set("number", 1),
+        new TableRow().set("name", "b").set("number", 2),
+        new TableRow().set("name", "c").set("number", 3))
+        .withCoder(TableRowJsonCoder.of()))
+        .apply(BigQueryIO.Write.to("dataset-id.table-id")
+            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+            .withTestServices(fakeBqServices)
+            .withoutValidation());
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Failed to create load job with id prefix");
+    thrown.expectMessage("reached max retries");
+    thrown.expectMessage("last failed load job");
+
+    try {
+      p.run();
+    } finally {
+      File tempDir = new File(bqOptions.getTempLocation());
+      testNumFiles(tempDir, 0);
+    }
   }
 
   @Test
@@ -2164,12 +2196,6 @@ public class BigQueryIOTest implements Serializable {
 
     List<String> tempTables = tester.takeOutputElements();
 
-    logged.verifyInfo("Starting BigQuery load job");
-    logged.verifyInfo("BigQuery load job failed");
-    logged.verifyInfo("try 0/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
-    logged.verifyInfo("try 1/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
-    logged.verifyNotLogged("try 2/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
-
     assertEquals(expectedTempTables, tempTables);
   }
 
@@ -2237,12 +2263,6 @@ public class BigQueryIOTest implements Serializable {
     DoFnTester<String, Void> tester = DoFnTester.of(writeRename);
     tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);
     tester.processElement(null);
-
-    logged.verifyInfo("Starting BigQuery copy job");
-    logged.verifyInfo("BigQuery copy job failed");
-    logged.verifyInfo("try 0/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
-    logged.verifyInfo("try 1/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
-    logged.verifyNotLogged("try 2/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/6531545e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 1ce10f1..ef51650 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -138,6 +138,7 @@ public class BigQueryServicesImplTest {
     verify(response, times(1)).getStatusCode();
     verify(response, times(1)).getContent();
     verify(response, times(1)).getContentType();
+    expectedLogs.verifyInfo(String.format("Started BigQuery job: %s", jobRef));
   }
 
   /**
@@ -161,6 +162,7 @@ public class BigQueryServicesImplTest {
     verify(response, times(1)).getStatusCode();
     verify(response, times(1)).getContent();
     verify(response, times(1)).getContentType();
+    expectedLogs.verifyNotLogged("Started BigQuery job");
   }
 
   /**