You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/10 17:28:45 UTC
[beam] branch master updated: [BEAM-14423] Add test cases for BigtableIO.BigtableWriterFn fails due to writeRecord
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f3c06e39b1b [BEAM-14423] Add test cases for BigtableIO.BigtableWriterFn fails due to writeRecord
new 21d52f260bb Merge pull request #17593 from [BEAM-14423] Add test cases for BigtableIO.BigtableWriterFn fails due to writeRecord
f3c06e39b1b is described below
commit f3c06e39b1b61a84470c6e3dd3855d75e5594930
Author: Yi Hu <ya...@google.com>
AuthorDate: Mon May 9 13:11:32 2022 -0400
[BEAM-14423] Add test cases for BigtableIO.BigtableWriterFn fails due to writeRecord
---
.../beam/sdk/io/gcp/bigtable/BigtableIOTest.java | 90 +++++++++++++++++++---
1 file changed, 81 insertions(+), 9 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 34c1b761b07..d7e9ded7eec 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -1422,6 +1422,40 @@ public class BigtableIOTest {
p.run();
}
+ /** Tests that an error that happens when submitting the write request is handled. */
+ @Test
+ public void testWritingFailsAtWriteRecord() throws IOException {
+ FailureBigtableService failureService =
+ new FailureBigtableService(FailureOptions.builder().setFailAtWriteRecord(true).build());
+ BigtableIO.Write failureWrite =
+ BigtableIO.write()
+ .withInstanceId("instance")
+ .withProjectId("project")
+ .withBigtableService(failureService);
+
+ final String table = "table";
+ final String key = "key";
+ final String value = "value";
+
+ failureService.createTable(table);
+
+ p.apply("single row", Create.of(makeWrite(key, value)).withCoder(bigtableCoder))
+ .apply("write", failureWrite.withTableId(table));
+
+ // Exception will be thrown by writer.writeRecord() when BigtableWriterFn is applied.
+ thrown.expect(IOException.class);
+ thrown.expectMessage("Fake IOException in writeRecord()");
+
+ try {
+ p.run();
+ } catch (PipelineExecutionException e) {
+ // Rethrowing the inner exception helps assert that the first exception is thrown from writeRecord().
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ }
+ }
+
/** Tests that when writing an element fails, the write fails. */
@Test
public void testWritingFailsBadElement() throws Exception {
@@ -1658,6 +1692,11 @@ public class BigtableIOTest {
return new FailureBigtableReader(source, this, failureOptions);
}
+ @Override
+ public FailureBigtableWriter openForWriting(String tableId) {
+ return new FailureBigtableWriter(tableId, this, failureOptions);
+ }
+
@Override
public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) {
if (failureOptions.getFailAtSplit()) {
@@ -1756,6 +1795,7 @@ public class BigtableIOTest {
}
}
+ /** A {@link FakeBigtableReader} implementation that throws exceptions at a given stage. */
private static class FailureBigtableReader extends FakeBigtableReader {
public FailureBigtableReader(
BigtableSource source, FakeBigtableService service, FailureOptions options) {
@@ -1785,6 +1825,7 @@ public class BigtableIOTest {
private long numAdvance;
private final FailureOptions failureOptions;
}
+
/**
* A {@link BigtableService.Writer} implementation that writes to the static instance of {@link
* FakeBigtableService} stored in {@link #service}.
@@ -1797,14 +1838,20 @@ public class BigtableIOTest {
*/
private static class FakeBigtableWriter implements BigtableService.Writer {
private final String tableId;
+ private final FakeBigtableService service;
- public FakeBigtableWriter(String tableId) {
+ public FakeBigtableWriter(String tableId, FakeBigtableService service) {
this.tableId = tableId;
+ this.service = service;
+ }
+
+ public FakeBigtableWriter(String tableId) {
+ this(tableId, BigtableIOTest.service);
}
@Override
- public CompletionStage<MutateRowResponse> writeRecord(
- KV<ByteString, Iterable<Mutation>> record) {
+ public CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+ throws IOException {
service.verifyTableExists(tableId);
Map<ByteString, ByteString> table = service.getTable(tableId);
ByteString key = record.getKey();
@@ -1827,6 +1874,26 @@ public class BigtableIOTest {
public void close() {}
}
+ /** A {@link FakeBigtableWriter} implementation that throws exceptions at a given stage. */
+ private static class FailureBigtableWriter extends FakeBigtableWriter {
+ public FailureBigtableWriter(
+ String tableId, FailureBigtableService service, FailureOptions options) {
+ super(tableId, service);
+ this.failureOptions = options;
+ }
+
+ @Override
+ public CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+ throws IOException {
+ if (failureOptions.getFailAtWriteRecord()) {
+ throw new IOException("Fake IOException in writeRecord()");
+ }
+ return super.writeRecord(record);
+ }
+
+ private final FailureOptions failureOptions;
+ }
+
/** A serializable comparator for ByteString. Used to make row samples. */
private static final class ByteStringComparator implements Comparator<ByteString>, Serializable {
@Override
@@ -1837,29 +1904,34 @@ public class BigtableIOTest {
/** Error injection options for FakeBigtableService and FakeBigtableReader. */
@AutoValue
- abstract static class FailureOptions {
+ abstract static class FailureOptions implements Serializable {
abstract Boolean getFailAtStart();
abstract Boolean getFailAtAdvance();
abstract Boolean getFailAtSplit();
+ abstract Boolean getFailAtWriteRecord();
+
static Builder builder() {
return new AutoValue_BigtableIOTest_FailureOptions.Builder()
.setFailAtStart(false)
.setFailAtAdvance(false)
- .setFailAtSplit(false);
+ .setFailAtSplit(false)
+ .setFailAtWriteRecord(false);
}
@AutoValue.Builder
abstract static class Builder {
- abstract BigtableIOTest.FailureOptions.Builder setFailAtStart(Boolean failAtStart);
+ abstract Builder setFailAtStart(Boolean failAtStart);
+
+ abstract Builder setFailAtAdvance(Boolean failAtAdvance);
- abstract BigtableIOTest.FailureOptions.Builder setFailAtAdvance(Boolean failAtAdvance);
+ abstract Builder setFailAtSplit(Boolean failAtSplit);
- abstract BigtableIOTest.FailureOptions.Builder setFailAtSplit(Boolean failAtSplit);
+ abstract Builder setFailAtWriteRecord(Boolean failAtWriteRecord);
- abstract BigtableIOTest.FailureOptions build();
+ abstract FailureOptions build();
}
}
}