You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/29 16:22:24 UTC
[38/50] beam git commit: [BEAM-1235] BigQueryIO.Write: log failed load/copy jobs.
[BEAM-1235] BigQueryIO.Write: log failed load/copy jobs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6531545e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6531545e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6531545e
Branch: refs/heads/python-sdk
Commit: 6531545e647f98870a69bd46fabbbadb727969e5
Parents: 2cbc08b
Author: Pei He <pe...@google.com>
Authored: Mon Jan 23 16:25:43 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jan 26 17:22:52 2017 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 63 ++++++++++++-------
.../io/gcp/bigquery/BigQueryServicesImpl.java | 1 +
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 64 +++++++++++++-------
.../gcp/bigquery/BigQueryServicesImplTest.java | 2 +
4 files changed, 87 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6531545e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index b6f9fb0..4ace985 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1155,7 +1155,8 @@ public class BigQueryIO {
jobService.startQueryJob(jobRef, queryConfig);
Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
if (parseStatus(job) != Status.SUCCEEDED) {
- throw new IOException("Query job failed: " + jobId);
+ throw new IOException(String.format(
+ "Query job %s failed, status: %s.", jobId, statusToPrettyString(job.getStatus())));
}
}
@@ -1260,8 +1261,8 @@ public class BigQueryIO {
jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
if (parseStatus(extractJob) != Status.SUCCEEDED) {
throw new IOException(String.format(
- "Extract job %s failed, status: %s",
- extractJob.getJobReference().getJobId(), extractJob.getStatus()));
+ "Extract job %s failed, status: %s.",
+ extractJob.getJobReference().getJobId(), statusToPrettyString(extractJob.getStatus())));
}
List<String> tempFiles = getExtractFilePaths(extractDestinationDir, extractJob);
@@ -2361,30 +2362,36 @@ public class BigQueryIO {
.setSourceFormat("NEWLINE_DELIMITED_JSON");
String projectId = ref.getProjectId();
+ Job lastFailedLoadJob = null;
for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
String jobId = jobIdPrefix + "-" + i;
- LOG.info("Starting BigQuery load job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS);
JobReference jobRef = new JobReference()
.setProjectId(projectId)
.setJobId(jobId);
jobService.startLoadJob(jobRef, loadConfig);
- Status jobStatus =
- parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES));
+ Job loadJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
+ Status jobStatus = parseStatus(loadJob);
switch (jobStatus) {
case SUCCEEDED:
return;
case UNKNOWN:
- throw new RuntimeException("Failed to poll the load job status of job " + jobId);
+ throw new RuntimeException(String.format(
+ "UNKNOWN status of load job [%s]: %s.", jobId, jobToPrettyString(loadJob)));
case FAILED:
- LOG.info("BigQuery load job failed: {}", jobId);
+ lastFailedLoadJob = loadJob;
continue;
default:
- throw new IllegalStateException(String.format("Unexpected job status: %s of job %s",
- jobStatus, jobId));
+ throw new IllegalStateException(String.format(
+ "Unexpected status [%s] of load job: %s.",
+ jobStatus, jobToPrettyString(loadJob)));
}
}
- throw new RuntimeException(String.format("Failed to create the load job %s, reached max "
- + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS));
+ throw new RuntimeException(String.format(
+ "Failed to create load job with id prefix %s, "
+ + "reached max retries: %d, last failed load job: %s.",
+ jobIdPrefix,
+ Bound.MAX_RETRY_JOBS,
+ jobToPrettyString(lastFailedLoadJob)));
}
static void removeTemporaryFiles(
@@ -2491,30 +2498,36 @@ public class BigQueryIO {
.setCreateDisposition(createDisposition.name());
String projectId = ref.getProjectId();
+ Job lastFailedCopyJob = null;
for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
String jobId = jobIdPrefix + "-" + i;
- LOG.info("Starting BigQuery copy job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS);
JobReference jobRef = new JobReference()
.setProjectId(projectId)
.setJobId(jobId);
jobService.startCopyJob(jobRef, copyConfig);
- Status jobStatus =
- parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES));
+ Job copyJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
+ Status jobStatus = parseStatus(copyJob);
switch (jobStatus) {
case SUCCEEDED:
return;
case UNKNOWN:
- throw new RuntimeException("Failed to poll the copy job status of job " + jobId);
+ throw new RuntimeException(String.format(
+ "UNKNOWN status of copy job [%s]: %s.", jobId, jobToPrettyString(copyJob)));
case FAILED:
- LOG.info("BigQuery copy job failed: {}", jobId);
+ lastFailedCopyJob = copyJob;
continue;
default:
- throw new IllegalStateException(String.format("Unexpected job status: %s of job %s",
- jobStatus, jobId));
+ throw new IllegalStateException(String.format(
+ "Unexpected status [%s] of copy job: %s.",
+ jobStatus, jobToPrettyString(copyJob)));
}
}
- throw new RuntimeException(String.format("Failed to create the copy job %s, reached max "
- + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS));
+ throw new RuntimeException(String.format(
+ "Failed to create copy job with id prefix %s, "
+ + "reached max retries: %d, last failed copy job: %s.",
+ jobIdPrefix,
+ Bound.MAX_RETRY_JOBS,
+ jobToPrettyString(lastFailedCopyJob)));
}
static void removeTemporaryTables(DatasetService tableService,
@@ -2549,6 +2562,14 @@ public class BigQueryIO {
private Write() {}
}
+ private static String jobToPrettyString(@Nullable Job job) throws IOException {
+ return job == null ? "null" : job.toPrettyString();
+ }
+
+ private static String statusToPrettyString(@Nullable JobStatus status) throws IOException {
+ return status == null ? "Unknown status: null" : status.toPrettyString();
+ }
+
private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
try {
datasetService.getDataset(table.getProjectId(), table.getDatasetId());
http://git-wip-us.apache.org/repos/asf/beam/blob/6531545e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 75796ab..7c3edbe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -214,6 +214,7 @@ class BigQueryServicesImpl implements BigQueryServices {
do {
try {
client.jobs().insert(jobRef.getProjectId(), job).execute();
+ LOG.info("Started BigQuery job: {}.", jobRef);
return; // SUCCEEDED
} catch (GoogleJsonResponseException e) {
if (errorExtractor.itemAlreadyExists(e)) {
http://git-wip-us.apache.org/repos/asf/beam/blob/6531545e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 0b8d60d..bbfc2ce 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -988,12 +988,6 @@ public class BigQueryIOTest implements Serializable {
.withoutValidation());
p.run();
- logged.verifyInfo("Starting BigQuery load job");
- logged.verifyInfo("BigQuery load job failed");
- logged.verifyInfo("try 0/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
- logged.verifyInfo("try 1/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
- logged.verifyInfo("try 2/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
- logged.verifyNotLogged("try 3/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
File tempDir = new File(bqOptions.getTempLocation());
testNumFiles(tempDir, 0);
}
@@ -1232,11 +1226,49 @@ public class BigQueryIOTest implements Serializable {
.withoutValidation());
thrown.expect(RuntimeException.class);
- thrown.expectMessage("Failed to poll the load job status");
- p.run();
+ thrown.expectMessage("UNKNOWN status of load job");
+ try {
+ p.run();
+ } finally {
+ File tempDir = new File(bqOptions.getTempLocation());
+ testNumFiles(tempDir, 0);
+ }
+ }
- File tempDir = new File(bqOptions.getTempLocation());
- testNumFiles(tempDir, 0);
+ @Test
+ @Category(NeedsRunner.class)
+ public void testWriteFailedJobs() throws Exception {
+ BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+ bqOptions.setProject("defaultProject");
+ bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+ FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+ .withJobService(new FakeJobService()
+ .startJobReturns("done", "done", "done")
+ .pollJobReturns(Status.FAILED, Status.FAILED, Status.FAILED));
+
+ Pipeline p = TestPipeline.create(bqOptions);
+ p.apply(Create.of(
+ new TableRow().set("name", "a").set("number", 1),
+ new TableRow().set("name", "b").set("number", 2),
+ new TableRow().set("name", "c").set("number", 3))
+ .withCoder(TableRowJsonCoder.of()))
+ .apply(BigQueryIO.Write.to("dataset-id.table-id")
+ .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+ .withTestServices(fakeBqServices)
+ .withoutValidation());
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Failed to create load job with id prefix");
+ thrown.expectMessage("reached max retries");
+ thrown.expectMessage("last failed load job");
+
+ try {
+ p.run();
+ } finally {
+ File tempDir = new File(bqOptions.getTempLocation());
+ testNumFiles(tempDir, 0);
+ }
}
@Test
@@ -2164,12 +2196,6 @@ public class BigQueryIOTest implements Serializable {
List<String> tempTables = tester.takeOutputElements();
- logged.verifyInfo("Starting BigQuery load job");
- logged.verifyInfo("BigQuery load job failed");
- logged.verifyInfo("try 0/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
- logged.verifyInfo("try 1/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
- logged.verifyNotLogged("try 2/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
-
assertEquals(expectedTempTables, tempTables);
}
@@ -2237,12 +2263,6 @@ public class BigQueryIOTest implements Serializable {
DoFnTester<String, Void> tester = DoFnTester.of(writeRename);
tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);
tester.processElement(null);
-
- logged.verifyInfo("Starting BigQuery copy job");
- logged.verifyInfo("BigQuery copy job failed");
- logged.verifyInfo("try 0/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
- logged.verifyInfo("try 1/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
- logged.verifyNotLogged("try 2/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS);
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/6531545e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 1ce10f1..ef51650 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -138,6 +138,7 @@ public class BigQueryServicesImplTest {
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
verify(response, times(1)).getContentType();
+ expectedLogs.verifyInfo(String.format("Started BigQuery job: %s", jobRef));
}
/**
@@ -161,6 +162,7 @@ public class BigQueryServicesImplTest {
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
verify(response, times(1)).getContentType();
+ expectedLogs.verifyNotLogged("Started BigQuery job");
}
/**