You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:40:53 UTC

[22/50] [abbrv] incubator-beam git commit: FluentBackoff: a replacement for a variety of custom backoff implementations

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 6aff3b0..8b5e8c2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -53,14 +53,13 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.Transport;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,14 +71,14 @@ class BigQueryServicesImpl implements BigQueryServices {
 
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);
 
-  // The maximum number of attempts to execute a BigQuery RPC.
-  private static final int MAX_RPC_ATTEMPTS = 10;
+  // The maximum number of retries to execute a BigQuery RPC.
+  private static final int MAX_RPC_RETRIES = 9;
 
   // The initial backoff for executing a BigQuery RPC.
-  private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+  private static final Duration INITIAL_RPC_BACKOFF = Duration.standardSeconds(1);
 
   // The initial backoff for polling the status of a BigQuery job.
-  private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+  private static final Duration INITIAL_JOB_STATUS_POLL_BACKOFF = Duration.standardSeconds(1);
 
   @Override
   public JobService getJobService(BigQueryOptions options) {
@@ -121,9 +120,9 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Executes the RPC, retrying at most {@code MAX_RPC_RETRIES} times until it succeeds.
      *
-     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     * @throws IOException if the RPC still fails after {@code MAX_RPC_RETRIES} retries.
      */
     @Override
     public void startLoadJob(
@@ -139,9 +138,9 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Executes the RPC, retrying at most {@code MAX_RPC_RETRIES} times until it succeeds.
      *
-     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     * @throws IOException if the RPC still fails after {@code MAX_RPC_RETRIES} retries.
      */
     @Override
     public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
@@ -157,9 +156,9 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Executes the RPC, retrying at most {@code MAX_RPC_RETRIES} times until it succeeds.
      *
-     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     * @throws IOException if the RPC still fails after {@code MAX_RPC_RETRIES} retries.
      */
     @Override
     public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
@@ -175,9 +174,9 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Executes the RPC, retrying at most {@code MAX_RPC_RETRIES} times until it succeeds.
      *
-     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     * @throws IOException if the RPC still fails after {@code MAX_RPC_RETRIES} retries.
      */
     @Override
     public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
@@ -194,7 +193,8 @@ class BigQueryServicesImpl implements BigQueryServices {
       ApiErrorExtractor errorExtractor,
       Bigquery client) throws IOException, InterruptedException {
       BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+          FluentBackoff.DEFAULT
+              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       startJob(job, errorExtractor, client, Sleeper.DEFAULT, backoff);
     }
 
@@ -227,15 +227,17 @@ class BigQueryServicesImpl implements BigQueryServices {
       throw new IOException(
           String.format(
               "Unable to insert job: %s, aborting after %d .",
-              jobRef.getJobId(), MAX_RPC_ATTEMPTS),
+              jobRef.getJobId(), MAX_RPC_RETRIES),
           lastException);
     }
 
     @Override
     public Job pollJob(JobReference jobRef, int maxAttempts)
         throws InterruptedException {
-      BackOff backoff = new AttemptBoundedExponentialBackOff(
-          maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS);
+      BackOff backoff =
+          FluentBackoff.DEFAULT
+              .withMaxRetries(maxAttempts).withInitialBackoff(INITIAL_JOB_STATUS_POLL_BACKOFF)
+              .backoff();
       return pollJob(jobRef, Sleeper.DEFAULT, backoff);
     }
 
@@ -270,12 +272,13 @@ class BigQueryServicesImpl implements BigQueryServices {
                   .setQuery(query))
               .setDryRun(true));
       BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+          FluentBackoff.DEFAULT
+              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       return executeWithRetries(
           client.jobs().insert(projectId, job),
           String.format(
               "Unable to dry run query: %s, aborting after %d retries.",
-              query, MAX_RPC_ATTEMPTS),
+              query, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
           backoff).getStatistics();
     }
