You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:40:53 UTC
[22/50] [abbrv] incubator-beam git commit: FluentBackoff: a
replacement for a variety of custom backoff implementations
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 6aff3b0..8b5e8c2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -53,14 +53,13 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Transport;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,14 +71,14 @@ class BigQueryServicesImpl implements BigQueryServices {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);
- // The maximum number of attempts to execute a BigQuery RPC.
- private static final int MAX_RPC_ATTEMPTS = 10;
+ // The maximum number of retries to execute a BigQuery RPC.
+ private static final int MAX_RPC_RETRIES = 9;
// The initial backoff for executing a BigQuery RPC.
- private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+ private static final Duration INITIAL_RPC_BACKOFF = Duration.standardSeconds(1);
// The initial backoff for polling the status of a BigQuery job.
- private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+ private static final Duration INITIAL_JOB_STATUS_POLL_BACKOFF = Duration.standardSeconds(1);
@Override
public JobService getJobService(BigQueryOptions options) {
@@ -121,9 +120,9 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds.
*
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+ * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
*/
@Override
public void startLoadJob(
@@ -139,9 +138,9 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds.
*
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+ * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
*/
@Override
public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
@@ -157,9 +156,9 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds.
*
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+ * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
*/
@Override
public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
@@ -175,9 +174,9 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds.
*
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+ * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
*/
@Override
public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
@@ -194,7 +193,8 @@ class BigQueryServicesImpl implements BigQueryServices {
ApiErrorExtractor errorExtractor,
Bigquery client) throws IOException, InterruptedException {
BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
startJob(job, errorExtractor, client, Sleeper.DEFAULT, backoff);
}
@@ -227,15 +227,17 @@ class BigQueryServicesImpl implements BigQueryServices {
throw new IOException(
String.format(
"Unable to insert job: %s, aborting after %d .",
- jobRef.getJobId(), MAX_RPC_ATTEMPTS),
+ jobRef.getJobId(), MAX_RPC_RETRIES),
lastException);
}
@Override
public Job pollJob(JobReference jobRef, int maxAttempts)
throws InterruptedException {
- BackOff backoff = new AttemptBoundedExponentialBackOff(
- maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS);
+ BackOff backoff =
+ FluentBackoff.DEFAULT
+ .withMaxRetries(maxAttempts).withInitialBackoff(INITIAL_JOB_STATUS_POLL_BACKOFF)
+ .backoff();
return pollJob(jobRef, Sleeper.DEFAULT, backoff);
}
@@ -270,12 +272,13 @@ class BigQueryServicesImpl implements BigQueryServices {
.setQuery(query))
.setDryRun(true));
BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
return executeWithRetries(
client.jobs().insert(projectId, job),
String.format(
"Unable to dry run query: %s, aborting after %d retries.",
- query, MAX_RPC_ATTEMPTS),
+ query, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
backoff).getStatistics();
}
@@ -289,15 +292,14 @@ class BigQueryServicesImpl implements BigQueryServices {
// The maximum number of rows to upload per InsertAll request.
private static final long MAX_ROWS_PER_BATCH = 500;
- // The maximum number of times to retry inserting rows into BigQuery.
- private static final int MAX_INSERT_ATTEMPTS = 5;
-
- // The initial backoff after a failure inserting rows into BigQuery.
- private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
+ private static final FluentBackoff INSERT_BACKOFF_FACTORY =
+ FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
- // Backoff time bounds for rate limit exceeded errors.
- private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1);
- private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2);
+ // A backoff for rate limit exceeded errors. Retries forever.
+ private static final FluentBackoff DEFAULT_BACKOFF_FACTORY =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.standardSeconds(1))
+ .withMaxBackoff(Duration.standardMinutes(2));
private final ApiErrorExtractor errorExtractor;
private final Bigquery client;
@@ -335,20 +337,21 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds.
*
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+ * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
*/
@Override
public Table getTable(String projectId, String datasetId, String tableId)
throws IOException, InterruptedException {
BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
return executeWithRetries(
client.tables().get(projectId, datasetId, tableId),
String.format(
"Unable to get table: %s, aborting after %d retries.",
- tableId, MAX_RPC_ATTEMPTS),
+ tableId, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
backoff);
}
@@ -356,20 +359,21 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds.
*
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+ * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
*/
@Override
public void deleteTable(String projectId, String datasetId, String tableId)
throws IOException, InterruptedException {
BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
executeWithRetries(
client.tables().delete(projectId, datasetId, tableId),
String.format(
"Unable to delete table: %s, aborting after %d retries.",
- tableId, MAX_RPC_ATTEMPTS),
+ tableId, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
backoff);
}
@@ -378,12 +382,13 @@ class BigQueryServicesImpl implements BigQueryServices {
public boolean isTableEmpty(String projectId, String datasetId, String tableId)
throws IOException, InterruptedException {
BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
TableDataList dataList = executeWithRetries(
client.tabledata().list(projectId, datasetId, tableId),
String.format(
"Unable to list table data: %s, aborting after %d retries.",
- tableId, MAX_RPC_ATTEMPTS),
+ tableId, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
backoff);
return dataList.getRows() == null || dataList.getRows().isEmpty();
@@ -392,20 +397,21 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds.
*
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+ * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
*/
@Override
public Dataset getDataset(String projectId, String datasetId)
throws IOException, InterruptedException {
BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
return executeWithRetries(
client.datasets().get(projectId, datasetId),
String.format(
"Unable to get dataset: %s, aborting after %d retries.",
- datasetId, MAX_RPC_ATTEMPTS),
+ datasetId, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
backoff);
}
@@ -413,21 +419,21 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds.
*
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+ * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
*/
@Override
public void createDataset(
String projectId, String datasetId, String location, String description)
throws IOException, InterruptedException {
BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff);
}
- @VisibleForTesting
- void createDataset(
+ private void createDataset(
String projectId,
String datasetId,
String location,
@@ -464,27 +470,28 @@ class BigQueryServicesImpl implements BigQueryServices {
throw new IOException(
String.format(
"Unable to create dataset: %s, aborting after %d .",
- datasetId, MAX_RPC_ATTEMPTS),
+ datasetId, MAX_RPC_RETRIES),
lastException);
}
/**
* {@inheritDoc}
*
- * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds.
*
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+ * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
*/
@Override
public void deleteDataset(String projectId, String datasetId)
throws IOException, InterruptedException {
BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
executeWithRetries(
client.datasets().delete(projectId, datasetId),
String.format(
"Unable to delete table: %s, aborting after %d retries.",
- datasetId, MAX_RPC_ATTEMPTS),
+ datasetId, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
backoff);
}
@@ -502,9 +509,7 @@ class BigQueryServicesImpl implements BigQueryServices {
+ "as many elements as rowList");
}
- AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
- MAX_INSERT_ATTEMPTS,
- INITIAL_INSERT_BACKOFF_INTERVAL_MS);
+ BackOff backoff = INSERT_BACKOFF_FACTORY.backoff();
long retTotalDataSize = 0;
List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
@@ -547,8 +552,7 @@ class BigQueryServicesImpl implements BigQueryServices {
executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
@Override
public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
- BackOff backoff = new IntervalBoundedExponentialBackOff(
- MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS);
+ BackOff backoff = DEFAULT_BACKOFF_FACTORY.backoff();
while (true) {
try {
return insert.execute().getInsertErrors();
@@ -603,21 +607,24 @@ class BigQueryServicesImpl implements BigQueryServices {
throw new RuntimeException(e.getCause());
}
- if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
- try {
- Thread.sleep(backoff.nextBackOffMillis());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(
- "Interrupted while waiting before retrying insert of " + retryRows);
- }
- LOG.info("Retrying failed inserts to BigQuery");
- rowsToPublish = retryRows;
- idsToPublish = retryIds;
- allErrors.clear();
- } else {
+ if (allErrors.isEmpty()) {
+ break;
+ }
+ long nextBackoffMillis = backoff.nextBackOffMillis();
+ if (nextBackoffMillis == BackOff.STOP) {
break;
}
+ try {
+ Thread.sleep(backoff.nextBackOffMillis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ "Interrupted while waiting before retrying insert of " + retryRows);
+ }
+ LOG.info("Retrying failed inserts to BigQuery");
+ rowsToPublish = retryRows;
+ idsToPublish = retryIds;
+ allErrors.clear();
}
if (!allErrors.isEmpty()) {
throw new IOException("Insert failed: " + allErrors);
@@ -628,7 +635,7 @@ class BigQueryServicesImpl implements BigQueryServices {
}
private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
- BigQueryTableRowIterator iterator;
+ private BigQueryTableRowIterator iterator;
private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {
this.iterator = iterator;
@@ -706,7 +713,6 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* Identical to {@link BackOffUtils#next} but without checked IOException.
- * @throws InterruptedException
*/
private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
try {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
index 729da97..677c661 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -58,7 +58,7 @@ import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -426,7 +426,8 @@ class BigQueryTableRowIterator implements AutoCloseable {
Object... errorArgs) throws IOException, InterruptedException {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backOff =
- new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_TIME.getMillis());
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_TIME).backoff();
T result = null;
while (true) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 6bd03b5..45871f1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -76,12 +76,13 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -862,16 +863,11 @@ public class DatastoreV1 {
private final V1DatastoreFactory datastoreFactory;
// Current batch of mutations to be written.
private final List<Mutation> mutations = new ArrayList<>();
- /**
- * Since a bundle is written in batches, we should retry the commit of a batch in order to
- * prevent transient errors from causing the bundle to fail.
- */
- private static final int MAX_RETRIES = 5;
- /**
- * Initial backoff time for exponential backoff for retry attempts.
- */
- private static final int INITIAL_BACKOFF_MILLIS = 5000;
+ private static final int MAX_RETRIES = 5;
+ private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
DatastoreWriterFn(String projectId) {
this(projectId, new V1DatastoreFactory());
@@ -906,10 +902,10 @@ public class DatastoreV1 {
/**
* Writes a batch of mutations to Cloud Datastore.
*
- * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
- * times). All mutations in the batch will be committed again, even if the commit was partially
- * successful. If the retry limit is exceeded, the last exception from the Cloud Datastore will
- * be thrown.
+ * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. All
+ * mutations in the batch will be committed again, even if the commit was partially
+ * successful. If the retry limit is exceeded, the last exception from Cloud Datastore will be
+ * thrown.
*
* @throws DatastoreException if the commit fails or IOException or InterruptedException if
* backing off between retries fails.
@@ -917,7 +913,7 @@ public class DatastoreV1 {
private void flushBatch() throws DatastoreException, IOException, InterruptedException {
LOG.debug("Writing batch of {} mutations", mutations.size());
Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+ BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
while (true) {
// Batch upsert entities.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index eb5fbe6..16cb004 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -56,7 +56,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.Transport;
import org.junit.Before;
@@ -117,9 +117,8 @@ public class BigQueryServicesImplTest {
when(response.getContent()).thenReturn(toStream(testJob));
Sleeper sleeper = new FastNanoClockAndSleeper();
- BackOff backoff = new AttemptBoundedExponentialBackOff(
- 5 /* attempts */, 1000 /* initialIntervalMillis */);
- JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
+ JobServiceImpl.startJob(
+ testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff());
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
@@ -141,9 +140,8 @@ public class BigQueryServicesImplTest {
when(response.getStatusCode()).thenReturn(409); // 409 means already exists
Sleeper sleeper = new FastNanoClockAndSleeper();
- BackOff backoff = new AttemptBoundedExponentialBackOff(
- 5 /* attempts */, 1000 /* initialIntervalMillis */);
- JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
+ JobServiceImpl.startJob(
+ testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff());
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
@@ -169,9 +167,8 @@ public class BigQueryServicesImplTest {
.thenReturn(toStream(testJob));
Sleeper sleeper = new FastNanoClockAndSleeper();
- BackOff backoff = new AttemptBoundedExponentialBackOff(
- 5 /* attempts */, 1000 /* initialIntervalMillis */);
- JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
+ JobServiceImpl.startJob(
+ testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff());
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
index a596bb3..b680a0e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -54,8 +54,9 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -232,7 +233,7 @@ class V1TestUtil {
// Number of times to retry on update failure
private static final int MAX_RETRIES = 5;
//Initial backoff time for exponential backoff for retry attempts.
- private static final int INITIAL_BACKOFF_MILLIS = 5000;
+ private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(5);
// Returns true if a Datastore key is complete. A key is complete if its last element
// has either an id or a name.
@@ -279,7 +280,9 @@ class V1TestUtil {
private void flushBatch() throws DatastoreException, IOException, InterruptedException {
LOG.info("Writing batch of {} entities", entities.size());
Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+ BackOff backoff =
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF).backoff();
while (true) {
// Batch mutate entities.