You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/07/13 18:29:04 UTC

[1/2] beam git commit: Implement retries in the read connector.

Repository: beam
Updated Branches:
  refs/heads/master 66b4a1be0 -> 889776fca


Implement retries in the read connector.

Respect non-retryable error codes from Datastore.
Add unit tests to check that retryable errors are retried.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/016baf34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/016baf34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/016baf34

Branch: refs/heads/master
Commit: 016baf3465bbccbc9d3df6999b38b1b2533aee8c
Parents: 66b4a1b
Author: Colin Phipps <fi...@google.com>
Authored: Mon Jul 10 16:09:23 2017 +0000
Committer: Colin Phipps <fi...@google.com>
Committed: Thu Jul 13 11:11:21 2017 +0000

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 45 ++++++++++++++++-
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   | 51 +++++++++++++++++++-
 2 files changed, 94 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/016baf34/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 5f65428..1ed6430 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -40,6 +40,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.datastore.v1.CommitRequest;
 import com.google.datastore.v1.Entity;
 import com.google.datastore.v1.EntityResult;
@@ -65,6 +66,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -238,6 +240,14 @@ public class DatastoreV1 {
   static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 9_000_000;
 
   /**
+   * Non-retryable errors.
+   * See https://cloud.google.com/datastore/docs/concepts/errors#Error_Codes .
+   */
+  private static final Set<Code> NON_RETRYABLE_ERRORS =
+    ImmutableSet.of(Code.FAILED_PRECONDITION, Code.INVALID_ARGUMENT, Code.PERMISSION_DENIED,
+        Code.UNAUTHENTICATED);
+
+  /**
    * Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId},
    * {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using
    * {@link DatastoreV1.Read#withProjectId}, {@link DatastoreV1.Read#withQuery},
@@ -840,6 +850,14 @@ public class DatastoreV1 {
       private final V1DatastoreFactory datastoreFactory;
       // Datastore client
       private transient Datastore datastore;
+      private final Counter rpcErrors =
+        Metrics.counter(DatastoreWriterFn.class, "datastoreRpcErrors");
+      private final Counter rpcSuccesses =
+        Metrics.counter(DatastoreWriterFn.class, "datastoreRpcSuccesses");
+      private static final int MAX_RETRIES = 5;
+      private static final FluentBackoff RUNQUERY_BACKOFF =
+        FluentBackoff.DEFAULT
+        .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
 
       public ReadFn(V1Options options) {
         this(options, new V1DatastoreFactory());
@@ -857,6 +875,28 @@ public class DatastoreV1 {
             options.getLocalhost());
       }
 
+      private RunQueryResponse runQueryWithRetries(RunQueryRequest request) throws Exception {
+        Sleeper sleeper = Sleeper.DEFAULT;
+        BackOff backoff = RUNQUERY_BACKOFF.backoff();
+        while (true) {
+          try {
+            RunQueryResponse response = datastore.runQuery(request);
+            rpcSuccesses.inc();
+            return response;
+          } catch (DatastoreException exception) {
+            rpcErrors.inc();
+
+            if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {
+              throw exception;
+            }
+            if (!BackOffUtils.next(sleeper, backoff)) {
+              LOG.error("Aborting after {} retries.", MAX_RETRIES);
+              throw exception;
+            }
+          }
+        }
+      }
+
       /** Read and output entities for the given query. */
       @ProcessElement
       public void processElement(ProcessContext context) throws Exception {
@@ -878,7 +918,7 @@ public class DatastoreV1 {
           }
 
           RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
-          RunQueryResponse response = datastore.runQuery(request);
+          RunQueryResponse response = runQueryWithRetries(request);
 
           currentBatch = response.getBatch();
 
@@ -1328,6 +1368,9 @@ public class DatastoreV1 {
               exception.getCode(), exception.getMessage());
           rpcErrors.inc();
 
+          if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {
+            throw exception;
+          }
           if (!BackOffUtils.next(sleeper, backoff)) {
             LOG.error("Aborting after {} retries.", MAX_RETRIES);
             throw exception;

http://git-wip-us.apache.org/repos/asf/beam/blob/016baf34/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index a3f5d38..cfba1d6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -51,6 +51,7 @@ import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 import com.google.datastore.v1.CommitRequest;
+import com.google.datastore.v1.CommitResponse;
 import com.google.datastore.v1.Entity;
 import com.google.datastore.v1.EntityResult;
 import com.google.datastore.v1.GqlQuery;
@@ -682,6 +683,29 @@ public class DatastoreV1Test {
     }
   }
 
+  /** Tests {@link DatastoreWriterFn} with a failed request which is retried. */
+  @Test
+  public void testDatatoreWriterFnRetriesErrors() throws Exception {
+    List<Mutation> mutations = new ArrayList<>();
+    int numRpcs = 2;
+    for (int i = 0; i < DATASTORE_BATCH_UPDATE_ENTITIES_START * numRpcs; ++i) {
+      mutations.add(
+          makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
+    }
+
+    CommitResponse successfulCommit = CommitResponse.getDefaultInstance();
+    when(mockDatastore.commit(any(CommitRequest.class))).thenReturn(successfulCommit)
+      .thenThrow(
+          new DatastoreException("commit", Code.DEADLINE_EXCEEDED, "", null))
+      .thenReturn(successfulCommit);
+
+    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
+        null, mockDatastoreFactory, new FakeWriteBatcher());
+    DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    doFnTester.processBundle(mutations);
+  }
+
   /**
    * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
    * query.
@@ -816,6 +840,31 @@ public class DatastoreV1Test {
     readFnTest(5 * QUERY_BATCH_LIMIT);
   }
 
+  /** Tests that {@link ReadFn} retries after an error. */
+  @Test
+  public void testReadFnRetriesErrors() throws Exception {
+    // A query with no kind or filter, only a limit of 1, to read entities.
+    Query query = Query.newBuilder().setLimit(
+        Int32Value.newBuilder().setValue(1)).build();
+
+    // Fail the first call with a retryable error, then use mockResponseForQuery for results.
+    when(mockDatastore.runQuery(any(RunQueryRequest.class)))
+        .thenThrow(
+            new DatastoreException("RunQuery", Code.DEADLINE_EXCEEDED, "", null))
+        .thenAnswer(new Answer<RunQueryResponse>() {
+          @Override
+          public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
+            Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
+            return mockResponseForQuery(q);
+          }
+        });
+
+    ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory);
+    DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    List<Entity> entities = doFnTester.processBundle(query);
+  }
+
   @Test
   public void testTranslateGqlQueryWithLimit() throws Exception {
     String gql = "SELECT * from DummyKind LIMIT 10";
@@ -1096,7 +1145,7 @@ public class DatastoreV1Test {
     }
     @Override
     public int nextBatchSize(long timeSinceEpochMillis) {
-      return DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START;
+      return DATASTORE_BATCH_UPDATE_ENTITIES_START;
     }
   }
 }


[2/2] beam git commit: This closes #3558

Posted by ch...@apache.org.
This closes #3558


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/889776fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/889776fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/889776fc

Branch: refs/heads/master
Commit: 889776fcad72e45a93ce4e206ee728595361b1cb
Parents: 66b4a1b 016baf3
Author: Chamikara Jayalath <ch...@google.com>
Authored: Thu Jul 13 11:28:28 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Thu Jul 13 11:28:28 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 45 ++++++++++++++++-
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   | 51 +++++++++++++++++++-
 2 files changed, 94 insertions(+), 2 deletions(-)
----------------------------------------------------------------------