You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/09 00:56:55 UTC

[2/2] incubator-beam git commit: DataflowPipelineJob: catch an underflow in backoff code

DataflowPipelineJob: catch an underflow in backoff code

Forward port of https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/422


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ed3b12ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ed3b12ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ed3b12ab

Branch: refs/heads/master
Commit: ed3b12ab764d7867813957f22b67a518d5140ecd
Parents: 7fcc944
Author: Daniel Halperin <dh...@users.noreply.github.com>
Authored: Wed Sep 7 16:57:26 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Sep 8 17:56:43 2016 -0700

----------------------------------------------------------------------
 .../runners/dataflow/DataflowPipelineJob.java   | 26 +++++++----
 .../dataflow/DataflowPipelineJobTest.java       | 46 ++++++++++++++++++++
 2 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed3b12ab/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index dad59f2..1af8c98 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -263,19 +263,27 @@ public class DataflowPipelineJob implements PipelineResult {
       }
 
       if (!hasError) {
-        // Reset the backoff.
+        // We can stop if the job is done.
+        if (state.isTerminal()) {
+          return state;
+        }
+
+        // The job is not done, so we must keep polling.
         backoff.reset();
-        // If duration is set, update the new cumulative sleep time to be the remaining
-        // part of the total input sleep duration.
+
+        // If a total duration for all backoff has been set, update the new cumulative sleep time to
+        // be the remaining total backoff duration, stopping if we have already exceeded the
+        // allotted time.
         if (duration.isLongerThan(Duration.ZERO)) {
           long nanosConsumed = nanoClock.nanoTime() - startNanos;
           Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000);
-          backoff =
-              MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration.minus(consumed)).backoff();
-        }
-        // Check if the job is done.
-        if (state.isTerminal()) {
-          return state;
+          Duration remaining = duration.minus(consumed);
+          if (remaining.isLongerThan(Duration.ZERO)) {
+            backoff = MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff();
+          } else {
+            // If there is no time remaining, don't bother backing off.
+            backoff = BackOff.STOP_BACKOFF;
+          }
         }
       }
     } while(BackOffUtils.next(sleeper, backoff));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed3b12ab/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 226140a..4c70d12 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -32,6 +32,8 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
 import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get;
 import com.google.api.services.dataflow.Dataflow.Projects.Jobs.GetMetrics;
@@ -46,6 +48,7 @@ import com.google.common.collect.ImmutableSetMultimap;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.net.SocketTimeoutException;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
 import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
@@ -249,6 +252,30 @@ public class DataflowPipelineJobTest {
   }
 
   @Test
+  public void testCumulativeTimeOverflow() throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    Job statusResponse = new Job();
+    statusResponse.setCurrentState("JOB_STATE_RUNNING");
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenReturn(statusResponse);
+
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper();
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+    long startTime = clock.nanoTime();
+    State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock);
+    assertEquals(null, state);
+    long timeDiff = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - startTime);
+    // Should only have slept for the 4 ms allowed.
+    assertThat(timeDiff, lessThanOrEqualTo(4L));
+  }
+
+  @Test
   public void testGetStateReturnsServiceState() throws Exception {
     Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
 
@@ -609,4 +636,23 @@ public class DataflowPipelineJobTest {
       String fullName, PTransform<PInput, POutput> transform) {
     return AppliedPTransform.of(fullName, mock(PInput.class), mock(POutput.class), transform);
   }
+
+
+  private static class FastNanoClockAndFuzzySleeper implements NanoClock, Sleeper {
+    private long fastNanoTime;
+
+    public FastNanoClockAndFuzzySleeper() {
+      fastNanoTime = NanoClock.SYSTEM.nanoTime();
+    }
+
+    @Override
+    public long nanoTime() {
+      return fastNanoTime;
+    }
+
+    @Override
+    public void sleep(long millis) throws InterruptedException {
+      fastNanoTime += millis * 1000000L + ThreadLocalRandom.current().nextInt(500000);
+    }
+  }
 }