@@ -289,15 +292,14 @@ class BigQueryServicesImpl implements BigQueryServices {
     // The maximum number of rows to upload per InsertAll request.
     private static final long MAX_ROWS_PER_BATCH = 500;
 
-    // The maximum number of times to retry inserting rows into BigQuery.
-    private static final int MAX_INSERT_ATTEMPTS = 5;
-
-    // The initial backoff after a failure inserting rows into BigQuery.
-    private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
+    private static final FluentBackoff INSERT_BACKOFF_FACTORY =
+        FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
 
-    // Backoff time bounds for rate limit exceeded errors.
-    private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1);
-    private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2);
+    // A backoff for rate limit exceeded errors. Retries forever.
+    private static final FluentBackoff DEFAULT_BACKOFF_FACTORY =
+        FluentBackoff.DEFAULT
+            .withInitialBackoff(Duration.standardSeconds(1))
+            .withMaxBackoff(Duration.standardMinutes(2));
 
     private final ApiErrorExtractor errorExtractor;
     private final Bigquery client;
@@ -335,20 +337,21 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Executes the RPC, retrying at most {@code MAX_RPC_RETRIES} times until it succeeds.
      *
-     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     * @throws IOException if the RPC still fails after {@code MAX_RPC_RETRIES} retries.
      */
     @Override
     public Table getTable(String projectId, String datasetId, String tableId)
         throws IOException, InterruptedException {
       BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+          FluentBackoff.DEFAULT
+              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       return executeWithRetries(
           client.tables().get(projectId, datasetId, tableId),
           String.format(
               "Unable to get table: %s, aborting after %d retries.",
-              tableId, MAX_RPC_ATTEMPTS),
+              tableId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
           backoff);
     }
@@ -356,20 +359,21 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Executes the RPC, retrying at most {@code MAX_RPC_RETRIES} times until it succeeds.
      *
-     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     * @throws IOException if the RPC still fails after {@code MAX_RPC_RETRIES} retries.
      */
     @Override
     public void deleteTable(String projectId, String datasetId, String tableId)
         throws IOException, InterruptedException {
       BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+          FluentBackoff.DEFAULT
+              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       executeWithRetries(
           client.tables().delete(projectId, datasetId, tableId),
           String.format(
               "Unable to delete table: %s, aborting after %d retries.",
-              tableId, MAX_RPC_ATTEMPTS),
+              tableId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
           backoff);
     }
@@ -378,12 +382,13 @@ class BigQueryServicesImpl implements BigQueryServices {
     public boolean isTableEmpty(String projectId, String datasetId, String tableId)
         throws IOException, InterruptedException {
       BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+          FluentBackoff.DEFAULT
+              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       TableDataList dataList = executeWithRetries(
           client.tabledata().list(projectId, datasetId, tableId),
           String.format(
               "Unable to list table data: %s, aborting after %d retries.",
-              tableId, MAX_RPC_ATTEMPTS),
+              tableId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
           backoff);
       return dataList.getRows() == null || dataList.getRows().isEmpty();
@@ -392,20 +397,21 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Executes the RPC, retrying at most {@code MAX_RPC_RETRIES} times until it succeeds.
      *
-     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     * @throws IOException if the RPC still fails after {@code MAX_RPC_RETRIES} retries.
      */
     @Override
     public Dataset getDataset(String projectId, String datasetId)
         throws IOException, InterruptedException {
       BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+          FluentBackoff.DEFAULT
+              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       return executeWithRetries(
           client.datasets().get(projectId, datasetId),
           String.format(
               "Unable to get dataset: %s, aborting after %d retries.",
-              datasetId, MAX_RPC_ATTEMPTS),
+              datasetId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
           backoff);
     }
@@ -413,21 +419,21 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Executes the RPC, retrying at most {@code MAX_RPC_RETRIES} times until it succeeds.
      *
-     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     * @throws IOException if the RPC still fails after {@code MAX_RPC_RETRIES} retries.
      */
     @Override
     public void createDataset(
         String projectId, String datasetId, String location, String description)
         throws IOException, InterruptedException {
       BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+          FluentBackoff.DEFAULT
+              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff);
     }
 
-    @VisibleForTesting
-    void createDataset(
+    private void createDataset(
         String projectId,
         String datasetId,
         String location,
@@ -464,27 +470,28 @@ class BigQueryServicesImpl implements BigQueryServices {
       throw new IOException(
           String.format(
               "Unable to create dataset: %s, aborting after %d .",
-              datasetId, MAX_RPC_ATTEMPTS),
+              datasetId, MAX_RPC_RETRIES),
           lastException);
     }
 
     /**
      * {@inheritDoc}
      *
-     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Executes the RPC, retrying at most {@code MAX_RPC_RETRIES} times until it succeeds.
      *
-     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     * @throws IOException if the RPC still fails after {@code MAX_RPC_RETRIES} retries.
      */
     @Override
     public void deleteDataset(String projectId, String datasetId)
         throws IOException, InterruptedException {
       BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+          FluentBackoff.DEFAULT
+              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       executeWithRetries(
           client.datasets().delete(projectId, datasetId),
           String.format(
               "Unable to delete table: %s, aborting after %d retries.",
-              datasetId, MAX_RPC_ATTEMPTS),
+              datasetId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
           backoff);
     }
@@ -502,9 +509,7 @@ class BigQueryServicesImpl implements BigQueryServices {
             + "as many elements as rowList");
       }
 
-      AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
-          MAX_INSERT_ATTEMPTS,
-          INITIAL_INSERT_BACKOFF_INTERVAL_MS);
+      BackOff backoff = INSERT_BACKOFF_FACTORY.backoff();
 
       long retTotalDataSize = 0;
       List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
