You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2019/05/24 04:04:36 UTC
[beam] branch master updated: [BEAM-6284] Improve error message on
waitUntilFinish. (#8629)
This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ea32ab9 [BEAM-6284] Improve error message on waitUntilFinish. (#8629)
ea32ab9 is described below
commit ea32ab940453a1800dffce0833d732b7246deafa
Author: Mikhail Gryzykhin <12...@users.noreply.github.com>
AuthorDate: Thu May 23 21:04:16 2019 -0700
[BEAM-6284] Improve error message on waitUntilFinish. (#8629)
* Improve error message on waitUntilFinish.
Allow for infinite wait.
---
.../beam/runners/dataflow/DataflowPipelineJob.java | 229 +++++++++++++--------
.../runners/dataflow/DataflowPipelineJobTest.java | 40 +++-
2 files changed, 177 insertions(+), 92 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 049a904..c592425 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.metrics.MetricResults;
@@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory;
/** A DataflowPipelineJob represents a job submitted to Dataflow using {@link DataflowRunner}. */
public class DataflowPipelineJob implements PipelineResult {
+
private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);
/** The id for the job. */
@@ -92,6 +94,8 @@ public class DataflowPipelineJob implements PipelineResult {
static final Duration STATUS_POLLING_INTERVAL = Duration.standardSeconds(2);
+ static final Duration DEFAULT_MAX_BACKOFF = Duration.standardMinutes(2);
+
static final double DEFAULT_BACKOFF_EXPONENT = 1.5;
/** The amount of polling retries for job status and messages information. */
@@ -103,7 +107,9 @@ public class DataflowPipelineJob implements PipelineResult {
FluentBackoff.DEFAULT
.withInitialBackoff(MESSAGES_POLLING_INTERVAL)
.withMaxRetries(MESSAGES_POLLING_RETRIES)
- .withExponent(DEFAULT_BACKOFF_EXPONENT);
+ .withExponent(DEFAULT_BACKOFF_EXPONENT)
+ .withMaxBackoff(DEFAULT_MAX_BACKOFF);
+
protected static final FluentBackoff STATUS_BACKOFF_FACTORY =
FluentBackoff.DEFAULT
.withInitialBackoff(STATUS_POLLING_INTERVAL)
@@ -238,6 +244,16 @@ public class DataflowPipelineJob implements PipelineResult {
duration, messageHandler, sleeper, nanoClock, new MonitoringUtil(dataflowClient));
}
+ private static BackOff getMessagesBackoff(Duration duration) {
+ FluentBackoff factory = MESSAGES_BACKOFF_FACTORY;
+
+ if (!duration.isShorterThan(Duration.ZERO)) {
+ factory = factory.withMaxCumulativeBackoff(duration);
+ }
+
+ return BackOffAdapter.toGcpBackOff(factory.backoff());
+ }
+
/**
* Waits until the pipeline finishes and returns the final status.
*
@@ -261,96 +277,128 @@ public class DataflowPipelineJob implements PipelineResult {
MonitoringUtil monitor)
throws IOException, InterruptedException {
- BackOff backoff;
- if (!duration.isLongerThan(Duration.ZERO)) {
- backoff = BackOffAdapter.toGcpBackOff(MESSAGES_BACKOFF_FACTORY.backoff());
- } else {
- backoff =
- BackOffAdapter.toGcpBackOff(
- MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff());
- }
+ BackOff backoff = getMessagesBackoff(duration);
// This function tracks the cumulative time from the *first request* to enforce the wall-clock
limit. Any backoff instance could, at best, track the time since the first attempt at a
// given request. Thus, we need to track the cumulative time ourselves.
long startNanos = nanoClock.nanoTime();
- State state;
+ State state = State.UNKNOWN;
+ Exception exception;
do {
- // Get the state of the job before listing messages. This ensures we always fetch job
- messages after the job finishes to ensure we have all of them.
- state =
- getStateWithRetries(
- BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()),
- sleeper);
- boolean hasError = state == State.UNKNOWN;
-
- if (messageHandler != null && !hasError) {
- // Process all the job messages that have accumulated so far.
- try {
- List<JobMessage> allMessages = monitor.getJobMessages(getJobId(), lastTimestamp);
-
- if (!allMessages.isEmpty()) {
- lastTimestamp =
- fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
- messageHandler.process(allMessages);
- }
- } catch (GoogleJsonResponseException | SocketTimeoutException e) {
- hasError = true;
- LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
- LOG.debug("Exception information:", e);
- }
+ exception = null;
+ try {
+ // Get the state of the job before listing messages. This ensures we always fetch job
+ // messages after the job finishes to ensure we have all of them.
+ state =
+ getStateWithRetries(
+ BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()),
+ sleeper);
+ } catch (IOException e) {
+ exception = e;
+ LOG.warn("Failed to get job state: {}", e.getMessage());
+ LOG.debug("Failed to get job state: {}", e);
+ continue;
}
- if (!hasError) {
- // We can stop if the job is done.
- if (state.isTerminal()) {
- switch (state) {
- case DONE:
- case CANCELLED:
- LOG.info("Job {} finished with status {}.", getJobId(), state);
- break;
- case UPDATED:
- LOG.info(
- "Job {} has been updated and is running as the new job with id {}. "
- + "To access the updated job on the Dataflow monitoring console, "
- + "please navigate to {}",
- getJobId(),
- getReplacedByJob().getJobId(),
- MonitoringUtil.getJobMonitoringPageURL(
- getReplacedByJob().getProjectId(),
- getRegion(),
- getReplacedByJob().getJobId()));
- break;
- default:
- LOG.info("Job {} failed with status {}.", getJobId(), state);
- }
- return state;
- }
+ exception = processJobMessages(messageHandler, monitor);
- // The job is not done, so we must keep polling.
- backoff.reset();
-
- // If a total duration for all backoff has been set, update the new cumulative sleep time to
- // be the remaining total backoff duration, stopping if we have already exceeded the
- // allotted time.
- if (duration.isLongerThan(Duration.ZERO)) {
- long nanosConsumed = nanoClock.nanoTime() - startNanos;
- Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000);
- Duration remaining = duration.minus(consumed);
- if (remaining.isLongerThan(Duration.ZERO)) {
- backoff =
- BackOffAdapter.toGcpBackOff(
- MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff());
- } else {
- // If there is no time remaining, don't bother backing off.
- backoff = BackOff.STOP_BACKOFF;
- }
- }
+ if (exception != null) {
+ continue;
+ }
+
+ // We can stop if the job is done.
+ if (state.isTerminal()) {
+ logTerminalState(state);
+ return state;
}
+
+ // Reset attempts count and update cumulative wait time.
+ backoff = resetBackoff(duration, nanoClock, startNanos);
} while (BackOffUtils.next(sleeper, backoff));
- LOG.warn("No terminal state was returned. State value {}", state);
- return null; // Timed out.
+
+ // At this point Backoff decided that we retried enough times.
+ // This can be either due to exceeding allowed timeout for job to complete, or receiving
+ // error multiple times in a row.
+
+ if (exception == null) {
+ LOG.warn("No terminal state was returned within allotted timeout. State value {}", state);
+ } else {
+ LOG.error("Failed to fetch job metadata with error: {}", exception);
+ }
+
+ return null;
+ }
+
+ private void logTerminalState(State state) {
+ switch (state) {
+ case DONE:
+ case CANCELLED:
+ LOG.info("Job {} finished with status {}.", getJobId(), state);
+ break;
+ case UPDATED:
+ LOG.info(
+ "Job {} has been updated and is running as the new job with id {}. "
+ + "To access the updated job on the Dataflow monitoring console, "
+ + "please navigate to {}",
+ getJobId(),
+ getReplacedByJob().getJobId(),
+ MonitoringUtil.getJobMonitoringPageURL(
+ getReplacedByJob().getProjectId(), getRegion(), getReplacedByJob().getJobId()));
+ break;
+ default:
+ LOG.info("Job {} failed with status {}.", getJobId(), state);
+ }
+ }
+
+ /**
+ * Reset backoff. If duration is limited, calculate time remaining, otherwise just reset retry
+ * count.
+ *
+ * <p>If a total duration for all backoff has been set, update the new cumulative sleep time to be
+ * the remaining total backoff duration, stopping if we have already exceeded the allotted time.
+ */
+ private static BackOff resetBackoff(Duration duration, NanoClock nanoClock, long startNanos) {
+ BackOff backoff;
+ if (duration.isLongerThan(Duration.ZERO)) {
+ long nanosConsumed = nanoClock.nanoTime() - startNanos;
+ Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000);
+ Duration remaining = duration.minus(consumed);
+ if (remaining.isLongerThan(Duration.ZERO)) {
+ backoff = getMessagesBackoff(remaining);
+ } else {
+ backoff = BackOff.STOP_BACKOFF;
+ }
+ } else {
+ backoff = getMessagesBackoff(duration);
+ }
+ return backoff;
+ }
+
+ /**
+ * Process all the job messages that have accumulated so far.
+ *
+ * @return Exception that caused failure to process messages or null.
+ */
+ private Exception processJobMessages(
+ @Nullable JobMessagesHandler messageHandler, MonitoringUtil monitor) throws IOException {
+ if (messageHandler != null) {
+ try {
+ List<JobMessage> allMessages = monitor.getJobMessages(getJobId(), lastTimestamp);
+
+ if (!allMessages.isEmpty()) {
+ lastTimestamp =
+ fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
+ messageHandler.process(allMessages);
+ }
+ } catch (GoogleJsonResponseException | SocketTimeoutException e) {
+ LOG.warn("Failed to get job messages: {}", e.getMessage());
+ LOG.debug("Failed to get job messages: {}", e);
+ return e;
+ }
+ }
+ return null;
}
private AtomicReference<FutureTask<State>> cancelState = new AtomicReference<>();
@@ -372,10 +420,11 @@ public class DataflowPipelineJob implements PipelineResult {
() -> {
Job content = new Job();
content.setProjectId(getProjectId());
- content.setId(jobId);
+ String currentJobId = getJobId();
+ content.setId(currentJobId);
content.setRequestedState("JOB_STATE_CANCELLED");
try {
- Job job = dataflowClient.updateJob(getJobId(), content);
+ Job job = dataflowClient.updateJob(currentJobId, content);
return MonitoringUtil.toState(job.getCurrentState());
} catch (IOException e) {
State state = getState();
@@ -426,7 +475,7 @@ public class DataflowPipelineJob implements PipelineResult {
return terminalState;
}
- return getStateWithRetries(
+ return getStateWithRetriesOrUnknownOnException(
BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.backoff()), Sleeper.DEFAULT);
}
@@ -439,13 +488,9 @@ public class DataflowPipelineJob implements PipelineResult {
* @return The state of the job or State.UNKNOWN in case of failure.
*/
@VisibleForTesting
- State getStateWithRetries(BackOff attempts, Sleeper sleeper) {
- if (terminalState != null) {
- return terminalState;
- }
+ State getStateWithRetriesOrUnknownOnException(BackOff attempts, Sleeper sleeper) {
try {
- Job job = getJobWithRetries(attempts, sleeper);
- return MonitoringUtil.toState(job.getCurrentState());
+ return getStateWithRetries(attempts, sleeper);
} catch (IOException exn) {
// The only IOException that getJobWithRetries is permitted to throw is the final IOException
// that caused the failure of retry. Other exceptions are wrapped in an unchecked exceptions
@@ -454,6 +499,14 @@ public class DataflowPipelineJob implements PipelineResult {
}
}
+ State getStateWithRetries(BackOff attempts, Sleeper sleeper) throws IOException {
+ if (terminalState != null) {
+ return terminalState;
+ }
+ Job job = getJobWithRetries(attempts, sleeper);
+ return MonitoringUtil.toState(job.getCurrentState());
+ }
+
/**
* Attempts to get the underlying {@link Job}. Uses exponential backoff on failure up to the
* maximum number of passed in attempts.
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 48faf23..aa94a7b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -225,13 +225,27 @@ public class DataflowPipelineJobTest {
}
/**
+ * Tests that the {@link DataflowPipelineJob} understands that the {@link State#UPDATED UPDATED}
+ * state is terminal.
+ */
+ @Test
+ public void testWaitToFinishLogsError() throws Exception {
+ assertEquals(State.UPDATED, mockWaitToFinishInState(State.UPDATED));
+ expectedLogs.verifyInfo(
+ String.format(
+ "Job %s has been updated and is running as the new job with id %s.",
+ JOB_ID, REPLACEMENT_JOB_ID));
+ }
+
+ /**
* Tests that the {@link DataflowPipelineJob} understands that the {@link State#UNKNOWN UNKNOWN}
* state is terminal.
*/
@Test
public void testWaitToFinishUnknown() throws Exception {
assertEquals(null, mockWaitToFinishInState(State.UNKNOWN));
- expectedLogs.verifyWarn("No terminal state was returned. State value UNKNOWN");
+ expectedLogs.verifyWarn(
+ "No terminal state was returned within allotted timeout. State value UNKNOWN");
}
@Test
@@ -311,13 +325,31 @@ public class DataflowPipelineJobTest {
assertEquals(
State.RUNNING,
- job.getStateWithRetries(
+ job.getStateWithRetriesOrUnknownOnException(
BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
fastClock));
}
@Test
- public void testGetStateWithExceptionReturnsUnknown() throws Exception {
+ public void testGetStateWithRetriesPassesExceptionThrough() throws Exception {
+ Dataflow.Projects.Locations.Jobs.Get statusRequest =
+ mock(Dataflow.Projects.Locations.Jobs.Get.class);
+
+ when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
+ when(statusRequest.execute()).thenThrow(IOException.class);
+
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());
+
+ long startTime = fastClock.nanoTime();
+ thrown.expect(IOException.class);
+ job.getStateWithRetries(
+ BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
+ fastClock);
+ }
+
+ @Test
+ public void testGetStateNoThrowWithExceptionReturnsUnknown() throws Exception {
Dataflow.Projects.Locations.Jobs.Get statusRequest =
mock(Dataflow.Projects.Locations.Jobs.Get.class);
@@ -330,7 +362,7 @@ public class DataflowPipelineJobTest {
long startTime = fastClock.nanoTime();
assertEquals(
State.UNKNOWN,
- job.getStateWithRetries(
+ job.getStateWithRetriesOrUnknownOnException(
BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
fastClock));
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);