You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/10 17:28:45 UTC

[beam] branch master updated: [BEAM-14423] Add test cases for BigtableIO.BigtableWriterFn fails due to writeRecord

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f3c06e39b1b [BEAM-14423] Add test cases for BigtableIO.BigtableWriterFn fails due to writeRecord
     new 21d52f260bb Merge pull request #17593 from [BEAM-14423] Add test cases for BigtableIO.BigtableWriterFn fails due to writeRecord
f3c06e39b1b is described below

commit f3c06e39b1b61a84470c6e3dd3855d75e5594930
Author: Yi Hu <ya...@google.com>
AuthorDate: Mon May 9 13:11:32 2022 -0400

    [BEAM-14423] Add test cases for BigtableIO.BigtableWriterFn fails due to writeRecord
---
 .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java   | 90 +++++++++++++++++++---
 1 file changed, 81 insertions(+), 9 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 34c1b761b07..d7e9ded7eec 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -1422,6 +1422,40 @@ public class BigtableIOTest {
     p.run();
   }
 
+  /** Tests that an error thrown while submitting a write request fails the pipeline run. */
+  @Test
+  public void testWritingFailsAtWriteRecord() throws IOException {
+    FailureBigtableService failureService =
+        new FailureBigtableService(FailureOptions.builder().setFailAtWriteRecord(true).build());
+    BigtableIO.Write failureWrite =
+        BigtableIO.write()
+            .withInstanceId("instance")
+            .withProjectId("project")
+            .withBigtableService(failureService);
+
+    final String table = "table";
+    final String key = "key";
+    final String value = "value";
+    failureService.createTable(table);
+
+    p.apply("single row", Create.of(makeWrite(key, value)).withCoder(bigtableCoder))
+        .apply("write", failureWrite.withTableId(table));
+
+    // Exception will be thrown by writer.writeRecord() when BigtableWriterFn is applied.
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Fake IOException in writeRecord()");
+
+    try {
+      p.run();
+    } catch (PipelineExecutionException e) {
+      // Unwrap so the assertion targets writeRecord()'s exception; rethrow anything unexpected.
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw e;
+    }
+  }
+
   /** Tests that when writing an element fails, the write fails. */
   @Test
   public void testWritingFailsBadElement() throws Exception {
@@ -1658,6 +1692,11 @@ public class BigtableIOTest {
       return new FailureBigtableReader(source, this, failureOptions);
     }
 
+    @Override
+    public FailureBigtableWriter openForWriting(String tableId) {
+      return new FailureBigtableWriter(tableId, this, failureOptions);
+    }
+
     @Override
     public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) {
       if (failureOptions.getFailAtSplit()) {
@@ -1756,6 +1795,7 @@ public class BigtableIOTest {
     }
   }
 
+  /** A {@link FakeBigtableReader} implementation that throws exceptions at a given stage. */
   private static class FailureBigtableReader extends FakeBigtableReader {
     public FailureBigtableReader(
         BigtableSource source, FakeBigtableService service, FailureOptions options) {
@@ -1785,6 +1825,7 @@ public class BigtableIOTest {
     private long numAdvance;
     private final FailureOptions failureOptions;
   }
+
   /**
    * A {@link BigtableService.Writer} implementation that writes to the static instance of {@link
    * FakeBigtableService} stored in {@link #service}.
@@ -1797,14 +1838,20 @@ public class BigtableIOTest {
    */
   private static class FakeBigtableWriter implements BigtableService.Writer {
     private final String tableId;
+    private final FakeBigtableService service;
 
-    public FakeBigtableWriter(String tableId) {
+    public FakeBigtableWriter(String tableId, FakeBigtableService service) {
       this.tableId = tableId;
+      this.service = service;
+    }
+
+    public FakeBigtableWriter(String tableId) {
+      this(tableId, BigtableIOTest.service);
     }
 
     @Override
-    public CompletionStage<MutateRowResponse> writeRecord(
-        KV<ByteString, Iterable<Mutation>> record) {
+    public CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+        throws IOException {
       service.verifyTableExists(tableId);
       Map<ByteString, ByteString> table = service.getTable(tableId);
       ByteString key = record.getKey();
@@ -1827,6 +1874,26 @@ public class BigtableIOTest {
     public void close() {}
   }
 
+  /** A {@link FakeBigtableWriter} implementation that throws exceptions at a given stage. */
+  private static class FailureBigtableWriter extends FakeBigtableWriter {
+    public FailureBigtableWriter(
+        String tableId, FailureBigtableService service, FailureOptions options) {
+      super(tableId, service);
+      this.failureOptions = options;
+    }
+
+    @Override
+    public CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+        throws IOException {
+      if (!failureOptions.getFailAtWriteRecord()) {
+        return super.writeRecord(record);
+      }
+      throw new IOException("Fake IOException in writeRecord()");
+    }
+
+    private final FailureOptions failureOptions;
+  }
+
   /** A serializable comparator for ByteString. Used to make row samples. */
   private static final class ByteStringComparator implements Comparator<ByteString>, Serializable {
     @Override
@@ -1837,29 +1904,34 @@ public class BigtableIOTest {
 
   /** Error injection options for FakeBigtableService and FakeBigtableReader. */
   @AutoValue
-  abstract static class FailureOptions {
+  abstract static class FailureOptions implements Serializable {
     abstract Boolean getFailAtStart();
 
     abstract Boolean getFailAtAdvance();
 
     abstract Boolean getFailAtSplit();
 
+    abstract Boolean getFailAtWriteRecord();
+
     static Builder builder() {
       return new AutoValue_BigtableIOTest_FailureOptions.Builder()
           .setFailAtStart(false)
           .setFailAtAdvance(false)
-          .setFailAtSplit(false);
+          .setFailAtSplit(false)
+          .setFailAtWriteRecord(false);
     }
 
     @AutoValue.Builder
     abstract static class Builder {
-      abstract BigtableIOTest.FailureOptions.Builder setFailAtStart(Boolean failAtStart);
+      abstract Builder setFailAtStart(Boolean failAtStart);
+
+      abstract Builder setFailAtAdvance(Boolean failAtAdvance);
 
-      abstract BigtableIOTest.FailureOptions.Builder setFailAtAdvance(Boolean failAtAdvance);
+      abstract Builder setFailAtSplit(Boolean failAtSplit);
 
-      abstract BigtableIOTest.FailureOptions.Builder setFailAtSplit(Boolean failAtSplit);
+      abstract Builder setFailAtWriteRecord(Boolean failAtWriteRecord);
 
-      abstract BigtableIOTest.FailureOptions build();
+      abstract FailureOptions build();
     }
   }
 }