You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/09 18:24:54 UTC

[beam] branch master updated: Merge pull request #17577 from [BEAM-14435] Adding exception handling tests for SpannerIO write transform

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b212f1c1965 Merge pull request #17577 from [BEAM-14435] Adding exception handling tests for SpannerIO write transform
b212f1c1965 is described below

commit b212f1c196525cd46ea2022e0775d117e1a17612
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Mon May 9 14:24:48 2022 -0400

    Merge pull request #17577 from [BEAM-14435] Adding exception handling tests for SpannerIO write transform
    
    * Adding exception handling tests for SpannerIO write transform
    
    * fixup
---
 .../SpannerIOWriteExceptionHandlingTest.java       | 225 +++++++++++++++++++++
 .../sdk/io/gcp/spanner/SpannerIOWriteTest.java     |  32 +--
 2 files changed, 242 insertions(+), 15 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteExceptionHandlingTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteExceptionHandlingTest.java
new file mode 100644
index 00000000000..8c564ff0708
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteExceptionHandlingTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Options;
+import com.google.cloud.spanner.ReadOnlyTransaction;
+import com.google.cloud.spanner.SpannerExceptionFactory;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+@RunWith(Parameterized.class)
+public class SpannerIOWriteExceptionHandlingTest {
+
+  private static final long CELLS_PER_KEY = 7;
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+  private FakeServiceFactory serviceFactory;
+
+  @Captor public transient ArgumentCaptor<Iterable<Mutation>> mutationBatchesCaptor;
+  @Captor public transient ArgumentCaptor<Options.ReadQueryUpdateTransactionOption> optionsCaptor;
+
+  // Test cases are selected from the Spanner ErrorCode reference at
+  // https://cloud.google.com/java/docs/reference/google-cloud-spanner/latest/com.google.cloud.spanner.ErrorCode
+  // to make sure that each error code is dealt with appropriately.
+  // The main goal of these tests is to make sure that no exception is ever swallowed.
+  @Parameterized.Parameters(name = "{index}: {0}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+        new Object[][] {
+          // DEADLINE_EXCEEDED is the only error code that generates in-SDK retries.
+          // The default backoff sleeps 9 times (10 write attempts), then fails the pipeline.
+          {ErrorCode.DEADLINE_EXCEEDED, "deadline passed!", 9, 10},
+
+          // No other error code triggers in-SDK retries; the error is propagated immediately.
+          {ErrorCode.ABORTED, "transaction aborted!", 0, 1},
+          {ErrorCode.PERMISSION_DENIED, "permission denied, buddy!", 0, 1},
+          {ErrorCode.INTERNAL, "internal error. idk!", 0, 1},
+          {ErrorCode.RESOURCE_EXHAUSTED, "resource exhausted very tired!", 0, 1},
+          {ErrorCode.UNAUTHENTICATED, "authenticate!", 0, 1},
+          {ErrorCode.NOT_FOUND, "not found the thing", 0, 1},
+          {ErrorCode.FAILED_PRECONDITION, "conditions prestart are failed", 0, 1},
+        });
+  }
+
+  private final ErrorCode exceptionErrorcode;
+  private final String errorString;
+  private final Integer callsToSleeper;
+  private final Integer callsToWrite;
+
+  public SpannerIOWriteExceptionHandlingTest(
+      ErrorCode exceptionErrorcode,
+      String errorString,
+      Integer callsToSleeper,
+      Integer callsToWrite) {
+    this.exceptionErrorcode = exceptionErrorcode;
+    this.errorString = errorString;
+    this.callsToSleeper = callsToSleeper;
+    this.callsToWrite = callsToWrite;
+  }
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    serviceFactory = new FakeServiceFactory();
+
+    ReadOnlyTransaction tx = mock(ReadOnlyTransaction.class);
+    when(serviceFactory.mockDatabaseClient().readOnlyTransaction()).thenReturn(tx);
+
+    // Capture batches sent to writeAtLeastOnceWithOptions.
+    when(serviceFactory
+            .mockDatabaseClient()
+            .writeAtLeastOnceWithOptions(mutationBatchesCaptor.capture(), optionsCaptor.capture()))
+        .thenReturn(null);
+
+    // Simplest schema: a table with int64 key
+    SpannerIOWriteTest.preparePkMetadata(
+        tx, Arrays.asList(SpannerIOWriteTest.pkMetadata("tEsT", "key", "ASC")));
+    SpannerIOWriteTest.prepareColumnMetadata(
+        tx,
+        Arrays.asList(SpannerIOWriteTest.columnMetadata("tEsT", "key", "INT64", CELLS_PER_KEY)));
+    SpannerIOWriteTest.preparePgColumnMetadata(
+        tx,
+        Arrays.asList(SpannerIOWriteTest.columnMetadata("tEsT", "key", "bigint", CELLS_PER_KEY)));
+
+    // Set up the ProcessWideContainer used for testing that metrics are set.
+    MetricsContainerImpl container = new MetricsContainerImpl(null);
+    MetricsEnvironment.setProcessWideContainer(container);
+  }
+
+  @Test
+  public void testExceptionHandlingForSimpleWrite() throws InterruptedException {
+    List<Mutation> mutationList = Arrays.asList(SpannerIOWriteTest.m((long) 1));
+
+    // Mock the static sleeper so that the test does not actually sleep.
+    SpannerIO.WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);
+
+    when(serviceFactory
+            .mockDatabaseClient()
+            .writeAtLeastOnceWithOptions(
+                any(), any(Options.ReadQueryUpdateTransactionOption.class)))
+        .thenThrow(SpannerExceptionFactory.newSpannerException(exceptionErrorcode, errorString));
+
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectMessage(errorString);
+
+    SpannerWriteResult result =
+        pipeline
+            .apply(Create.of(mutationList))
+            .apply(
+                SpannerIO.write()
+                    .withProjectId("test-project")
+                    .withInstanceId("test-instance")
+                    .withDatabaseId("test-database")
+                    .withServiceFactory(serviceFactory)
+                    .withBatchSizeBytes(0)
+                    .withFailureMode(SpannerIO.FailureMode.FAIL_FAST));
+
+    // Expect exactly one failed mutation.
+    PAssert.that(result.getFailedMutations())
+        .satisfies(
+            m -> {
+              assertEquals(1, Iterables.size(m));
+              return null;
+            });
+    try {
+      pipeline.run().waitUntilFinish();
+    } finally {
+      verify(SpannerIO.WriteToSpannerFn.sleeper, times(callsToSleeper)).sleep(anyLong());
+      verify(serviceFactory.mockDatabaseClient(), times(callsToWrite))
+          .writeAtLeastOnceWithOptions(any(), any(Options.ReadQueryUpdateTransactionOption.class));
+    }
+  }
+
+  @Test
+  public void testExceptionHandlingForWriteGrouped() throws InterruptedException {
+    List<MutationGroup> mutationList =
+        Arrays.asList(SpannerIOWriteTest.g(SpannerIOWriteTest.m((long) 1)));
+
+    // Mock the static sleeper so that the test does not actually sleep.
+    SpannerIO.WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);
+
+    when(serviceFactory
+            .mockDatabaseClient()
+            .writeAtLeastOnceWithOptions(
+                any(), any(Options.ReadQueryUpdateTransactionOption.class)))
+        .thenThrow(SpannerExceptionFactory.newSpannerException(exceptionErrorcode, errorString));
+
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectMessage(errorString);
+
+    SpannerWriteResult result =
+        pipeline
+            .apply(Create.of(mutationList))
+            .apply(
+                SpannerIO.write()
+                    .withProjectId("test-project")
+                    .withInstanceId("test-instance")
+                    .withDatabaseId("test-database")
+                    .withServiceFactory(serviceFactory)
+                    .withBatchSizeBytes(0)
+                    .withFailureMode(SpannerIO.FailureMode.FAIL_FAST)
+                    .grouped());
+
+    // Expect no failed mutations.
+    PAssert.that(result.getFailedMutations())
+        .satisfies(
+            m -> {
+              assertEquals(0, Iterables.size(m));
+              return null;
+            });
+    try {
+      pipeline.run().waitUntilFinish();
+    } finally {
+      verify(SpannerIO.WriteToSpannerFn.sleeper, times(callsToSleeper)).sleep(anyLong());
+      verify(serviceFactory.mockDatabaseClient(), times(callsToWrite))
+          .writeAtLeastOnceWithOptions(any(), any(Options.ReadQueryUpdateTransactionOption.class));
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
index 32509d2eb87..d0e4e2e13ee 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
@@ -152,7 +152,7 @@ public class SpannerIOWriteTest implements Serializable {
         .build();
   }
 
-  private static Struct columnMetadata(
+  static Struct columnMetadata(
       String tableName, String columnName, String type, long cellsMutated) {
     return Struct.newBuilder()
         .set("table_name")
@@ -166,7 +166,7 @@ public class SpannerIOWriteTest implements Serializable {
         .build();
   }
 
-  private static Struct pkMetadata(String tableName, String columnName, String ordering) {
+  static Struct pkMetadata(String tableName, String columnName, String ordering) {
     return Struct.newBuilder()
         .set("table_name")
         .to(tableName)
@@ -177,7 +177,7 @@ public class SpannerIOWriteTest implements Serializable {
         .build();
   }
 
-  private void prepareColumnMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
+  static void prepareColumnMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
     Type type =
         Type.struct(
             Type.StructField.of("table_name", Type.string()),
@@ -200,7 +200,7 @@ public class SpannerIOWriteTest implements Serializable {
         .thenReturn(ResultSets.forRows(type, rows));
   }
 
-  private void preparePgColumnMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
+  static void preparePgColumnMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
     Type type =
         Type.struct(
             Type.StructField.of("table_name", Type.string()),
@@ -225,7 +225,7 @@ public class SpannerIOWriteTest implements Serializable {
         .thenReturn(ResultSets.forRows(type, rows));
   }
 
-  private void preparePkMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
+  static void preparePkMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
     Type type =
         Type.struct(
             Type.StructField.of("table_name", Type.string()),
@@ -836,13 +836,15 @@ public class SpannerIOWriteTest implements Serializable {
               assertEquals(1, Iterables.size(m));
               return null;
             });
-    pipeline.run().waitUntilFinish();
-
-    // 0 calls to sleeper
-    verify(WriteToSpannerFn.sleeper, times(0)).sleep(anyLong());
-    // 5 write attempts for the single mutationGroup.
-    verify(serviceFactory.mockDatabaseClient(), times(5))
-        .writeAtLeastOnceWithOptions(any(), any(ReadQueryUpdateTransactionOption.class));
+    try {
+      pipeline.run().waitUntilFinish();
+    } finally {
+      // 0 calls to sleeper
+      verify(WriteToSpannerFn.sleeper, times(0)).sleep(anyLong());
+      // 5 write attempts for the single mutationGroup.
+      verify(serviceFactory.mockDatabaseClient(), times(5))
+          .writeAtLeastOnceWithOptions(any(), any(ReadQueryUpdateTransactionOption.class));
+    }
   }
 
   @Test
@@ -893,8 +895,8 @@ public class SpannerIOWriteTest implements Serializable {
               assertEquals(0, Iterables.size(m));
               return null;
             });
-    pipeline.run().waitUntilFinish();
 
+    pipeline.run().waitUntilFinish();
     // 2 calls to sleeper
     verify(WriteToSpannerFn.sleeper, times(2)).sleep(anyLong());
     // 8 write attempts for the single mutationGroup.
@@ -1466,11 +1468,11 @@ public class SpannerIOWriteTest implements Serializable {
     verify(serviceFactory.mockSpanner(), times(2)).close();
   }
 
-  private static MutationGroup g(Mutation m, Mutation... other) {
+  static MutationGroup g(Mutation m, Mutation... other) {
     return MutationGroup.create(m, other);
   }
 
-  private static Mutation m(Long key) {
+  static Mutation m(Long key) {
     return Mutation.newInsertOrUpdateBuilder("test").set("key").to(key).build();
   }