@@ -547,8 +552,7 @@ class BigQueryServicesImpl implements BigQueryServices {
                 executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
                   @Override
                   public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
-                    BackOff backoff = new IntervalBoundedExponentialBackOff(
-                        MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS);
+                    BackOff backoff = DEFAULT_BACKOFF_FACTORY.backoff();
                     while (true) {
                       try {
                         return insert.execute().getInsertErrors();
@@ -603,21 +607,24 @@ class BigQueryServicesImpl implements BigQueryServices {
           throw new RuntimeException(e.getCause());
         }
 
-        if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
-          try {
-            Thread.sleep(backoff.nextBackOffMillis());
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IOException(
-                "Interrupted while waiting before retrying insert of " + retryRows);
-          }
-          LOG.info("Retrying failed inserts to BigQuery");
-          rowsToPublish = retryRows;
-          idsToPublish = retryIds;
-          allErrors.clear();
-        } else {
+        if (allErrors.isEmpty()) {
+          break;
+        }
+        long nextBackoffMillis = backoff.nextBackOffMillis();
+        if (nextBackoffMillis == BackOff.STOP) {
           break;
         }
+        try {
+          Thread.sleep(nextBackoffMillis);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException(
+              "Interrupted while waiting before retrying insert of " + retryRows);
+        }
+        LOG.info("Retrying failed inserts to BigQuery");
+        rowsToPublish = retryRows;
+        idsToPublish = retryIds;
+        allErrors.clear();
       }
       if (!allErrors.isEmpty()) {
         throw new IOException("Insert failed: " + allErrors);
@@ -628,7 +635,7 @@ class BigQueryServicesImpl implements BigQueryServices {
   }
 
   private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
-    BigQueryTableRowIterator iterator;
+    private BigQueryTableRowIterator iterator;
 
     private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {
       this.iterator = iterator;
@@ -706,7 +713,6 @@ class BigQueryServicesImpl implements BigQueryServices {
 
   /**
    * Identical to {@link BackOffUtils#next} but without checked IOException.
-   * @throws InterruptedException
    */
   private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
index 729da97..677c661 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -58,7 +58,7 @@ import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -426,7 +426,8 @@ class BigQueryTableRowIterator implements AutoCloseable {
       Object... errorArgs) throws IOException, InterruptedException {
     Sleeper sleeper = Sleeper.DEFAULT;
     BackOff backOff =
-        new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_TIME.getMillis());
+        FluentBackoff.DEFAULT
+            .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_TIME).backoff();
 
     T result = null;
     while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 6bd03b5..45871f1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -76,12 +76,13 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -862,16 +863,11 @@ public class DatastoreV1 {
     private final V1DatastoreFactory datastoreFactory;
     // Current batch of mutations to be written.
     private final List<Mutation> mutations = new ArrayList<>();
-    /**
-     * Since a bundle is written in batches, we should retry the commit of a batch in order to
-     * prevent transient errors from causing the bundle to fail.
-     */
-    private static final int MAX_RETRIES = 5;
 
-    /**
-     * Initial backoff time for exponential backoff for retry attempts.
-     */
-    private static final int INITIAL_BACKOFF_MILLIS = 5000;
+    private static final int MAX_RETRIES = 5;
+    private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
+        FluentBackoff.DEFAULT
+            .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
 
     DatastoreWriterFn(String projectId) {
       this(projectId, new V1DatastoreFactory());
@@ -906,10 +902,10 @@ public class DatastoreV1 {
     /**
      * Writes a batch of mutations to Cloud Datastore.
      *
-     * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
-     * times). All mutations in the batch will be committed again, even if the commit was partially
-     * successful. If the retry limit is exceeded, the last exception from the Cloud Datastore will
-     * be thrown.
+     * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. All
+     * mutations in the batch will be committed again, even if the commit was partially
+     * successful. If the retry limit is exceeded, the last exception from Cloud Datastore will be
+     * thrown.
      *
      * @throws DatastoreException if the commit fails or IOException or InterruptedException if
      * backing off between retries fails.
@@ -917,7 +913,7 @@ public class DatastoreV1 {
     private void flushBatch() throws DatastoreException, IOException, InterruptedException {
       LOG.debug("Writing batch of {} mutations", mutations.size());
       Sleeper sleeper = Sleeper.DEFAULT;
-      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+      BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
 
       while (true) {
         // Batch upsert entities.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index eb5fbe6..16cb004 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -56,7 +56,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.util.Transport;
 import org.junit.Before;
@@ -117,9 +117,8 @@ public class BigQueryServicesImplTest {
     when(response.getContent()).thenReturn(toStream(testJob));
 
     Sleeper sleeper = new FastNanoClockAndSleeper();
-    BackOff backoff = new AttemptBoundedExponentialBackOff(
-        5 /* attempts */, 1000 /* initialIntervalMillis */);
-    JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
+    JobServiceImpl.startJob(
+        testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff());
 
     verify(response, times(1)).getStatusCode();
     verify(response, times(1)).getContent();
@@ -141,9 +140,8 @@ public class BigQueryServicesImplTest {
     when(response.getStatusCode()).thenReturn(409); // 409 means already exists
 
     Sleeper sleeper = new FastNanoClockAndSleeper();
-    BackOff backoff = new AttemptBoundedExponentialBackOff(
-        5 /* attempts */, 1000 /* initialIntervalMillis */);
-    JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
+    JobServiceImpl.startJob(
+        testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff());
 
     verify(response, times(1)).getStatusCode();
     verify(response, times(1)).getContent();
@@ -169,9 +167,8 @@ public class BigQueryServicesImplTest {
         .thenReturn(toStream(testJob));
 
     Sleeper sleeper = new FastNanoClockAndSleeper();
-    BackOff backoff = new AttemptBoundedExponentialBackOff(
-        5 /* attempts */, 1000 /* initialIntervalMillis */);
-    JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
+    JobServiceImpl.startJob(
+        testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff());
 
     verify(response, times(2)).getStatusCode();
     verify(response, times(2)).getContent();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
index a596bb3..b680a0e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -54,8 +54,9 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -232,7 +233,7 @@ class V1TestUtil {
     // Number of times to retry on update failure
     private static final int MAX_RETRIES = 5;
     //Initial backoff time for exponential backoff for retry attempts.
-    private static final int INITIAL_BACKOFF_MILLIS = 5000;
+    private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(5);
 
     // Returns true if a Datastore key is complete. A key is complete if its last element
     // has either an id or a name.
@@ -279,7 +280,9 @@ class V1TestUtil {
     private void flushBatch() throws DatastoreException, IOException, InterruptedException {
       LOG.info("Writing batch of {} entities", entities.size());
       Sleeper sleeper = Sleeper.DEFAULT;
-      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+      BackOff backoff =
+          FluentBackoff.DEFAULT
+              .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF).backoff();
 
       while (true) {
         // Batch mutate entities.