You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:40:54 UTC
[23/50] [abbrv] incubator-beam git commit: FluentBackoff: a
replacement for a variety of custom backoff implementations
FluentBackoff: a replacement for a variety of custom backoff implementations
We have 3 different backoff classes, which don't really have that much
different functionality. Add a single, flexible backoff implementation
that can be used to replace all three classes. Additionally, this new
backoff actually supports more functionality than any of the other three
did -- you can limit retries, cap the exponential growth of an
individual backoff, and cap the cumulative time spent in backoff; prior
implementations did not allow all 3.
This also makes the parameters self-obvious (Duration, not
number-that-is-also-millis) where appropriate.
This initial PR should have no functional changes.
* Implement FluentBackoff
* Replace other custom BackOff implementations with FluentBackoff
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3f485666
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3f485666
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3f485666
Branch: refs/heads/gearpump-runner
Commit: 3f48566618552c4b0fa026aa3a75ef6f1875da63
Parents: c92e45d
Author: Dan Halperin <dh...@google.com>
Authored: Wed Aug 24 22:35:26 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 12 17:40:12 2016 -0700
----------------------------------------------------------------------
.../beam/examples/common/ExampleUtils.java | 7 +-
.../runners/dataflow/DataflowPipelineJob.java | 72 +++---
.../beam/runners/dataflow/util/PackageUtil.java | 31 +--
.../dataflow/DataflowPipelineJobTest.java | 32 ++-
.../sdk/io/BoundedReadFromUnboundedSource.java | 8 +-
...AttemptAndTimeBoundedExponentialBackOff.java | 172 --------------
.../util/AttemptBoundedExponentialBackOff.java | 85 -------
.../org/apache/beam/sdk/util/FluentBackoff.java | 229 +++++++++++++++++++
.../java/org/apache/beam/sdk/util/GcsUtil.java | 18 +-
.../util/IntervalBoundedExponentialBackOff.java | 1 +
...mptAndTimeBoundedExponentialBackOffTest.java | 212 -----------------
.../AttemptBoundedExponentialBackOffTest.java | 84 -------
.../apache/beam/sdk/util/FluentBackoffTest.java | 226 ++++++++++++++++++
.../org/apache/beam/sdk/util/GcsUtilTest.java | 10 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 21 +-
.../io/gcp/bigquery/BigQueryServicesImpl.java | 152 ++++++------
.../gcp/bigquery/BigQueryTableRowIterator.java | 5 +-
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 26 +--
.../gcp/bigquery/BigQueryServicesImplTest.java | 17 +-
.../beam/sdk/io/gcp/datastore/V1TestUtil.java | 9 +-
20 files changed, 675 insertions(+), 742 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index eadb580..2e8dcf6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -45,8 +45,9 @@ import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Transport;
+import org.joda.time.Duration;
/**
* The utility class that sets up and tears down external resources,
@@ -79,7 +80,9 @@ public class ExampleUtils {
*/
public void setup() throws IOException {
Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backOff = new AttemptBoundedExponentialBackOff(3, 200);
+ BackOff backOff =
+ FluentBackoff.DEFAULT
+ .withMaxRetries(3).withInitialBackoff(Duration.millis(200)).backoff();
Throwable lastException = null;
try {
do {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 9a515fa..dad59f2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -34,7 +34,6 @@ import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor;
@@ -44,8 +43,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,14 +93,27 @@ public class DataflowPipelineJob implements PipelineResult {
/**
* The polling interval for job status and messages information.
*/
- static final long MESSAGES_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
- static final long STATUS_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
+ static final Duration MESSAGES_POLLING_INTERVAL = Duration.standardSeconds(2);
+ static final Duration STATUS_POLLING_INTERVAL = Duration.standardSeconds(2);
+
+ static final double DEFAULT_BACKOFF_EXPONENT = 1.5;
/**
- * The amount of polling attempts for job status and messages information.
+ * The number of polling retries for job status and messages information.
*/
- static final int MESSAGES_POLLING_ATTEMPTS = 12;
- static final int STATUS_POLLING_ATTEMPTS = 5;
+ static final int MESSAGES_POLLING_RETRIES = 11;
+ static final int STATUS_POLLING_RETRIES = 4;
+
+ private static final FluentBackoff MESSAGES_BACKOFF_FACTORY =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(MESSAGES_POLLING_INTERVAL)
+ .withMaxRetries(MESSAGES_POLLING_RETRIES)
+ .withExponent(DEFAULT_BACKOFF_EXPONENT);
+ protected static final FluentBackoff STATUS_BACKOFF_FACTORY =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(STATUS_POLLING_INTERVAL)
+ .withMaxRetries(STATUS_POLLING_RETRIES)
+ .withExponent(DEFAULT_BACKOFF_EXPONENT);
/**
* Constructs the job.
@@ -214,21 +225,23 @@ public class DataflowPipelineJob implements PipelineResult {
MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowOptions.getDataflowClient());
long lastTimestamp = 0;
- BackOff backoff =
- duration.getMillis() > 0
- ? new AttemptAndTimeBoundedExponentialBackOff(
- MESSAGES_POLLING_ATTEMPTS,
- MESSAGES_POLLING_INTERVAL,
- duration.getMillis(),
- AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
- nanoClock)
- : new AttemptBoundedExponentialBackOff(
- MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL);
+ BackOff backoff;
+ if (!duration.isLongerThan(Duration.ZERO)) {
+ backoff = MESSAGES_BACKOFF_FACTORY.backoff();
+ } else {
+ backoff = MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff();
+ }
+
+ // This function tracks the cumulative time from the *first request* to enforce the wall-clock
+ // limit. Any backoff instance could, at best, track the time since the first attempt at a
+ // given request. Thus, we need to track the cumulative time ourselves.
+ long startNanos = nanoClock.nanoTime();
+
State state;
do {
// Get the state of the job before listing messages. This ensures we always fetch job
// messages after the job finishes to ensure we have all them.
- state = getStateWithRetries(1, sleeper);
+ state = getStateWithRetries(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff(), sleeper);
boolean hasError = state == State.UNKNOWN;
if (messageHandler != null && !hasError) {
@@ -250,7 +263,16 @@ public class DataflowPipelineJob implements PipelineResult {
}
if (!hasError) {
+ // Reset the backoff.
backoff.reset();
+ // If duration is set, update the new cumulative sleep time to be the remaining
+ // part of the total input sleep duration.
+ if (duration.isLongerThan(Duration.ZERO)) {
+ long nanosConsumed = nanoClock.nanoTime() - startNanos;
+ Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000);
+ backoff =
+ MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration.minus(consumed)).backoff();
+ }
// Check if the job is done.
if (state.isTerminal()) {
return state;
@@ -287,7 +309,7 @@ public class DataflowPipelineJob implements PipelineResult {
return terminalState;
}
- return getStateWithRetries(STATUS_POLLING_ATTEMPTS, Sleeper.DEFAULT);
+ return getStateWithRetries(STATUS_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
}
/**
@@ -299,7 +321,7 @@ public class DataflowPipelineJob implements PipelineResult {
* @return The state of the job or State.UNKNOWN in case of failure.
*/
@VisibleForTesting
- State getStateWithRetries(int attempts, Sleeper sleeper) {
+ State getStateWithRetries(BackOff attempts, Sleeper sleeper) {
if (terminalState != null) {
return terminalState;
}
@@ -318,17 +340,13 @@ public class DataflowPipelineJob implements PipelineResult {
* Attempts to get the underlying {@link Job}. Uses exponential backoff on failure up to the
* maximum number of passed in attempts.
*
- * @param attempts The amount of attempts to make.
+ * @param backoff the {@link BackOff} used to control retries.
* @param sleeper Object used to do the sleeps between attempts.
* @return The underlying {@link Job} object.
* @throws IOException When the maximum number of retries is exhausted, the last exception is
* thrown.
*/
- @VisibleForTesting
- Job getJobWithRetries(int attempts, Sleeper sleeper) throws IOException {
- AttemptBoundedExponentialBackOff backoff =
- new AttemptBoundedExponentialBackOff(attempts, STATUS_POLLING_INTERVAL);
-
+ private Job getJobWithRetries(BackOff backoff, Sleeper sleeper) throws IOException {
// Retry loop ends in return or throw
while (true) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index bf1f666..6d910ba 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.dataflow.util;
import com.fasterxml.jackson.core.Base64Variants;
-import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
@@ -37,10 +37,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.ZipFiles;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,11 +55,15 @@ public class PackageUtil {
/**
* The initial interval to use between package staging attempts.
*/
- private static final long INITIAL_BACKOFF_INTERVAL_MS = 5000L;
+ private static final Duration INITIAL_BACKOFF_INTERVAL = Duration.standardSeconds(5);
/**
- * The maximum number of attempts when staging a file.
+ * The maximum number of retries when staging a file.
*/
- private static final int MAX_ATTEMPTS = 5;
+ private static final int MAX_RETRIES = 4;
+
+ private static final FluentBackoff BACKOFF_FACTORY =
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_INTERVAL);
/**
* Translates exceptions from API calls.
@@ -199,9 +204,7 @@ public class PackageUtil {
}
// Upload file, retrying on failure.
- AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
- MAX_ATTEMPTS,
- INITIAL_BACKOFF_INTERVAL_MS);
+ BackOff backoff = BACKOFF_FACTORY.backoff();
while (true) {
try {
LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
@@ -219,15 +222,17 @@ public class PackageUtil {
+ "'gcloud auth login'.", classpathElement, target);
LOG.error(errorMessage);
throw new IOException(errorMessage, e);
- } else if (!backoff.atMaxAttempts()) {
- LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
- classpathElement, e);
- BackOffUtils.next(retrySleeper, backoff);
- } else {
+ }
+ long sleep = backoff.nextBackOffMillis();
+ if (sleep == BackOff.STOP) {
// Rethrow last error, to be included as a cause in the catch below.
LOG.error("Upload failed, will NOT retry staging of classpath: {}",
classpathElement, e);
throw e;
+ } else {
+ LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
+ classpathElement, e);
+ retrySleeper.sleep(sleep);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 22b5400..226140a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -60,7 +60,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.joda.time.Duration;
@@ -111,22 +110,21 @@ public class DataflowPipelineJobTest {
* AttemptBoundedExponentialBackOff given the number of retries and
* an initial polling interval.
*
- * @param pollingIntervalMillis The initial polling interval given.
- * @param attempts The number of attempts made
+ * @param pollingInterval The initial polling interval given.
+ * @param retries The number of retries made
* @param timeSleptMillis The amount of time slept by the clock. This is checked
* against the valid interval.
*/
- void checkValidInterval(long pollingIntervalMillis, int attempts, long timeSleptMillis) {
+ private void checkValidInterval(Duration pollingInterval, int retries, long timeSleptMillis) {
long highSum = 0;
long lowSum = 0;
- for (int i = 1; i < attempts; i++) {
+ for (int i = 0; i < retries; i++) {
double currentInterval =
- pollingIntervalMillis
- * Math.pow(AttemptBoundedExponentialBackOff.DEFAULT_MULTIPLIER, i - 1);
- double offset =
- AttemptBoundedExponentialBackOff.DEFAULT_RANDOMIZATION_FACTOR * currentInterval;
- highSum += Math.round(currentInterval + offset);
- lowSum += Math.round(currentInterval - offset);
+ pollingInterval.getMillis()
+ * Math.pow(DataflowPipelineJob.DEFAULT_BACKOFF_EXPONENT, i);
+ double randomOffset = 0.5 * currentInterval;
+ highSum += Math.round(currentInterval + randomOffset);
+ lowSum += Math.round(currentInterval - randomOffset);
}
assertThat(timeSleptMillis, allOf(greaterThanOrEqualTo(lowSum), lessThanOrEqualTo(highSum)));
}
@@ -228,7 +226,7 @@ public class DataflowPipelineJobTest {
assertEquals(null, state);
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
checkValidInterval(DataflowPipelineJob.MESSAGES_POLLING_INTERVAL,
- DataflowPipelineJob.MESSAGES_POLLING_ATTEMPTS, timeDiff);
+ DataflowPipelineJob.MESSAGES_POLLING_RETRIES, timeDiff);
}
@Test
@@ -246,8 +244,8 @@ public class DataflowPipelineJobTest {
State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
assertEquals(null, state);
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
- // Should only sleep for the 4 ms remaining.
- assertEquals(timeDiff, 4L);
+ // Should only have slept for the 4 ms allowed.
+ assertEquals(4L, timeDiff);
}
@Test
@@ -268,7 +266,7 @@ public class DataflowPipelineJobTest {
assertEquals(
State.RUNNING,
- job.getStateWithRetries(DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, fastClock));
+ job.getStateWithRetries(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff(), fastClock));
}
@Test
@@ -286,10 +284,10 @@ public class DataflowPipelineJobTest {
long startTime = fastClock.nanoTime();
assertEquals(
State.UNKNOWN,
- job.getStateWithRetries(DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, fastClock));
+ job.getStateWithRetries(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff(), fastClock));
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
checkValidInterval(DataflowPipelineJob.STATUS_POLLING_INTERVAL,
- DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, timeDiff);
+ DataflowPipelineJob.STATUS_POLLING_RETRIES, timeDiff);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 28d7746..b41c655 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.ValueWithRecordId;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -52,6 +52,10 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PCollection<T
private final UnboundedSource<T, ?> source;
private final long maxNumRecords;
private final Duration maxReadTime;
+ private static final FluentBackoff BACKOFF_FACTORY =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(10))
+ .withMaxBackoff(Duration.standardSeconds(10));
/**
* Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount
@@ -241,7 +245,7 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PCollection<T
private boolean advanceWithBackoff() throws IOException {
// Try reading from the source with exponential backoff
- BackOff backoff = new IntervalBoundedExponentialBackOff(10000L, 10L);
+ BackOff backoff = BACKOFF_FACTORY.backoff();
long nextSleep = backoff.nextBackOffMillis();
while (nextSleep != BackOff.STOP) {
if (endTime != null && Instant.now().isAfter(endTime)) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
deleted file mode 100644
index d8050e0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.NanoClock;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Extension of {@link AttemptBoundedExponentialBackOff} that bounds the total time that the backoff
- * is happening as well as the amount of retries. Acts exactly as a AttemptBoundedExponentialBackOff
- * unless the time interval has expired since the object was created. At this point, it will always
- * return BackOff.STOP. Calling reset() resets both the timer and the number of retry attempts,
- * unless a custom ResetPolicy (ResetPolicy.ATTEMPTS or ResetPolicy.TIMER) is passed to the
- * constructor.
- *
- * <p>Implementation is not thread-safe.
- */
-public class AttemptAndTimeBoundedExponentialBackOff extends AttemptBoundedExponentialBackOff {
- private long endTimeMillis;
- private long maximumTotalWaitTimeMillis;
- private ResetPolicy resetPolicy;
- private final NanoClock nanoClock;
- // NanoClock.SYSTEM has a max elapsed time of 292 years or 2^63 ns. Here, we choose 2^53 ns as
- // a smaller but still huge limit.
- private static final long MAX_ELAPSED_TIME_MILLIS = 1L << 53;
-
- /**
- * A ResetPolicy controls the behavior of this BackOff when reset() is called. By default, both
- * the number of attempts and the time bound for the BackOff are reset, but an alternative
- * ResetPolicy may be set to only reset one of these two.
- */
- public static enum ResetPolicy {
- ALL,
- ATTEMPTS,
- TIMER
- }
-
- /**
- * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
- *
- * @param maximumNumberOfAttempts The maximum number of attempts it will make.
- * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
- * @param maximumTotalWaitTimeMillis The maximum total time that this object will
- * allow more attempts in milliseconds.
- */
- public AttemptAndTimeBoundedExponentialBackOff(
- int maximumNumberOfAttempts, long initialIntervalMillis, long maximumTotalWaitTimeMillis) {
- this(
- maximumNumberOfAttempts,
- initialIntervalMillis,
- maximumTotalWaitTimeMillis,
- ResetPolicy.ALL,
- NanoClock.SYSTEM);
- }
-
- /**
- * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
- *
- * @param maximumNumberOfAttempts The maximum number of attempts it will make.
- * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
- * @param maximumTotalWaitTimeMillis The maximum total time that this object will
- * allow more attempts in milliseconds.
- * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject
- * to being reset.
- */
- public AttemptAndTimeBoundedExponentialBackOff(
- int maximumNumberOfAttempts,
- long initialIntervalMillis,
- long maximumTotalWaitTimeMillis,
- ResetPolicy resetPolicy) {
- this(
- maximumNumberOfAttempts,
- initialIntervalMillis,
- maximumTotalWaitTimeMillis,
- resetPolicy,
- NanoClock.SYSTEM);
- }
-
- /**
- * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
- *
- * @param maximumNumberOfAttempts The maximum number of attempts it will make.
- * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
- * @param maximumTotalWaitTimeMillis The maximum total time that this object will
- * allow more attempts in milliseconds.
- * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject
- * to being reset.
- * @param nanoClock clock used to measure the time that has passed.
- */
- public AttemptAndTimeBoundedExponentialBackOff(
- int maximumNumberOfAttempts,
- long initialIntervalMillis,
- long maximumTotalWaitTimeMillis,
- ResetPolicy resetPolicy,
- NanoClock nanoClock) {
- super(maximumNumberOfAttempts, initialIntervalMillis);
- checkArgument(
- maximumTotalWaitTimeMillis > 0, "Maximum total wait time must be greater than zero.");
- checkArgument(
- maximumTotalWaitTimeMillis < MAX_ELAPSED_TIME_MILLIS,
- "Maximum total wait time must be less than " + MAX_ELAPSED_TIME_MILLIS + " milliseconds");
- checkArgument(resetPolicy != null, "resetPolicy may not be null");
- checkArgument(nanoClock != null, "nanoClock may not be null");
- this.maximumTotalWaitTimeMillis = maximumTotalWaitTimeMillis;
- this.resetPolicy = resetPolicy;
- this.nanoClock = nanoClock;
- // Set the end time for this BackOff. Note that we cannot simply call reset() here since the
- // resetPolicy may not be set to reset the time bound.
- endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis;
- }
-
- @Override
- @SuppressFBWarnings(value = "UR_UNINIT_READ_CALLED_FROM_SUPER_CONSTRUCTOR",
- justification = "Explicitly handled in implementation.")
- public void reset() {
- // reset() is called in the constructor of the parent class before resetPolicy and nanoClock are
- // set. In this case, we call the parent class's reset() method and return.
- if (resetPolicy == null) {
- super.reset();
- return;
- }
- // Reset the number of attempts.
- if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.ATTEMPTS) {
- super.reset();
- }
- // Reset the time bound.
- if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.TIMER) {
- endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis;
- }
- }
-
- public void setEndtimeMillis(long endTimeMillis) {
- this.endTimeMillis = endTimeMillis;
- }
-
- @Override
- public long nextBackOffMillis() {
- if (atMaxAttempts()) {
- return BackOff.STOP;
- }
- long backoff = Math.min(super.nextBackOffMillis(), endTimeMillis - getTimeMillis());
- return (backoff > 0 ? backoff : BackOff.STOP);
- }
-
- private long getTimeMillis() {
- return TimeUnit.NANOSECONDS.toMillis(nanoClock.nanoTime());
- }
-
- @Override
- public boolean atMaxAttempts() {
- return super.atMaxAttempts() || getTimeMillis() >= endTimeMillis;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java
deleted file mode 100644
index 5707293..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.util.BackOff;
-
-
-/**
- * Implementation of {@link BackOff} that increases the back off period for each retry attempt
- * using a randomization function that grows exponentially.
- *
- * <p>Example: The initial interval is .5 seconds and the maximum number of retries is 10.
- * For 10 tries the sequence will be (values in seconds):
- *
- * <pre>
- * retry# retry_interval randomized_interval
- * 1 0.5 [0.25, 0.75]
- * 2 0.75 [0.375, 1.125]
- * 3 1.125 [0.562, 1.687]
- * 4 1.687 [0.8435, 2.53]
- * 5 2.53 [1.265, 3.795]
- * 6 3.795 [1.897, 5.692]
- * 7 5.692 [2.846, 8.538]
- * 8 8.538 [4.269, 12.807]
- * 9 12.807 [6.403, 19.210]
- * 10 {@link BackOff#STOP}
- * </pre>
- *
- * <p>Implementation is not thread-safe.
- */
-public class AttemptBoundedExponentialBackOff implements BackOff {
- public static final double DEFAULT_MULTIPLIER = 1.5;
- public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
- private final int maximumNumberOfAttempts;
- private final long initialIntervalMillis;
- private int currentAttempt;
-
- public AttemptBoundedExponentialBackOff(int maximumNumberOfAttempts, long initialIntervalMillis) {
- checkArgument(maximumNumberOfAttempts > 0,
- "Maximum number of attempts must be greater than zero.");
- checkArgument(initialIntervalMillis > 0, "Initial interval must be greater than zero.");
- this.maximumNumberOfAttempts = maximumNumberOfAttempts;
- this.initialIntervalMillis = initialIntervalMillis;
- reset();
- }
-
- @Override
- public void reset() {
- currentAttempt = 1;
- }
-
- @Override
- public long nextBackOffMillis() {
- if (currentAttempt >= maximumNumberOfAttempts) {
- return BackOff.STOP;
- }
- double currentIntervalMillis = initialIntervalMillis
- * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1);
- double randomOffset = (Math.random() * 2 - 1)
- * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis;
- currentAttempt += 1;
- return Math.round(currentIntervalMillis + randomOffset);
- }
-
- public boolean atMaxAttempts() {
- return currentAttempt >= maximumNumberOfAttempts;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
new file mode 100644
index 0000000..479d7a8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.client.util.BackOff;
+import com.google.common.base.MoreObjects;
+import org.joda.time.Duration;
+
+/**
+ * A fluent builder for {@link BackOff} objects that allows customization of the retry algorithm.
+ *
+ * <p>All configuration fields are final and every {@code withXxx} method returns a new
+ * {@link FluentBackoff} rather than modifying the receiver, so a configuration such as
+ * {@link #DEFAULT} can be reused as a starting point. The {@link BackOff} objects created by
+ * {@link #backoff()} hold mutable retry state (retry count and cumulative backoff).
+ *
+ * @see #DEFAULT for the default configuration parameters.
+ */
+public final class FluentBackoff {
+
+  private static final double DEFAULT_EXPONENT = 1.5;
+  private static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
+  private static final Duration DEFAULT_MIN_BACKOFF = Duration.standardSeconds(1);
+  private static final Duration DEFAULT_MAX_BACKOFF = Duration.standardDays(1000);
+  private static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
+  private static final Duration DEFAULT_MAX_CUM_BACKOFF = Duration.standardDays(1000);
+
+  private final double exponent;
+  private final Duration initialBackoff;
+  private final Duration maxBackoff;
+  private final Duration maxCumulativeBackoff;
+  private final int maxRetries;
+
+  /**
+   * By default the {@link BackOff} created by this builder will use exponential backoff (base
+   * exponent 1.5) with an initial backoff of 1 second. These parameters can be overridden with
+   * {@link #withExponent(double)} and {@link #withInitialBackoff(Duration)},
+   * respectively, and the maximum backoff after exponential increase can be capped using {@link
+   * FluentBackoff#withMaxBackoff(Duration)}.
+   *
+   * <p>Each individual backoff is randomized by up to 50% in either direction around the
+   * computed interval; this randomization factor is fixed and not configurable.
+   *
+   * <p>The default {@link BackOff} does not limit the number of retries. To limit the backoff, the
+   * maximum total number of retries can be set using {@link #withMaxRetries(int)}. The
+   * total time spent in backoff can be time-bounded as well by configuring {@link
+   * #withMaxCumulativeBackoff(Duration)}. If either of these limits are reached, calls
+   * to {@link BackOff#nextBackOffMillis()} will return {@link BackOff#STOP} to signal that no more
+   * retries should continue.
+   */
+  public static final FluentBackoff DEFAULT = new FluentBackoff(
+      DEFAULT_EXPONENT,
+      DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_MAX_CUM_BACKOFF,
+      DEFAULT_MAX_RETRIES);
+
+  /**
+   * Instantiates a {@link BackOff} that will obey the current configuration.
+   *
+   * <p>Every call returns a new object with its own retry state, starting from zero retries.
+   *
+   * @see FluentBackoff
+   */
+  public BackOff backoff() {
+    return new BackoffImpl(this);
+  }
+
+  /**
+   * Returns a copy of this {@link FluentBackoff} that instead uses the specified exponent to
+   * control the exponential growth of delay.
+   *
+   * <p>Does not modify this object.
+   *
+   * @throws IllegalArgumentException if {@code exponent} is not greater than 0
+   * @see FluentBackoff
+   */
+  public FluentBackoff withExponent(double exponent) {
+    checkArgument(exponent > 0, "exponent %s must be greater than 0", exponent);
+    return new FluentBackoff(
+        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
+  }
+
+  /**
+   * Returns a copy of this {@link FluentBackoff} that instead uses the specified initial backoff
+   * duration.
+   *
+   * <p>Does not modify this object.
+   *
+   * @throws IllegalArgumentException if {@code initialBackoff} is not longer than zero
+   * @see FluentBackoff
+   */
+  public FluentBackoff withInitialBackoff(Duration initialBackoff) {
+    checkArgument(
+        initialBackoff.isLongerThan(Duration.ZERO),
+        "initialBackoff %s must be at least 1 millisecond",
+        initialBackoff);
+    return new FluentBackoff(
+        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
+  }
+
+  /**
+   * Returns a copy of this {@link FluentBackoff} that limits the maximum backoff of an individual
+   * attempt to the specified duration.
+   *
+   * <p>The cap is applied to the exponentially grown interval before randomization, so a value
+   * returned by {@link BackOff#nextBackOffMillis()} may exceed {@code maxBackoff} by up to 50%.
+   *
+   * <p>Does not modify this object.
+   *
+   * @throws IllegalArgumentException if {@code maxBackoff} is not longer than zero
+   * @see FluentBackoff
+   */
+  public FluentBackoff withMaxBackoff(Duration maxBackoff) {
+    // Same positivity check as the other Duration-valued settings.
+    checkArgument(
+        maxBackoff.isLongerThan(Duration.ZERO),
+        "maxBackoff %s must be at least 1 millisecond",
+        maxBackoff);
+    return new FluentBackoff(
+        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
+  }
+
+  /**
+   * Returns a copy of this {@link FluentBackoff} that limits the total time spent in backoff
+   * returned across all calls to {@link BackOff#nextBackOffMillis()}.
+   *
+   * <p>The last backoff handed out is truncated so that the sum never exceeds the limit.
+   *
+   * <p>Does not modify this object.
+   *
+   * @throws IllegalArgumentException if {@code maxCumulativeBackoff} is not longer than zero
+   * @see FluentBackoff
+   */
+  public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff) {
+    checkArgument(maxCumulativeBackoff.isLongerThan(Duration.ZERO),
+        "maxCumulativeBackoff %s must be at least 1 millisecond", maxCumulativeBackoff);
+    return new FluentBackoff(
+        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
+  }
+
+  /**
+   * Returns a copy of this {@link FluentBackoff} that limits the total number of retries, aka
+   * the total number of calls to {@link BackOff#nextBackOffMillis()} before returning
+   * {@link BackOff#STOP}.
+   *
+   * <p>A value of 0 makes the very first call return {@link BackOff#STOP}.
+   *
+   * <p>Does not modify this object.
+   *
+   * @throws IllegalArgumentException if {@code maxRetries} is negative
+   * @see FluentBackoff
+   */
+  public FluentBackoff withMaxRetries(int maxRetries) {
+    checkArgument(maxRetries >= 0, "maxRetries %s cannot be negative", maxRetries);
+    return new FluentBackoff(
+        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(FluentBackoff.class)
+        .add("exponent", exponent)
+        .add("initialBackoff", initialBackoff)
+        .add("maxBackoff", maxBackoff)
+        .add("maxRetries", maxRetries)
+        .add("maxCumulativeBackoff", maxCumulativeBackoff)
+        .toString();
+  }
+
+  /**
+   * The {@link BackOff} handed out by {@link #backoff()}: tracks how many backoffs have been
+   * returned and their cumulative length, and computes the next one from the configuration.
+   */
+  private static class BackoffImpl implements BackOff {
+
+    // Customization of this backoff.
+    private final FluentBackoff backoffConfig;
+    // Current state
+    private Duration currentCumulativeBackoff;
+    private int currentRetry;
+
+    /** Restarts the sequence: zero retries used and zero cumulative backoff. */
+    @Override
+    public void reset() {
+      currentRetry = 0;
+      currentCumulativeBackoff = Duration.ZERO;
+    }
+
+    /**
+     * Returns the next randomized backoff in milliseconds, or {@link BackOff#STOP} once either
+     * the retry limit or the cumulative backoff limit has been reached.
+     */
+    @Override
+    public long nextBackOffMillis() {
+      // Maximum number of retries reached.
+      if (currentRetry >= backoffConfig.maxRetries) {
+        return BackOff.STOP;
+      }
+      // Maximum cumulative backoff reached.
+      if (currentCumulativeBackoff.compareTo(backoffConfig.maxCumulativeBackoff) >= 0) {
+        return BackOff.STOP;
+      }
+
+      // Exponential growth from the initial backoff, capped at the per-attempt maximum.
+      double currentIntervalMillis =
+          Math.min(
+              backoffConfig.initialBackoff.getMillis()
+                  * Math.pow(backoffConfig.exponent, currentRetry),
+              backoffConfig.maxBackoff.getMillis());
+      // Jitter is applied after the cap above, so the result may exceed maxBackoff.
+      double randomOffset =
+          (Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis;
+      long nextBackoffMillis = Math.round(currentIntervalMillis + randomOffset);
+      // Cap to limit on cumulative backoff
+      Duration remainingCumulative =
+          backoffConfig.maxCumulativeBackoff.minus(currentCumulativeBackoff);
+      nextBackoffMillis = Math.min(nextBackoffMillis, remainingCumulative.getMillis());
+
+      // Update state and return backoff.
+      currentCumulativeBackoff = currentCumulativeBackoff.plus(nextBackoffMillis);
+      currentRetry += 1;
+      return nextBackoffMillis;
+    }
+
+    private BackoffImpl(FluentBackoff backoffConfig) {
+      this.backoffConfig = backoffConfig;
+      this.reset();
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(BackoffImpl.class)
+          .add("backoffConfig", backoffConfig)
+          .add("currentRetry", currentRetry)
+          .add("currentCumulativeBackoff", currentCumulativeBackoff)
+          .toString();
+    }
+  }
+
+  // Private: instances are obtained from DEFAULT and the withXxx methods, which validate inputs.
+  private FluentBackoff(
+      double exponent, Duration initialBackoff, Duration maxBackoff, Duration maxCumulativeBackoff,
+      int maxRetries) {
+    this.exponent = exponent;
+    this.initialBackoff = initialBackoff;
+    this.maxBackoff = maxBackoff;
+    this.maxRetries = maxRetries;
+    this.maxCumulativeBackoff = maxCumulativeBackoff;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 44a182e..41c372e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -66,6 +66,7 @@ import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,6 +120,9 @@ public class GcsUtil {
*/
private static final int MAX_CONCURRENT_BATCHES = 256;
+ // Shared retry policy for GCS requests: at most 3 retries, starting from a 200 ms backoff.
+ // NOTE(review): call sites that used AttemptBoundedExponentialBackOff(3, 200) appear to have
+ // allowed only 2 backoffs before STOP; with this factory they get 3 -- confirm this is intended.
+ private static final FluentBackoff BACKOFF_FACTORY =
+ FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
+
/////////////////////////////////////////////////////////////////////////////
/** Client for the GCS API. */
@@ -177,7 +181,7 @@ public class GcsUtil {
// the request has strong global consistency.
ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(getObject),
- new AttemptBoundedExponentialBackOff(3, 200),
+ BACKOFF_FACTORY.backoff(),
RetryDeterminer.SOCKET_ERRORS,
IOException.class);
return ImmutableList.of(gcsPattern);
@@ -216,7 +220,7 @@ public class GcsUtil {
try {
objects = ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(listObject),
- new AttemptBoundedExponentialBackOff(3, 200),
+ BACKOFF_FACTORY.backoff(),
RetryDeterminer.SOCKET_ERRORS,
IOException.class);
} catch (Exception e) {
@@ -257,7 +261,10 @@ public class GcsUtil {
* if the resource does not exist.
*/
public long fileSize(GcsPath path) throws IOException {
- return fileSize(path, new AttemptBoundedExponentialBackOff(4, 200), Sleeper.DEFAULT);
+ return fileSize(
+ path,
+ BACKOFF_FACTORY.backoff(),
+ Sleeper.DEFAULT);
}
/**
@@ -335,7 +342,10 @@ public class GcsUtil {
* be accessible otherwise the permissions exception will be propagated.
*/
public boolean bucketExists(GcsPath path) throws IOException {
- return bucketExists(path, new AttemptBoundedExponentialBackOff(4, 200), Sleeper.DEFAULT);
+ return bucketExists(
+ path,
+ BACKOFF_FACTORY.backoff(),
+ Sleeper.DEFAULT);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
index 519776a..6fac6dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
@@ -49,6 +49,7 @@ import com.google.api.client.util.BackOff;
*
* <p>Implementation is not thread-safe.
*/
+@Deprecated
public class IntervalBoundedExponentialBackOff implements BackOff {
public static final double DEFAULT_MULTIPLIER = 1.5;
public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java
deleted file mode 100644
index 59e0fb7..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import com.google.api.client.util.BackOff;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link AttemptAndTimeBoundedExponentialBackOff}. */
-@RunWith(JUnit4.class)
-public class AttemptAndTimeBoundedExponentialBackOffTest {
- @Rule public ExpectedException exception = ExpectedException.none();
- @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
-
- @Test
- public void testUsingInvalidInitialInterval() throws Exception {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Initial interval must be greater than zero.");
- new AttemptAndTimeBoundedExponentialBackOff(10, 0L, 1000L);
- }
-
- @Test
- public void testUsingInvalidTimeInterval() throws Exception {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Maximum total wait time must be greater than zero.");
- new AttemptAndTimeBoundedExponentialBackOff(10, 2L, 0L);
- }
-
- @Test
- public void testUsingInvalidMaximumNumberOfRetries() throws Exception {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Maximum number of attempts must be greater than zero.");
- new AttemptAndTimeBoundedExponentialBackOff(-1, 10L, 1000L);
- }
-
- @Test
- public void testThatFixedNumberOfAttemptsExits() throws Exception {
- BackOff backOff =
- new AttemptAndTimeBoundedExponentialBackOff(
- 3,
- 500L,
- 1000L,
- AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
- fastClock);
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- }
-
- @Test
- public void testThatResettingAllowsReuse() throws Exception {
- AttemptBoundedExponentialBackOff backOff =
- new AttemptAndTimeBoundedExponentialBackOff(
- 3,
- 500,
- 1000L,
- AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
- fastClock);
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- backOff.reset();
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
-
- backOff =
- new AttemptAndTimeBoundedExponentialBackOff(
- 30,
- 500,
- 1000L,
- AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
- fastClock);
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
- fastClock.sleep(2000L);
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- backOff.reset();
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- }
-
- @Test
- public void testThatResettingAttemptsAllowsReuse() throws Exception {
- AttemptBoundedExponentialBackOff backOff =
- new AttemptAndTimeBoundedExponentialBackOff(
- 3,
- 500,
- 1000,
- AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
- fastClock);
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- backOff.reset();
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- }
-
- @Test
- public void testThatResettingAttemptsDoesNotAllowsReuse() throws Exception {
- AttemptBoundedExponentialBackOff backOff =
- new AttemptAndTimeBoundedExponentialBackOff(
- 30,
- 500,
- 1000L,
- AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
- fastClock);
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
- fastClock.sleep(2000L);
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- backOff.reset();
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- }
-
- @Test
- public void testThatResettingTimerAllowsReuse() throws Exception {
- AttemptBoundedExponentialBackOff backOff =
- new AttemptAndTimeBoundedExponentialBackOff(
- 30,
- 500,
- 1000L,
- AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.TIMER,
- fastClock);
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
- fastClock.sleep(2000L);
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- backOff.reset();
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(561L), lessThan(1688L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(843L), lessThan(2531L)));
- }
-
- @Test
- public void testThatResettingTimerDoesNotAllowReuse() throws Exception {
- AttemptBoundedExponentialBackOff backOff =
- new AttemptAndTimeBoundedExponentialBackOff(
- 3,
- 500,
- 1000L,
- AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.TIMER,
- fastClock);
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- backOff.reset();
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- }
-
- @Test
- public void testTimeBound() throws Exception {
- AttemptBoundedExponentialBackOff backOff =
- new AttemptAndTimeBoundedExponentialBackOff(
- 3, 500L, 5L, AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, fastClock);
- assertEquals(backOff.nextBackOffMillis(), 5L);
- }
-
- @Test
- public void testAtMaxAttempts() throws Exception {
- AttemptBoundedExponentialBackOff backOff =
- new AttemptAndTimeBoundedExponentialBackOff(
- 3,
- 500L,
- 1000L,
- AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
- fastClock);
- assertFalse(backOff.atMaxAttempts());
- backOff.nextBackOffMillis();
- assertFalse(backOff.atMaxAttempts());
- backOff.nextBackOffMillis();
- assertTrue(backOff.atMaxAttempts());
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- }
-
- @Test
- public void testAtMaxTime() throws Exception {
- AttemptBoundedExponentialBackOff backOff =
- new AttemptAndTimeBoundedExponentialBackOff(
- 3, 500L, 1L, AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, fastClock);
- fastClock.sleep(2);
- assertTrue(backOff.atMaxAttempts());
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java
deleted file mode 100644
index 3cfa961..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import com.google.api.client.util.BackOff;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link AttemptBoundedExponentialBackOff}. */
-@RunWith(JUnit4.class)
-public class AttemptBoundedExponentialBackOffTest {
- @Rule public ExpectedException exception = ExpectedException.none();
-
- @Test
- public void testUsingInvalidInitialInterval() throws Exception {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Initial interval must be greater than zero.");
- new AttemptBoundedExponentialBackOff(10, 0L);
- }
-
- @Test
- public void testUsingInvalidMaximumNumberOfRetries() throws Exception {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Maximum number of attempts must be greater than zero.");
- new AttemptBoundedExponentialBackOff(-1, 10L);
- }
-
- @Test
- public void testThatFixedNumberOfAttemptsExits() throws Exception {
- BackOff backOff = new AttemptBoundedExponentialBackOff(3, 500);
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- }
-
- @Test
- public void testThatResettingAllowsReuse() throws Exception {
- BackOff backOff = new AttemptBoundedExponentialBackOff(3, 500);
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- backOff.reset();
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- }
-
- @Test
- public void testAtMaxAttempts() throws Exception {
- AttemptBoundedExponentialBackOff backOff = new AttemptBoundedExponentialBackOff(3, 500);
- assertFalse(backOff.atMaxAttempts());
- backOff.nextBackOffMillis();
- assertFalse(backOff.atMaxAttempts());
- backOff.nextBackOffMillis();
- assertTrue(backOff.atMaxAttempts());
- assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
new file mode 100644
index 0000000..20b03cf
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.api.client.util.BackOff;
+import java.io.IOException;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FluentBackoff}.
+ *
+ * <p>Backoffs are randomized, so the tests assert ranges rather than exact values wherever a
+ * single backoff is inspected.
+ */
+@RunWith(JUnit4.class)
+public class FluentBackoffTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+ private final FluentBackoff defaultBackoff = FluentBackoff.DEFAULT;
+
+ @Test
+ public void testInvalidExponent() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("exponent -2.0 must be greater than 0");
+ defaultBackoff.withExponent(-2.0);
+ }
+
+ @Test
+ public void testInvalidInitialBackoff() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("initialBackoff PT0S must be at least 1 millisecond");
+ defaultBackoff.withInitialBackoff(Duration.ZERO);
+ }
+
+ @Test
+ public void testInvalidMaxBackoff() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("maxBackoff PT0S must be at least 1 millisecond");
+ defaultBackoff.withMaxBackoff(Duration.ZERO);
+ }
+
+ @Test
+ public void testInvalidMaxRetries() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("maxRetries -1 cannot be negative");
+ defaultBackoff.withMaxRetries(-1);
+ }
+
+ @Test
+ public void testInvalidCumulativeBackoff() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("maxCumulativeBackoff PT-0.002S must be at least 1 millisecond");
+ defaultBackoff.withMaxCumulativeBackoff(Duration.millis(-2));
+ }
+
+ /**
+ * Tests a custom initial backoff with a capped per-attempt backoff, the default exponent, and
+ * unlimited retries, before and after a reset.
+ */
+ @Test
+ public void testBoundedIntervalWithReset() throws Exception {
+ BackOff backOff =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(500))
+ .withMaxBackoff(Duration.standardSeconds(1)).backoff();
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+ lessThanOrEqualTo(1500L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+ lessThanOrEqualTo(1500L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+ lessThanOrEqualTo(1500L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+ lessThanOrEqualTo(1500L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+ lessThanOrEqualTo(1500L)));
+
+ // Reset, should go back to short times.
+ backOff.reset();
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+ lessThanOrEqualTo(1500L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+ lessThanOrEqualTo(1500L)));
+
+ }
+
+ /**
+ * Tests a custom initial backoff with a limit of one retry, before and after a reset. No custom
+ * exponent or per-attempt cap is configured here.
+ */
+ @Test
+ public void testMaxRetriesWithReset() throws Exception {
+ BackOff backOff =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(500))
+ .withMaxRetries(1)
+ .backoff();
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
+ assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
+ assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
+ assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
+
+ backOff.reset();
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
+ }
+
+ /**
+ * Calls {@code backOff} until it returns {@link BackOff#STOP} and returns the sum, in
+ * milliseconds, of all backoffs it handed out before stopping. Does not reset {@code backOff}.
+ */
+ private static long countMaximumBackoff(BackOff backOff) throws IOException {
+ long cumulativeBackoffMillis = 0;
+ long currentBackoffMillis = backOff.nextBackOffMillis();
+ while (currentBackoffMillis != BackOff.STOP) {
+ cumulativeBackoffMillis += currentBackoffMillis;
+ currentBackoffMillis = backOff.nextBackOffMillis();
+ }
+ return cumulativeBackoffMillis;
+ }
+
+ /**
+ * Tests a capped per-attempt backoff with limited cumulative time: the backoffs handed out
+ * must sum to exactly the cumulative limit, both initially and after each reset.
+ */
+ @Test
+ public void testBoundedIntervalAndCumTimeWithReset() throws Exception {
+ BackOff backOff =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(500))
+ .withMaxBackoff(Duration.standardSeconds(1))
+ .withMaxCumulativeBackoff(Duration.standardMinutes(1)).backoff();
+
+ assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis()));
+
+ backOff.reset();
+ assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis()));
+ // sanity check: should get 0 if we don't reset
+ assertThat(countMaximumBackoff(backOff), equalTo(0L));
+
+ backOff.reset();
+ assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis()));
+ }
+
+ /**
+ * Tests a capped per-attempt backoff with limited cumulative time.
+ *
+ * <p>NOTE(review): despite the method name, no retry limit is configured and {@code reset()} is
+ * never called here; only the cumulative-time limit is exercised.
+ */
+ @Test
+ public void testBoundedIntervalAndCumTimeAndRetriesWithReset() throws Exception {
+ BackOff backOff =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(500))
+ .withMaxBackoff(Duration.standardSeconds(1))
+ .withMaxCumulativeBackoff(Duration.standardMinutes(1))
+ .backoff();
+
+ long cumulativeBackoffMillis = 0;
+ long currentBackoffMillis = backOff.nextBackOffMillis();
+ while (currentBackoffMillis != BackOff.STOP) {
+ cumulativeBackoffMillis += currentBackoffMillis;
+ currentBackoffMillis = backOff.nextBackOffMillis();
+ }
+ assertThat(cumulativeBackoffMillis, equalTo(Duration.standardMinutes(1).getMillis()));
+ }
+
+ @Test
+ public void testFluentBackoffToString() throws IOException {
+ FluentBackoff config = FluentBackoff.DEFAULT
+ .withExponent(3.4)
+ .withMaxRetries(4)
+ .withInitialBackoff(Duration.standardSeconds(3))
+ .withMaxBackoff(Duration.standardHours(1))
+ .withMaxCumulativeBackoff(Duration.standardDays(1));
+
+ assertEquals(
+ "FluentBackoff{exponent=3.4, initialBackoff=PT3S, maxBackoff=PT3600S,"
+ + " maxRetries=4, maxCumulativeBackoff=PT86400S}",
+ config.toString());
+ }
+ @Test
+ public void testBackoffImplToString() throws IOException {
+ FluentBackoff config = FluentBackoff.DEFAULT
+ .withExponent(3.4)
+ .withMaxRetries(4)
+ .withInitialBackoff(Duration.standardSeconds(3))
+ .withMaxBackoff(Duration.standardHours(1))
+ .withMaxCumulativeBackoff(Duration.standardDays(1));
+ BackOff backOff = config.backoff();
+
+ assertEquals(
+ "BackoffImpl{backoffConfig=" + config.toString() + ","
+ + " currentRetry=0, currentCumulativeBackoff=PT0S}",
+ backOff.toString());
+
+ // backoff once, ignoring result
+ backOff.nextBackOffMillis();
+
+ // currentRetry is exact, we can test it.
+ assertThat(backOff.toString(), containsString("currentRetry=1"));
+ // currentCumulativeBackoff is not exact; we cannot even check that it's non-zero (randomness).
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index 681b0aa..9504b4c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -365,7 +365,7 @@ public class GcsUtilTest {
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
- BackOff mockBackOff = new AttemptBoundedExponentialBackOff(3, 200);
+ BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff();
when(mockStorage.objects()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
@@ -376,7 +376,7 @@ public class GcsUtilTest {
assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff, new FastNanoClockAndSleeper()));
- assertEquals(mockBackOff.nextBackOffMillis(), BackOff.STOP);
+ assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis());
}
@Test
@@ -390,7 +390,7 @@ public class GcsUtilTest {
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
- BackOff mockBackOff = new AttemptBoundedExponentialBackOff(3, 200);
+ BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
@@ -413,7 +413,7 @@ public class GcsUtilTest {
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
- BackOff mockBackOff = new AttemptBoundedExponentialBackOff(3, 200);
+ BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
GoogleJsonResponseException expectedException =
googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
"Waves hand mysteriously", "These aren't the buckets your looking for");
@@ -438,7 +438,7 @@ public class GcsUtilTest {
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
- BackOff mockBackOff = new AttemptBoundedExponentialBackOff(3, 200);
+ BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 304dc82..6dde581 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -24,9 +24,6 @@ import static com.google.common.base.Preconditions.checkState;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.json.JsonFactory;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
@@ -69,7 +66,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -108,7 +104,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
import org.apache.beam.sdk.util.FileIOChannelFactory;
import org.apache.beam.sdk.util.GcsIOChannelFactory;
import org.apache.beam.sdk.util.GcsUtil;
@@ -129,6 +124,7 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -294,7 +290,7 @@ public class BigQueryIO {
*
* <p>If the project id is omitted, the default project id is used.
*/
- public static TableReference parseTableSpec(String tableSpec) {
+ static TableReference parseTableSpec(String tableSpec) {
Matcher match = TABLE_SPEC.matcher(tableSpec);
if (!match.matches()) {
throw new IllegalArgumentException(
@@ -953,14 +949,14 @@ public class BigQueryIO {
* ...
*/
private abstract static class BigQuerySourceBase extends BoundedSource<TableRow> {
- // The maximum number of attempts to verify temp files.
- private static final int MAX_FILES_VERIFY_ATTEMPTS = 10;
+ // The maximum number of retries to verify temp files.
+ private static final int MAX_FILES_VERIFY_RETRIES = 9;
// The maximum number of retries to poll a BigQuery job.
protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
// The initial backoff for verifying temp files.
- private static final long INITIAL_FILES_VERIFY_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+ private static final Duration INITIAL_FILES_VERIFY_BACKOFF = Duration.standardSeconds(1);
protected final String jobIdToken;
protected final String extractDestinationDir;
@@ -1055,14 +1051,7 @@ public class BigQueryIO {
}};
List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
- BackOff backoff = new AttemptBoundedExponentialBackOff(
- MAX_FILES_VERIFY_ATTEMPTS, INITIAL_FILES_VERIFY_BACKOFF_MILLIS);
for (String fileName : files) {
- while (BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
- if (IOChannelUtils.getFactory(fileName).getSizeBytes(fileName) != -1) {
- break;
- }
- }
avroSources.add(new TransformingSource<>(
AvroSource.from(fileName), function, getDefaultOutputCoder()));
}