You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/04 16:00:42 UTC
[1/2] beam git commit: [BEAM-2162] Add logging to long BigQuery jobs
Repository: beam
Updated Branches:
refs/heads/master d9293007d -> ade5cbea6
[BEAM-2162] Add logging to long BigQuery jobs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/690ec3b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/690ec3b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/690ec3b1
Branch: refs/heads/master
Commit: 690ec3b1f7b6ce9caaa7b9e401878e136f44bc50
Parents: d929300
Author: bchambers <bc...@google.com>
Authored: Wed May 3 16:40:09 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 4 09:00:34 2017 -0700
----------------------------------------------------------------------
.../io/gcp/bigquery/BigQueryServicesImpl.java | 20 +++++++++++++++++++-
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 2 ++
2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/690ec3b1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 8e395f0..b348abd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -69,6 +69,7 @@ import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.Transport;
import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +81,9 @@ class BigQueryServicesImpl implements BigQueryServices {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);
+ // How frequently to log while polling.
+ private static final Duration POLLING_LOG_GAP = Duration.standardMinutes(10);
+
// The maximum number of retries to execute a BigQuery RPC.
private static final int MAX_RPC_RETRIES = 9;
@@ -219,7 +223,8 @@ class BigQueryServicesImpl implements BigQueryServices {
do {
try {
client.jobs().insert(jobRef.getProjectId(), job).execute();
- LOG.info("Started BigQuery job: {}.", jobRef);
+ LOG.info("Started BigQuery job: {}.\n{}", jobRef,
+ formatBqStatusCommand(jobRef.getProjectId(), jobRef.getJobId()));
return; // SUCCEEDED
} catch (GoogleJsonResponseException e) {
if (errorExtractor.itemAlreadyExists(e)) {
@@ -257,6 +262,7 @@ class BigQueryServicesImpl implements BigQueryServices {
JobReference jobRef,
Sleeper sleeper,
BackOff backoff) throws InterruptedException {
+ Instant nextLog = Instant.now().plus(POLLING_LOG_GAP);
do {
try {
Job job = client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).execute();
@@ -265,6 +271,13 @@ class BigQueryServicesImpl implements BigQueryServices {
return job;
}
// The job is not DONE, wait longer and retry.
+ if (Instant.now().isAfter(nextLog)) {
+ LOG.info("Still waiting for BigQuery job {}\n{}",
+ jobRef.getJobId(),
+ formatBqStatusCommand(
+ jobRef.getProjectId(), jobRef.getJobId()));
+ nextLog = Instant.now().plus(POLLING_LOG_GAP);
+ }
} catch (IOException e) {
// ignore and retry
LOG.info("Ignore the error and retry polling job status.", e);
@@ -274,6 +287,11 @@ class BigQueryServicesImpl implements BigQueryServices {
return null;
}
+ private static String formatBqStatusCommand(String projectId, String jobId) {
+ return String.format("bq show -j --format=prettyjson --project_id=%s %s",
+ projectId, jobId);
+ }
+
@Override
public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig)
throws InterruptedException, IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/690ec3b1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 41e298c..49000d6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -141,6 +141,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
BigQueryHelpers.statusToPrettyString(extractJob.getStatus())));
}
+ LOG.info("BigQuery extract job completed: {}", jobId);
+
List<String> tempFiles = BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
return ImmutableList.copyOf(tempFiles);
}
[2/2] beam git commit: This closes #2882
Posted by dh...@apache.org.
This closes #2882
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ade5cbea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ade5cbea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ade5cbea
Branch: refs/heads/master
Commit: ade5cbea605b99ebb6e566491ec64e12fc1a663d
Parents: d929300 690ec3b
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 4 09:00:36 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 4 09:00:36 2017 -0700
----------------------------------------------------------------------
.../io/gcp/bigquery/BigQueryServicesImpl.java | 20 +++++++++++++++++++-
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 2 ++
2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------