You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/09 00:56:55 UTC
[2/2] incubator-beam git commit: DataflowPipelineJob: catch an underflow in backoff code
DataflowPipelineJob: catch an underflow in backoff code
Forward port of https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/422
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ed3b12ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ed3b12ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ed3b12ab
Branch: refs/heads/master
Commit: ed3b12ab764d7867813957f22b67a518d5140ecd
Parents: 7fcc944
Author: Daniel Halperin <dh...@users.noreply.github.com>
Authored: Wed Sep 7 16:57:26 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Sep 8 17:56:43 2016 -0700
----------------------------------------------------------------------
.../runners/dataflow/DataflowPipelineJob.java | 26 +++++++----
.../dataflow/DataflowPipelineJobTest.java | 46 ++++++++++++++++++++
2 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed3b12ab/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index dad59f2..1af8c98 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -263,19 +263,27 @@ public class DataflowPipelineJob implements PipelineResult {
}
if (!hasError) {
- // Reset the backoff.
+ // We can stop if the job is done.
+ if (state.isTerminal()) {
+ return state;
+ }
+
+ // The job is not done, so we must keep polling.
backoff.reset();
- // If duration is set, update the new cumulative sleep time to be the remaining
- // part of the total input sleep duration.
+
+ // If a total duration for all backoff has been set, update the new cumulative sleep time to
+ // be the remaining total backoff duration, stopping if we have already exceeded the
+ // allotted time.
if (duration.isLongerThan(Duration.ZERO)) {
long nanosConsumed = nanoClock.nanoTime() - startNanos;
Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000);
- backoff =
- MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration.minus(consumed)).backoff();
- }
- // Check if the job is done.
- if (state.isTerminal()) {
- return state;
+ Duration remaining = duration.minus(consumed);
+ if (remaining.isLongerThan(Duration.ZERO)) {
+ backoff = MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff();
+ } else {
+ // If there is no time remaining, don't bother backing off.
+ backoff = BackOff.STOP_BACKOFF;
+ }
}
}
} while(BackOffUtils.next(sleeper, backoff));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed3b12ab/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 226140a..4c70d12 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -32,6 +32,8 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get;
import com.google.api.services.dataflow.Dataflow.Projects.Jobs.GetMetrics;
@@ -46,6 +48,7 @@ import com.google.common.collect.ImmutableSetMultimap;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.SocketTimeoutException;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
@@ -249,6 +252,30 @@ public class DataflowPipelineJobTest {
}
@Test
+ public void testCumulativeTimeOverflow() throws Exception {
+ Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+ Job statusResponse = new Job();
+ statusResponse.setCurrentState("JOB_STATE_RUNNING");
+ when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+ when(statusRequest.execute()).thenReturn(statusResponse);
+
+ DataflowAggregatorTransforms dataflowAggregatorTransforms =
+ mock(DataflowAggregatorTransforms.class);
+
+ FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper();
+
+ DataflowPipelineJob job = new DataflowPipelineJob(
+ PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ long startTime = clock.nanoTime();
+ State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock);
+ assertEquals(null, state);
+ long timeDiff = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - startTime);
+ // Should only have slept for the 4 ms allowed.
+ assertThat(timeDiff, lessThanOrEqualTo(4L));
+ }
+
+ @Test
public void testGetStateReturnsServiceState() throws Exception {
Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
@@ -609,4 +636,23 @@ public class DataflowPipelineJobTest {
String fullName, PTransform<PInput, POutput> transform) {
return AppliedPTransform.of(fullName, mock(PInput.class), mock(POutput.class), transform);
}
+
+
+ private static class FastNanoClockAndFuzzySleeper implements NanoClock, Sleeper {
+ private long fastNanoTime;
+
+ public FastNanoClockAndFuzzySleeper() {
+ fastNanoTime = NanoClock.SYSTEM.nanoTime();
+ }
+
+ @Override
+ public long nanoTime() {
+ return fastNanoTime;
+ }
+
+ @Override
+ public void sleep(long millis) throws InterruptedException {
+ fastNanoTime += millis * 1000000L + ThreadLocalRandom.current().nextInt(500000);
+ }
+ }
}