You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2019/05/24 04:04:36 UTC

[beam] branch master updated: [BEAM-6284] Improve error message on waitUntilFinish. (#8629)

This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ea32ab9  [BEAM-6284] Improve error message on waitUntilFinish. (#8629)
ea32ab9 is described below

commit ea32ab940453a1800dffce0833d732b7246deafa
Author: Mikhail Gryzykhin <12...@users.noreply.github.com>
AuthorDate: Thu May 23 21:04:16 2019 -0700

    [BEAM-6284] Improve error message on waitUntilFinish. (#8629)
    
    * Improve error message on waitUntilFinish.
    
    Allow for infinite wait.
---
 .../beam/runners/dataflow/DataflowPipelineJob.java | 229 +++++++++++++--------
 .../runners/dataflow/DataflowPipelineJobTest.java  |  40 +++-
 2 files changed, 177 insertions(+), 92 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 049a904..c592425 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
 import org.apache.beam.sdk.metrics.MetricResults;
@@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory;
 
 /** A DataflowPipelineJob represents a job submitted to Dataflow using {@link DataflowRunner}. */
 public class DataflowPipelineJob implements PipelineResult {
+
   private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);
 
   /** The id for the job. */
@@ -92,6 +94,8 @@ public class DataflowPipelineJob implements PipelineResult {
 
   static final Duration STATUS_POLLING_INTERVAL = Duration.standardSeconds(2);
 
+  static final Duration DEFAULT_MAX_BACKOFF = Duration.standardMinutes(2);
+
   static final double DEFAULT_BACKOFF_EXPONENT = 1.5;
 
   /** The amount of polling retries for job status and messages information. */
@@ -103,7 +107,9 @@ public class DataflowPipelineJob implements PipelineResult {
       FluentBackoff.DEFAULT
           .withInitialBackoff(MESSAGES_POLLING_INTERVAL)
           .withMaxRetries(MESSAGES_POLLING_RETRIES)
-          .withExponent(DEFAULT_BACKOFF_EXPONENT);
+          .withExponent(DEFAULT_BACKOFF_EXPONENT)
+          .withMaxBackoff(DEFAULT_MAX_BACKOFF);
+
   protected static final FluentBackoff STATUS_BACKOFF_FACTORY =
       FluentBackoff.DEFAULT
           .withInitialBackoff(STATUS_POLLING_INTERVAL)
@@ -238,6 +244,16 @@ public class DataflowPipelineJob implements PipelineResult {
         duration, messageHandler, sleeper, nanoClock, new MonitoringUtil(dataflowClient));
   }
 
+  private static BackOff getMessagesBackoff(Duration duration) {
+    FluentBackoff factory = MESSAGES_BACKOFF_FACTORY;
+
+    if (!duration.isShorterThan(Duration.ZERO)) {
+      factory = factory.withMaxCumulativeBackoff(duration);
+    }
+
+    return BackOffAdapter.toGcpBackOff(factory.backoff());
+  }
+
   /**
    * Waits until the pipeline finishes and returns the final status.
    *
@@ -261,96 +277,128 @@ public class DataflowPipelineJob implements PipelineResult {
       MonitoringUtil monitor)
       throws IOException, InterruptedException {
 
-    BackOff backoff;
-    if (!duration.isLongerThan(Duration.ZERO)) {
-      backoff = BackOffAdapter.toGcpBackOff(MESSAGES_BACKOFF_FACTORY.backoff());
-    } else {
-      backoff =
-          BackOffAdapter.toGcpBackOff(
-              MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff());
-    }
+    BackOff backoff = getMessagesBackoff(duration);
 
     // This function tracks the cumulative time from the *first request* to enforce the wall-clock
     // limit. Any backoff instance could, at best, track the time since the first attempt at a
     // given request. Thus, we need to track the cumulative time ourselves.
     long startNanos = nanoClock.nanoTime();
 
-    State state;
+    State state = State.UNKNOWN;
+    Exception exception;
     do {
-      // Get the state of the job before listing messages. This ensures we always fetch job
-      // messages after the job finishes to ensure we have all them.
-      state =
-          getStateWithRetries(
-              BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()),
-              sleeper);
-      boolean hasError = state == State.UNKNOWN;
-
-      if (messageHandler != null && !hasError) {
-        // Process all the job messages that have accumulated so far.
-        try {
-          List<JobMessage> allMessages = monitor.getJobMessages(getJobId(), lastTimestamp);
-
-          if (!allMessages.isEmpty()) {
-            lastTimestamp =
-                fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
-            messageHandler.process(allMessages);
-          }
-        } catch (GoogleJsonResponseException | SocketTimeoutException e) {
-          hasError = true;
-          LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
-          LOG.debug("Exception information:", e);
-        }
+      exception = null;
+      try {
+        // Get the state of the job before listing messages. This ensures we always fetch job
+        // messages after the job finishes to ensure we have all of them.
+        state =
+            getStateWithRetries(
+                BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()),
+                sleeper);
+      } catch (IOException e) {
+        exception = e;
+        LOG.warn("Failed to get job state: {}", e.getMessage());
+        LOG.debug("Failed to get job state: {}", e);
+        continue;
       }
 
-      if (!hasError) {
-        // We can stop if the job is done.
-        if (state.isTerminal()) {
-          switch (state) {
-            case DONE:
-            case CANCELLED:
-              LOG.info("Job {} finished with status {}.", getJobId(), state);
-              break;
-            case UPDATED:
-              LOG.info(
-                  "Job {} has been updated and is running as the new job with id {}. "
-                      + "To access the updated job on the Dataflow monitoring console, "
-                      + "please navigate to {}",
-                  getJobId(),
-                  getReplacedByJob().getJobId(),
-                  MonitoringUtil.getJobMonitoringPageURL(
-                      getReplacedByJob().getProjectId(),
-                      getRegion(),
-                      getReplacedByJob().getJobId()));
-              break;
-            default:
-              LOG.info("Job {} failed with status {}.", getJobId(), state);
-          }
-          return state;
-        }
+      exception = processJobMessages(messageHandler, monitor);
 
-        // The job is not done, so we must keep polling.
-        backoff.reset();
-
-        // If a total duration for all backoff has been set, update the new cumulative sleep time to
-        // be the remaining total backoff duration, stopping if we have already exceeded the
-        // allotted time.
-        if (duration.isLongerThan(Duration.ZERO)) {
-          long nanosConsumed = nanoClock.nanoTime() - startNanos;
-          Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000);
-          Duration remaining = duration.minus(consumed);
-          if (remaining.isLongerThan(Duration.ZERO)) {
-            backoff =
-                BackOffAdapter.toGcpBackOff(
-                    MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff());
-          } else {
-            // If there is no time remaining, don't bother backing off.
-            backoff = BackOff.STOP_BACKOFF;
-          }
-        }
+      if (exception != null) {
+        continue;
+      }
+
+      // We can stop if the job is done.
+      if (state.isTerminal()) {
+        logTerminalState(state);
+        return state;
       }
+
+      // Reset attempts count and update cumulative wait time.
+      backoff = resetBackoff(duration, nanoClock, startNanos);
     } while (BackOffUtils.next(sleeper, backoff));
-    LOG.warn("No terminal state was returned. State value {}", state);
-    return null; // Timed out.
+
+    // At this point Backoff decided that we retried enough times.
+    // This can be either due to exceeding allowed timeout for job to complete, or receiving
+    // error multiple times in a row.
+
+    if (exception == null) {
+      LOG.warn("No terminal state was returned within allotted timeout. State value {}", state);
+    } else {
+      LOG.error("Failed to fetch job metadata with error: {}", exception);
+    }
+
+    return null;
+  }
+
+  private void logTerminalState(State state) {
+    switch (state) {
+      case DONE:
+      case CANCELLED:
+        LOG.info("Job {} finished with status {}.", getJobId(), state);
+        break;
+      case UPDATED:
+        LOG.info(
+            "Job {} has been updated and is running as the new job with id {}. "
+                + "To access the updated job on the Dataflow monitoring console, "
+                + "please navigate to {}",
+            getJobId(),
+            getReplacedByJob().getJobId(),
+            MonitoringUtil.getJobMonitoringPageURL(
+                getReplacedByJob().getProjectId(), getRegion(), getReplacedByJob().getJobId()));
+        break;
+      default:
+        LOG.info("Job {} failed with status {}.", getJobId(), state);
+    }
+  }
+
+  /**
+   * Reset backoff. If duration is limited, calculate time remaining, otherwise just reset retry
+   * count.
+   *
+   * <p>If a total duration for all backoff has been set, update the new cumulative sleep time to be
+   * the remaining total backoff duration, stopping if we have already exceeded the allotted time.
+   */
+  private static BackOff resetBackoff(Duration duration, NanoClock nanoClock, long startNanos) {
+    BackOff backoff;
+    if (duration.isLongerThan(Duration.ZERO)) {
+      long nanosConsumed = nanoClock.nanoTime() - startNanos;
+      Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000);
+      Duration remaining = duration.minus(consumed);
+      if (remaining.isLongerThan(Duration.ZERO)) {
+        backoff = getMessagesBackoff(remaining);
+      } else {
+        backoff = BackOff.STOP_BACKOFF;
+      }
+    } else {
+      backoff = getMessagesBackoff(duration);
+    }
+    return backoff;
+  }
+
+  /**
+   * Process all the job messages that have accumulated so far.
+   *
+   * @return Exception that caused failure to process messages or null.
+   */
+  private Exception processJobMessages(
+      @Nullable JobMessagesHandler messageHandler, MonitoringUtil monitor) throws IOException {
+    if (messageHandler != null) {
+      try {
+        List<JobMessage> allMessages = monitor.getJobMessages(getJobId(), lastTimestamp);
+
+        if (!allMessages.isEmpty()) {
+          lastTimestamp =
+              fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
+          messageHandler.process(allMessages);
+        }
+      } catch (GoogleJsonResponseException | SocketTimeoutException e) {
+        LOG.warn("Failed to get job messages: {}", e.getMessage());
+        LOG.debug("Failed to get job messages: {}", e);
+        return e;
+      }
+    }
+    return null;
   }
 
   private AtomicReference<FutureTask<State>> cancelState = new AtomicReference<>();
@@ -372,10 +420,11 @@ public class DataflowPipelineJob implements PipelineResult {
             () -> {
               Job content = new Job();
               content.setProjectId(getProjectId());
-              content.setId(jobId);
+              String currentJobId = getJobId();
+              content.setId(currentJobId);
               content.setRequestedState("JOB_STATE_CANCELLED");
               try {
-                Job job = dataflowClient.updateJob(getJobId(), content);
+                Job job = dataflowClient.updateJob(currentJobId, content);
                 return MonitoringUtil.toState(job.getCurrentState());
               } catch (IOException e) {
                 State state = getState();
@@ -426,7 +475,7 @@ public class DataflowPipelineJob implements PipelineResult {
       return terminalState;
     }
 
-    return getStateWithRetries(
+    return getStateWithRetriesOrUnknownOnException(
         BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.backoff()), Sleeper.DEFAULT);
   }
 
@@ -439,13 +488,9 @@ public class DataflowPipelineJob implements PipelineResult {
    * @return The state of the job or State.UNKNOWN in case of failure.
    */
   @VisibleForTesting
-  State getStateWithRetries(BackOff attempts, Sleeper sleeper) {
-    if (terminalState != null) {
-      return terminalState;
-    }
+  State getStateWithRetriesOrUnknownOnException(BackOff attempts, Sleeper sleeper) {
     try {
-      Job job = getJobWithRetries(attempts, sleeper);
-      return MonitoringUtil.toState(job.getCurrentState());
+      return getStateWithRetries(attempts, sleeper);
     } catch (IOException exn) {
       // The only IOException that getJobWithRetries is permitted to throw is the final IOException
       // that caused the failure of retry. Other exceptions are wrapped in an unchecked exceptions
@@ -454,6 +499,14 @@ public class DataflowPipelineJob implements PipelineResult {
     }
   }
 
+  State getStateWithRetries(BackOff attempts, Sleeper sleeper) throws IOException {
+    if (terminalState != null) {
+      return terminalState;
+    }
+    Job job = getJobWithRetries(attempts, sleeper);
+    return MonitoringUtil.toState(job.getCurrentState());
+  }
+
   /**
    * Attempts to get the underlying {@link Job}. Uses exponential backoff on failure up to the
    * maximum number of passed in attempts.
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 48faf23..aa94a7b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -225,13 +225,27 @@ public class DataflowPipelineJobTest {
   }
 
   /**
+   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#UPDATED UPDATED}
+   * state is terminal.
+   */
+  @Test
+  public void testWaitToFinishLogsError() throws Exception {
+    assertEquals(State.UPDATED, mockWaitToFinishInState(State.UPDATED));
+    expectedLogs.verifyInfo(
+        String.format(
+            "Job %s has been updated and is running as the new job with id %s.",
+            JOB_ID, REPLACEMENT_JOB_ID));
+  }
+
+  /**
   * Tests that the {@link DataflowPipelineJob} treats the {@link State#UNKNOWN UNKNOWN} state as
   * non-terminal: waiting on it returns null and logs a timeout warning.
    */
   @Test
   public void testWaitToFinishUnknown() throws Exception {
     assertEquals(null, mockWaitToFinishInState(State.UNKNOWN));
-    expectedLogs.verifyWarn("No terminal state was returned. State value UNKNOWN");
+    expectedLogs.verifyWarn(
+        "No terminal state was returned within allotted timeout. State value UNKNOWN");
   }
 
   @Test
@@ -311,13 +325,31 @@ public class DataflowPipelineJobTest {
 
     assertEquals(
         State.RUNNING,
-        job.getStateWithRetries(
+        job.getStateWithRetriesOrUnknownOnException(
             BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
             fastClock));
   }
 
   @Test
-  public void testGetStateWithExceptionReturnsUnknown() throws Exception {
+  public void testGetStateWithRetriesPassesExceptionThrough() throws Exception {
+    Dataflow.Projects.Locations.Jobs.Get statusRequest =
+        mock(Dataflow.Projects.Locations.Jobs.Get.class);
+
+    when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenThrow(IOException.class);
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());
+
+    long startTime = fastClock.nanoTime();
+    thrown.expect(IOException.class);
+    job.getStateWithRetries(
+        BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
+        fastClock);
+  }
+
+  @Test
+  public void testGetStateNoThrowWithExceptionReturnsUnknown() throws Exception {
     Dataflow.Projects.Locations.Jobs.Get statusRequest =
         mock(Dataflow.Projects.Locations.Jobs.Get.class);
 
@@ -330,7 +362,7 @@ public class DataflowPipelineJobTest {
     long startTime = fastClock.nanoTime();
     assertEquals(
         State.UNKNOWN,
-        job.getStateWithRetries(
+        job.getStateWithRetriesOrUnknownOnException(
             BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
             fastClock));
     long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);