You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/07/13 18:29:04 UTC
[1/2] beam git commit: Implement retries in the read connector.
Repository: beam
Updated Branches:
refs/heads/master 66b4a1be0 -> 889776fca
Implement retries in the read connector.
Respect non-retryable error codes from Datastore.
Add unit tests to check that retryable errors are retried.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/016baf34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/016baf34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/016baf34
Branch: refs/heads/master
Commit: 016baf3465bbccbc9d3df6999b38b1b2533aee8c
Parents: 66b4a1b
Author: Colin Phipps <fi...@google.com>
Authored: Mon Jul 10 16:09:23 2017 +0000
Committer: Colin Phipps <fi...@google.com>
Committed: Thu Jul 13 11:11:21 2017 +0000
----------------------------------------------------------------------
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 45 ++++++++++++++++-
.../sdk/io/gcp/datastore/DatastoreV1Test.java | 51 +++++++++++++++++++-
2 files changed, 94 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/016baf34/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 5f65428..1ed6430 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -40,6 +40,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.datastore.v1.CommitRequest;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.EntityResult;
@@ -65,6 +66,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
@@ -238,6 +240,14 @@ public class DatastoreV1 {
static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 9_000_000;
/**
+ * Datastore error codes for which a failed RPC is rethrown immediately rather than retried.
+ * See https://cloud.google.com/datastore/docs/concepts/errors#Error_Codes .
+ */
+ private static final Set<Code> NON_RETRYABLE_ERRORS = ImmutableSet.of(
+ Code.FAILED_PRECONDITION, Code.INVALID_ARGUMENT,
+ Code.PERMISSION_DENIED, Code.UNAUTHENTICATED);
+
+ /**
* Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId},
* {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using
* {@link DatastoreV1.Read#withProjectId}, {@link DatastoreV1.Read#withQuery},
@@ -840,6 +850,14 @@ public class DatastoreV1 {
private final V1DatastoreFactory datastoreFactory;
// Datastore client
private transient Datastore datastore;
+ // Counters for RunQuery outcomes, namespaced under this DoFn's own class.
+ private final Counter rpcErrors = Metrics.counter(ReadFn.class, "datastoreRpcErrors");
+ private final Counter rpcSuccesses = Metrics.counter(ReadFn.class, "datastoreRpcSuccesses");
+ // RunQuery backoff policy: at most MAX_RETRIES retries, initial backoff of 5 seconds.
+ private static final int MAX_RETRIES = 5;
+ private static final FluentBackoff RUNQUERY_BACKOFF =
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
public ReadFn(V1Options options) {
this(options, new V1DatastoreFactory());
@@ -857,6 +875,28 @@ public class DatastoreV1 {
options.getLocalhost());
}
+ /** Runs the query, retrying retryable {@link DatastoreException}s with backoff. */
+ private RunQueryResponse runQueryWithRetries(RunQueryRequest request) throws Exception {
+ BackOff backoff = RUNQUERY_BACKOFF.backoff();
+ while (true) {
+ try {
+ RunQueryResponse response = datastore.runQuery(request);
+ rpcSuccesses.inc();
+ return response;
+ } catch (DatastoreException exception) {
+ LOG.warn("RunQuery failed ({}): {}", exception.getCode(), exception.getMessage());
+ rpcErrors.inc();
+ if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {
+ throw exception; // Retrying cannot succeed for these codes.
+ }
+ if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
+ LOG.error("Aborting after {} retries.", MAX_RETRIES);
+ throw exception;
+ }
+ }
+ }
+ }
+
/** Read and output entities for the given query. */
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
@@ -878,7 +918,7 @@ public class DatastoreV1 {
}
RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
- RunQueryResponse response = datastore.runQuery(request);
+ RunQueryResponse response = runQueryWithRetries(request);
currentBatch = response.getBatch();
@@ -1328,6 +1368,9 @@ public class DatastoreV1 {
exception.getCode(), exception.getMessage());
rpcErrors.inc();
+ if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {
+ throw exception;
+ }
if (!BackOffUtils.next(sleeper, backoff)) {
LOG.error("Aborting after {} retries.", MAX_RETRIES);
throw exception;
http://git-wip-us.apache.org/repos/asf/beam/blob/016baf34/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index a3f5d38..cfba1d6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -51,6 +51,7 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.google.datastore.v1.CommitRequest;
+import com.google.datastore.v1.CommitResponse;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.EntityResult;
import com.google.datastore.v1.GqlQuery;
@@ -682,6 +683,29 @@ public class DatastoreV1Test {
}
}
+ /** Tests that {@link DatastoreWriterFn} retries a commit that fails with a retryable error. */
+ @Test
+ public void testDatatoreWriterFnRetriesErrors() throws Exception {
+ List<Mutation> mutations = new ArrayList<>();
+ int numRpcs = 2;
+ for (int i = 0; i < DATASTORE_BATCH_UPDATE_ENTITIES_START * numRpcs; ++i) {
+ mutations.add(
+ makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
+ }
+ // Commit #2 fails once with a retryable error, so numRpcs + 1 commits are expected.
+ CommitResponse successfulCommit = CommitResponse.getDefaultInstance();
+ when(mockDatastore.commit(any(CommitRequest.class))).thenReturn(successfulCommit)
+ .thenThrow(new DatastoreException("commit", Code.DEADLINE_EXCEEDED, "", null))
+ .thenReturn(successfulCommit);
+
+ DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
+ null, mockDatastoreFactory, new FakeWriteBatcher());
+ DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ doFnTester.processBundle(mutations);
+ verify(mockDatastore, times(numRpcs + 1)).commit(any(CommitRequest.class));
+ }
+
/**
* Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
* query.
@@ -816,6 +840,31 @@ public class DatastoreV1Test {
readFnTest(5 * QUERY_BATCH_LIMIT);
}
+ /** Tests that {@link ReadFn} retries a RunQuery that fails with a retryable error. */
+ @Test
+ public void testReadFnRetriesErrors() throws Exception {
+ // A query limited to a single entity.
+ Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build();
+
+ // Fail the first RunQuery with a retryable error, then answer via mockResponseForQuery.
+ when(mockDatastore.runQuery(any(RunQueryRequest.class)))
+ .thenThrow(new DatastoreException("RunQuery", Code.DEADLINE_EXCEEDED, "", null))
+ .thenAnswer(new Answer<RunQueryResponse>() {
+ @Override
+ public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
+ Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
+ return mockResponseForQuery(q);
+ }
+ });
+
+ ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory);
+ DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ List<Entity> entities = doFnTester.processBundle(query);
+ verify(mockDatastore, times(2)).runQuery(any(RunQueryRequest.class));
+ assertEquals(1, entities.size());
+ }
+
@Test
public void testTranslateGqlQueryWithLimit() throws Exception {
String gql = "SELECT * from DummyKind LIMIT 10";
@@ -1096,7 +1145,7 @@ public class DatastoreV1Test {
}
@Override
public int nextBatchSize(long timeSinceEpochMillis) {
- return DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START;
+ return DATASTORE_BATCH_UPDATE_ENTITIES_START;
}
}
}
[2/2] beam git commit: This closes #3558
Posted by ch...@apache.org.
This closes #3558
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/889776fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/889776fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/889776fc
Branch: refs/heads/master
Commit: 889776fcad72e45a93ce4e206ee728595361b1cb
Parents: 66b4a1b 016baf3
Author: Chamikara Jayalath <ch...@google.com>
Authored: Thu Jul 13 11:28:28 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Thu Jul 13 11:28:28 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 45 ++++++++++++++++-
.../sdk/io/gcp/datastore/DatastoreV1Test.java | 51 +++++++++++++++++++-
2 files changed, 94 insertions(+), 2 deletions(-)
----------------------------------------------------------------------