You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/05/27 18:54:49 UTC
[beam] branch master updated: [BEAM-14502] Fix: Splitting scans into smaller chunks to buffer reads (#16939)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ed59b1477c4 [BEAM-14502] Fix: Splitting scans into smaller chunks to buffer reads (#16939)
ed59b1477c4 is described below
commit ed59b1477c4f879f2cba5c699c4fe2a3391b779f
Author: Diego Gomez <90...@users.noreply.github.com>
AuthorDate: Fri May 27 11:54:42 2022 -0700
[BEAM-14502] Fix: Splitting scans into smaller chunks to buffer reads (#16939)
* Fix: Splitting scans into smaller chunks to buffer reads
* Nit: removing unused imports
* Small clean up, iteration 1
* Splitting duplicate logic into helper function
* Splitting logic for rows
* Test for below batch size and fixing up internal logic
* Adding SingleRange test above the mini batch limit
* Fixing nits
* Adding multiple range test with non-overlapping ranges
* Added multiple range test with overlapping keys
* Temp commit for easier meeting. Will clean up after and commit again
* First Async change with tests working. Multiple ranges needs to be checked for proper advancing and the ArgumentCaptor test must be created
* Fixed tests, still need to implement Captor test
* Created ArgumentCaptor test and fixed test errors
* Fixed the failing emulator tests
* Spotless fix
* Cleaning up code
* Fixing nits and added test comments
* Fixing spotless
* Fixing comments
* Spotless
* Fixing build issues
* There is one error with using a non-vendored dependency. Working on it but PR is good otherwise
* Fixing build error
* Fixing part of the first round of comments, still need to work on a few
* Removed nesting from advance
* Added configurable batch limit
* Fixed tests
* Fixed tests for new implementation
* Added Byte Limit test and responded to half of the new comments
* Testing build errors, due to IDE issues
* Implemented Runtime usage which caused ByteBuffer test to fail. Cleaned up some code based on comments.
* Working on the byte buffer test, having issues with mocking Runtime values which would result in flakiness. Going to try to push for review
* Fixed the byte buffer test and should use the machine's memory when testing
* Attempting to fix ByteBufferTest
* Applying Spotless and keeping debug statements to make sure test is OK
* Reduced percentage of memory and removed print statements
* Apply spotless
* Apply spotless again
* Fixing nits
* Fixing up nits and comments
* Refactoring multiple states into cleaning flow, provided factory method and removed default segmentSize behavior
* Fixing import
* Fixed tests for refactor and cleaned up implementation
* Refactored tests, added an exception test and Byte test needs to be tested manually
* Responding to comments, removed some error handling
* Responding to comments, fixed error handling and cleaned tests
* Fixing warnings
* Removed factory method inside of test and removed source dependencies. Waiting to fix CallMetric for error handling
* Fixed error handling test
* Responding to nits
* Fixed error handling
* Responding to nits
* Apply Spotless
---
.../provider/bigtable/BigtableTableFlatTest.java | 37 ++
.../provider/bigtable/BigtableTableTestUtils.java | 19 +
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 28 ++
.../sdk/io/gcp/bigtable/BigtableReadOptions.java | 13 +
.../sdk/io/gcp/bigtable/BigtableServiceImpl.java | 370 ++++++++++++++-
.../io/gcp/bigtable/BigtableServiceImplTest.java | 521 +++++++++++++++++++++
6 files changed, 964 insertions(+), 24 deletions(-)
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
index 817570ab2cd..08082e85e0e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.Bigtable
import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.KEY2;
import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.TEST_FLAT_SCHEMA;
import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.bigTableRow;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.bigTableSegmentedRows;
import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.columnsMappingString;
import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createFlatTableString;
import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createReadTable;
@@ -137,6 +138,33 @@ public class BigtableTableFlatTest {
readPipeline.run().waitUntilFinish();
}
+ /**
+ * Writes five rows (key0..key4) through a Beam SQL INSERT, reads them back with the transform
+ * returned by readTransformWithSegment (max buffer element count of 2) and asserts that all five
+ * rows come back, in any order.
+ */
+ @Test
+ public void testSegementedInsert() {
+ final String tableId = "beamWriteTableWithSegmentedRead";
+ emulatorWrapper.createTable(tableId, FAMILY_TEST);
+ BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
+ sqlEnv.executeDdl(createFlatTableString(tableId, location(tableId)));
+
+ String query =
+ "INSERT INTO beamWriteTableWithSegmentedRead(key, boolColumn, longColumn, stringColumn, doubleColumn) "
+ + "VALUES ('key0', TRUE, CAST(10 AS bigint), 'stringValue', 5.5), "
+ + "('key1', TRUE, CAST(10 AS bigint), 'stringValue', 5.5), "
+ + "('key2', TRUE, CAST(10 AS bigint), 'stringValue', 5.5), "
+ + "('key3', TRUE, CAST(10 AS bigint), 'stringValue', 5.5), "
+ + "('key4', TRUE, CAST(10 AS bigint), 'stringValue', 5.5)";
+
+ BeamSqlRelUtils.toPCollection(writePipeline, sqlEnv.parseQuery(query));
+ writePipeline.run().waitUntilFinish();
+
+ // Cell timestamps are replaced before comparison so the rows can be matched exactly.
+ PCollection<com.google.bigtable.v2.Row> bigTableRows =
+ readPipeline
+ .apply(readTransformWithSegment(tableId))
+ .apply(MapElements.via(new ReplaceCellTimestamp()));
+
+ PAssert.that(bigTableRows).containsInAnyOrder(bigTableSegmentedRows());
+ readPipeline.run().waitUntilFinish();
+ }
+
@Test
public void testSimpleInsert() {
final String tableId = "beamWriteTable";
@@ -187,4 +215,13 @@ public class BigtableTableFlatTest {
.withTableId(table)
.withEmulator("localhost:" + BIGTABLE_EMULATOR.getPort());
}
+
+ /**
+ * Builds a read transform against the local emulator for {@code table} with a max buffer element
+ * count of 2.
+ */
+ private BigtableIO.Read readTransformWithSegment(String table) {
+ return BigtableIO.read()
+ .withProjectId("fakeProject")
+ .withInstanceId("fakeInstance")
+ .withTableId(table)
+ .withMaxBufferElementCount(2)
+ .withEmulator("localhost:" + BIGTABLE_EMULATOR.getPort());
+ }
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
index f318676f6ba..ef66fb89aff 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
@@ -179,6 +179,25 @@ class BigtableTableTestUtils {
.build();
}
+ /**
+  * Returns five rows keyed "key0" through "key4". Every row carries the same single family
+  * ("familyTest") with the bool, double, long and string columns used by the segmented-read test.
+  */
+ static com.google.bigtable.v2.Row[] bigTableSegmentedRows() {
+   // All rows share one immutable family message, so build it once.
+   Family sharedFamily =
+       Family.newBuilder()
+           .setName("familyTest")
+           .addAllColumns(
+               ImmutableList.of(
+                   column("boolColumn", booleanToByteArray(true)),
+                   column("doubleColumn", doubleToByteArray(5.5)),
+                   column("longColumn", Longs.toByteArray(10L)),
+                   column("stringColumn", "stringValue".getBytes(UTF_8))))
+           .build();
+
+   com.google.bigtable.v2.Row[] expected = new com.google.bigtable.v2.Row[5];
+   for (int index = 0; index < expected.length; index++) {
+     com.google.bigtable.v2.Row.Builder rowBuilder = com.google.bigtable.v2.Row.newBuilder();
+     rowBuilder.setKey(byteStringUtf8("key" + index));
+     rowBuilder.addFamilies(sharedFamily);
+     expected[index] = rowBuilder.build();
+   }
+   return expected;
+ }
+
// There is no possibility to insert a value with fixed timestamp so we have to replace it
// for the testing purpose.
static com.google.bigtable.v2.Row setFixedTimestamp(com.google.bigtable.v2.Row row) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 36797092293..751ebf1e606 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -38,6 +38,8 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.BoundedSource;
@@ -409,6 +411,28 @@ public class BigtableIO {
return withRowFilter(StaticValueProvider.of(filter));
}
+ /**
+  * Returns a new {@link BigtableIO.Read} that breaks the read up into smaller requests, each
+  * limited to {@code maxBufferElementCount} rows.
+  *
+  * <p>Setting a non-null value makes the service create the segment reader instead of the
+  * streaming reader. Passing {@code null} disables this behavior and the streaming reader is
+  * used.
+  *
+  * <p>A non-null value must be greater than zero; this is checked when the read options are
+  * validated.
+  *
+  * <p>Does not modify this object.
+  *
+  * @param maxBufferElementCount row limit per read request, or {@code null} to disable segmented
+  *     reads
+  */
+ @Experimental(Kind.SOURCE_SINK)
+ public Read withMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
+   // Delegate the copy-and-set to BigtableReadOptions so the logic lives in one place.
+   return toBuilder()
+       .setBigtableReadOptions(
+           getBigtableReadOptions().setMaxBufferElementCount(maxBufferElementCount))
+       .build();
+ }
+
/**
* Returns a new {@link BigtableIO.Read} that will read only rows in the specified range.
*
@@ -1269,6 +1293,10 @@ public class BigtableIO {
return rowFilter != null && rowFilter.isAccessible() ? rowFilter.get() : null;
}
+ /** Returns the max buffer element count held by the read options, or null when it is not set. */
+ public @Nullable Integer getMaxBufferElementCount() {
+ return readOptions.getMaxBufferElementCount();
+ }
+
public ValueProvider<String> getTableId() {
return config.getTableId();
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java
index 92dbe715bf7..3c561e237a1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java
@@ -42,6 +42,9 @@ abstract class BigtableReadOptions implements Serializable {
/** Returns the key ranges to read. */
abstract @Nullable ValueProvider<List<ByteKeyRange>> getKeyRanges();
+ /**
+ * Returns the maximum number of rows to request per read segment, or null when segmented reads
+ * are not configured.
+ */
+ abstract @Nullable Integer getMaxBufferElementCount();
+
abstract Builder toBuilder();
static BigtableReadOptions.Builder builder() {
@@ -53,11 +56,17 @@ abstract class BigtableReadOptions implements Serializable {
abstract Builder setRowFilter(ValueProvider<RowFilter> rowFilter);
+ abstract Builder setMaxBufferElementCount(@Nullable Integer maxBufferElementCount);
+
abstract Builder setKeyRanges(ValueProvider<List<ByteKeyRange>> keyRanges);
abstract BigtableReadOptions build();
}
+ /**
+ * Returns a copy of these options with the max buffer element count replaced; this object is not
+ * modified. A null value is accepted by the builder.
+ */
+ BigtableReadOptions setMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
+ return toBuilder().setMaxBufferElementCount(maxBufferElementCount).build();
+ }
+
BigtableReadOptions withRowFilter(RowFilter rowFilter) {
return toBuilder().setRowFilter(ValueProvider.StaticValueProvider.of(rowFilter)).build();
}
@@ -80,6 +89,10 @@ abstract class BigtableReadOptions implements Serializable {
if (getRowFilter() != null && getRowFilter().isAccessible()) {
checkArgument(getRowFilter().get() != null, "rowFilter can not be null");
}
+ if (getMaxBufferElementCount() != null) {
+ checkArgument(
+ getMaxBufferElementCount() > 0, "maxBufferElementCount can not be zero or negative");
+ }
if (getKeyRanges() != null && getKeyRanges().isAccessible()) {
checkArgument(getKeyRanges().get() != null, "keyRanges can not be null");
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index eb9f8c40aa4..ceb842f39bf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -25,6 +25,7 @@ import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.RowFilter;
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.RowSet;
import com.google.bigtable.v2.SampleRowKeysRequest;
@@ -33,16 +34,28 @@ import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;
import com.google.cloud.bigtable.grpc.BigtableTableName;
import com.google.cloud.bigtable.grpc.async.BulkMutation;
+import com.google.cloud.bigtable.grpc.scanner.FlatRow;
+import com.google.cloud.bigtable.grpc.scanner.FlatRowConverter;
import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
+import com.google.cloud.bigtable.grpc.scanner.ScanHandler;
+import com.google.cloud.bigtable.util.ByteStringComparator;
import com.google.protobuf.ByteString;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
@@ -51,9 +64,13 @@ import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closer;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.FutureCallback;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +83,11 @@ import org.slf4j.LoggerFactory;
})
class BigtableServiceImpl implements BigtableService {
private static final Logger LOG = LoggerFactory.getLogger(BigtableServiceImpl.class);
+ // Default byte limit is a fraction of the JVM's total memory (Runtime#totalMemory)
+ private static final double DEFAULT_BYTE_LIMIT_PERCENTAGE = .1;
+ // Fraction of the per-request row limit below which the buffer is refilled asynchronously
+ private static final double WATERMARK_PERCENTAGE = .1;
+ private static final long MIN_BYTE_BUFFER_SIZE = 100 * 1024 * 1024; // 100MB
public BigtableServiceImpl(BigtableOptions options) {
this.options = options;
@@ -134,28 +156,7 @@ class BigtableServiceImpl implements BigtableService {
String tableNameSr =
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
- HashMap<String, String> baseLabels = new HashMap<>();
- baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
- baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
- baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
- baseLabels.put(
- MonitoringInfoConstants.Labels.RESOURCE,
- GcpResourceIdentifiers.bigtableResource(
- session.getOptions().getProjectId(),
- session.getOptions().getInstanceId(),
- source.getTableId().get()));
- baseLabels.put(
- MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
- baseLabels.put(
- MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
- baseLabels.put(
- MonitoringInfoConstants.Labels.TABLE_ID,
- GcpResourceIdentifiers.bigtableTableID(
- session.getOptions().getProjectId(),
- session.getOptions().getInstanceId(),
- source.getTableId().get()));
- ServiceCallMetric serviceCallMetric =
- new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+ ServiceCallMetric serviceCallMetric = createCallMetric(session, source.getTableId().get());
ReadRowsRequest.Builder requestB =
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
if (source.getRowFilter() != null) {
@@ -165,7 +166,7 @@ class BigtableServiceImpl implements BigtableService {
results = session.getDataClient().readRows(requestB.build());
serviceCallMetric.call("ok");
} catch (StatusRuntimeException e) {
- serviceCallMetric.call(e.getStatus().getCode().value());
+ serviceCallMetric.call(e.getStatus().getCode().toString());
throw e;
}
return advance();
@@ -210,6 +211,220 @@ class BigtableServiceImpl implements BigtableService {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+
+ private @Nullable ReadRowsRequest nextRequest;
+ private @Nullable Row currentRow;
+ private @Nullable Future<UpstreamResults> future;
+ private final Queue<Row> buffer;
+ private final int refillSegmentWaterMark;
+ private final long maxSegmentByteSize;
+ private ServiceCallMetric serviceCallMetric;
+
+ /**
+ * Result of one segment fetch: the rows that were received and the request to issue for the
+ * following segment (null when there is nothing further to request).
+ */
+ private static class UpstreamResults {
+ private final List<Row> rows;
+ private final @Nullable ReadRowsRequest nextRequest;
+
+ private UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+ this.rows = rows;
+ this.nextRequest = nextRequest;
+ }
+ }
+
+ /**
+  * Factory for a segment reader over {@code source}.
+  *
+  * <p>The row set is built from the source's key ranges (or a single default range when the
+  * source has none), the row filter falls back to the default instance, and the per-segment byte
+  * limit is the larger of {@code MIN_BYTE_BUFFER_SIZE} and {@code DEFAULT_BYTE_LIMIT_PERCENTAGE}
+  * of the JVM's total memory.
+  */
+ static BigtableSegmentReaderImpl create(BigtableSession session, BigtableSource source) {
+   RowSet.Builder ranges = RowSet.newBuilder();
+   if (source.getRanges().isEmpty()) {
+     ranges.addRowRanges(RowRange.getDefaultInstance());
+   } else {
+     // BigtableSource only contains ranges with a closed start key and open end key
+     for (ByteKeyRange keyRange : source.getRanges()) {
+       ByteString startKey = ByteString.copyFrom(keyRange.getStartKey().getValue());
+       ByteString endKey = ByteString.copyFrom(keyRange.getEndKey().getValue());
+       ranges.addRowRanges(
+           RowRange.newBuilder().setStartKeyClosed(startKey).setEndKeyOpen(endKey).build());
+     }
+   }
+   RowSet rowSet = ranges.build();
+   RowFilter filter =
+       MoreObjects.firstNonNull(source.getRowFilter(), RowFilter.getDefaultInstance());
+
+   long memoryBasedLimit =
+       (long) (Runtime.getRuntime().totalMemory() * DEFAULT_BYTE_LIMIT_PERCENTAGE);
+   long maxSegmentByteSize = Math.max(MIN_BYTE_BUFFER_SIZE, memoryBasedLimit);
+
+   String tableName =
+       session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+   return new BigtableSegmentReaderImpl(
+       session,
+       tableName,
+       rowSet,
+       source.getMaxBufferElementCount(),
+       maxSegmentByteSize,
+       filter,
+       createCallMetric(session, source.getTableId().get()));
+ }
+
+ /**
+  * Creates a reader that issues read requests limited to {@code maxRowsInBuffer} rows each.
+  *
+  * @param session session used to issue the reads
+  * @param tableName table name set on every request
+  * @param rowSet rows to read; a default (empty) instance is replaced by a row set holding a
+  *     single default {@link RowRange}
+  * @param maxRowsInBuffer row limit set on every request
+  * @param maxSegmentByteSize accumulated row size above which an in-flight segment is cancelled
+  * @param filter row filter set on every request
+  * @param serviceCallMetric metric that is updated when a segment fetch is consumed
+  */
+ @VisibleForTesting
+ BigtableSegmentReaderImpl(
+     BigtableSession session,
+     String tableName,
+     RowSet rowSet,
+     int maxRowsInBuffer,
+     long maxSegmentByteSize,
+     RowFilter filter,
+     ServiceCallMetric serviceCallMetric) {
+   if (rowSet.equals(rowSet.getDefaultInstanceForType())) {
+     rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+   }
+   ReadRowsRequest request =
+       ReadRowsRequest.newBuilder()
+           .setTableName(tableName)
+           .setRows(rowSet)
+           .setFilter(filter)
+           .setRowsLimit(maxRowsInBuffer)
+           .build();
+
+   this.session = session;
+   this.nextRequest = request;
+   this.maxSegmentByteSize = maxSegmentByteSize;
+   this.serviceCallMetric = serviceCallMetric;
+   this.buffer = new ArrayDeque<>();
+   // Asynchronously refill the buffer once only about 10% of a segment is left. Clamp to at
+   // least one row: for row limits below 10 the product truncates to 0, advance() would then
+   // never request another segment, and every row after the first segment would be dropped.
+   this.refillSegmentWaterMark =
+       Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE));
+ }
+
+ /**
+ * Kicks off the fetch of the first segment and advances to the first row.
+ *
+ * @return true if a first row is available
+ */
+ @Override
+ public boolean start() throws IOException {
+ future = fetchNextSegment();
+ return advance();
+ }
+
+ /**
+ * Moves to the next buffered row.
+ *
+ * @return true if a row is now current, false when the buffer yielded nothing
+ */
+ @Override
+ public boolean advance() throws IOException {
+ // Start a background fetch when the buffer runs low and no fetch is already pending.
+ if (buffer.size() < refillSegmentWaterMark && future == null) {
+ future = fetchNextSegment();
+ }
+ // Only block on the pending fetch when there is nothing left to hand out.
+ if (buffer.isEmpty() && future != null) {
+ waitReadRowsFuture();
+ }
+ // poll() returns null on an empty queue, which marks the end of the read.
+ currentRow = buffer.poll();
+ return currentRow != null;
+ }
+
+ /**
+ * Starts reading the segment described by {@code nextRequest} and returns a future that is
+ * completed with the received rows and the request for the following segment.
+ *
+ * <p>When {@code nextRequest} is null the returned future is already completed with an empty row
+ * list and a null follow-up request.
+ */
+ private Future<UpstreamResults> fetchNextSegment() {
+ SettableFuture<UpstreamResults> f = SettableFuture.create();
+ // When the nextRequest is null, the last fill completed and the buffer contains the last rows
+ if (nextRequest == null) {
+ f.set(new UpstreamResults(ImmutableList.of(), null));
+ return f;
+ }
+
+ // TODO(diegomez): Remove atomic ScanHandler for simpler StreamObserver/Future implementation
+ // The handler is only known after readFlatRows returns, so the observer reaches it through
+ // this reference.
+ // NOTE(review): if onNext can run before the set() call below, get() would return null in the
+ // byte-limit branch - confirm the client's delivery guarantees.
+ AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<>();
+ ScanHandler handler =
+ session
+ .getDataClient()
+ .readFlatRows(
+ nextRequest,
+ new StreamObserver<FlatRow>() {
+ List<Row> rows = new ArrayList<>();
+ long currentByteSize = 0;
+ boolean byteLimitReached = false;
+
+ @Override
+ public void onNext(FlatRow flatRow) {
+ Row row = FlatRowConverter.convert(flatRow);
+ currentByteSize += row.getSerializedSize();
+ rows.add(row);
+
+ // The row that crosses the byte limit is kept; the rest of the scan is cancelled.
+ // NOTE(review): presumably the client still signals onCompleted after cancel() so
+ // the future gets resolved - verify.
+ if (currentByteSize > maxSegmentByteSize) {
+ byteLimitReached = true;
+ atomicScanHandler.get().cancel();
+ return;
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ f.setException(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ ReadRowsRequest nextNextRequest = null;
+
+ // When requested rows < limit, the current request will be the last
+ // Otherwise continue after the key of the last row that was received.
+ if (byteLimitReached || rows.size() == nextRequest.getRowsLimit()) {
+ nextNextRequest =
+ truncateRequest(nextRequest, rows.get(rows.size() - 1).getKey());
+ }
+ f.set(new UpstreamResults(rows, nextNextRequest));
+ }
+ });
+ atomicScanHandler.set(handler);
+ return f;
+ }
+
+ /**
+ * Blocks until the pending segment fetch completes, then appends its rows to the buffer, stores
+ * the follow-up request, clears the pending future and records an "ok" call on the metric.
+ *
+ * @throws IOException if the wait is interrupted (the thread's interrupt flag is restored first)
+ * or the fetch failed; a StatusRuntimeException cause is also recorded on the metric by its
+ * status code
+ */
+ private void waitReadRowsFuture() throws IOException {
+ try {
+ UpstreamResults r = future.get();
+ buffer.addAll(r.rows);
+ nextRequest = r.nextRequest;
+ future = null;
+ serviceCallMetric.call("ok");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ // Unwrap so callers see the failure of the read itself rather than the future wrapper.
+ Throwable cause = e.getCause();
+ if (cause instanceof StatusRuntimeException) {
+ serviceCallMetric.call(((StatusRuntimeException) cause).getStatus().getCode().toString());
+ }
+ throw new IOException(cause);
+ }
+ }
+
+ /**
+ * Builds the request for the next segment: a copy of {@code request} whose row ranges are
+ * limited to what lies after {@code lastKey}.
+ *
+ * @param request the request that produced the current segment
+ * @param lastKey key of the last row received for that request
+ * @return the follow-up request, or null when no range has anything left after {@code lastKey}
+ */
+ private ReadRowsRequest truncateRequest(ReadRowsRequest request, ByteString lastKey) {
+ RowSet.Builder segment = RowSet.newBuilder();
+
+ for (RowRange rowRange : request.getRows().getRowRangesList()) {
+ int startCmp = StartPoint.extract(rowRange).compareTo(new StartPoint(lastKey, true));
+ int endCmp = EndPoint.extract(rowRange).compareTo(new EndPoint(lastKey, true));
+
+ if (startCmp > 0) {
+ // If the start key is past the split point, then add the whole range
+ segment.addRowRanges(rowRange);
+ } else if (endCmp > 0) {
+ // Range is split: drop the keys already read by restarting it just after the last
+ // buffered row
+ RowRange subRange = rowRange.toBuilder().setStartKeyOpen(lastKey).build();
+ segment.addRowRanges(subRange);
+ }
+ // Otherwise the range ends at or before lastKey and is dropped.
+ }
+ if (segment.getRowRangesCount() == 0) {
+ return null;
+ }
+
+ ReadRowsRequest.Builder requestBuilder = request.toBuilder();
+ requestBuilder.clearRows();
+ return requestBuilder.setRows(segment).build();
+ }
+
+ /** Closes the underlying Bigtable session. */
+ @Override
+ public void close() throws IOException {
+ session.close();
+ }
+
+ /**
+ * Returns the row selected by the last call to start() or advance().
+ *
+ * @throws NoSuchElementException if there is no current row
+ */
+ @Override
+ public Row getCurrentRow() throws NoSuchElementException {
+ if (currentRow == null) {
+ throw new NoSuchElementException();
+ }
+ return currentRow;
+ }
+ }
+
@VisibleForTesting
static class BigtableWriterImpl implements Writer {
private BigtableSession session;
@@ -321,7 +536,11 @@ class BigtableServiceImpl implements BigtableService {
@Override
public Reader createReader(BigtableSource source) throws IOException {
BigtableSession session = new BigtableSession(options);
- return new BigtableReaderImpl(session, source);
+ if (source.getMaxBufferElementCount() != null) {
+ return BigtableSegmentReaderImpl.create(session, source);
+ } else {
+ return new BigtableReaderImpl(session, source);
+ }
}
@Override
@@ -334,4 +553,107 @@ class BigtableServiceImpl implements BigtableService {
return session.getDataClient().sampleRowKeys(request);
}
}
+
+ /**
+  * Creates the API request count metric for {@code google.bigtable.v2.ReadRows} calls against
+  * {@code tableId}, labelled with the project and instance of the session's options.
+  */
+ @VisibleForTesting
+ public static ServiceCallMetric createCallMetric(BigtableSession session, String tableId) {
+   final String projectId = session.getOptions().getProjectId();
+   final String instanceId = session.getOptions().getInstanceId();
+
+   HashMap<String, String> labels = new HashMap<>();
+   labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+   labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+   labels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+   labels.put(
+       MonitoringInfoConstants.Labels.RESOURCE,
+       GcpResourceIdentifiers.bigtableResource(projectId, instanceId, tableId));
+   labels.put(MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, projectId);
+   labels.put(MonitoringInfoConstants.Labels.INSTANCE_ID, instanceId);
+   labels.put(
+       MonitoringInfoConstants.Labels.TABLE_ID,
+       GcpResourceIdentifiers.bigtableTableID(projectId, instanceId, tableId));
+
+   return new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, labels);
+ }
+
+ /**
+ * Helper class to ease comparison of RowRange start points. A start point is a key plus a flag
+ * telling whether the key itself is included (closed) or excluded (open).
+ */
+ private static final class StartPoint implements Comparable<StartPoint> {
+ private final ByteString value;
+ private final boolean isClosed;
+
+ /** Reads the start point of {@code rowRange}; an unset start key becomes a closed empty key. */
+ @Nonnull
+ static StartPoint extract(@Nonnull RowRange rowRange) {
+ switch (rowRange.getStartKeyCase()) {
+ case STARTKEY_NOT_SET:
+ return new StartPoint(ByteString.EMPTY, true);
+ case START_KEY_CLOSED:
+ return new StartPoint(rowRange.getStartKeyClosed(), true);
+ case START_KEY_OPEN:
+ if (rowRange.getStartKeyOpen().isEmpty()) {
+ // Take care to normalize an open empty start key to be closed.
+ return new StartPoint(ByteString.EMPTY, true);
+ } else {
+ return new StartPoint(rowRange.getStartKeyOpen(), false);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown startKeyCase: " + rowRange.getStartKeyCase());
+ }
+ }
+
+ private StartPoint(@Nonnull ByteString value, boolean isClosed) {
+ this.value = value;
+ this.isClosed = isClosed;
+ }
+
+ /** Orders start points by where the range begins; the order of the three steps matters. */
+ @Override
+ public int compareTo(@Nonnull StartPoint o) {
+ return ComparisonChain.start()
+ // Empty string comes first
+ .compareTrueFirst(value.isEmpty(), o.value.isEmpty())
+ .compare(value, o.value, ByteStringComparator.INSTANCE)
+ // Closed start point comes before an open start point: [x,y] starts before (x,y].
+ .compareTrueFirst(isClosed, o.isClosed)
+ .result();
+ }
+ }
+
+ /**
+ * Helper class to ease comparison of RowRange endpoints. An endpoint is a key plus a flag telling
+ * whether the key itself is included (closed) or excluded (open).
+ */
+ private static final class EndPoint implements Comparable<EndPoint> {
+ private final ByteString value;
+ private final boolean isClosed;
+
+ /** Reads the endpoint of {@code rowRange}; an unset end key becomes a closed empty key. */
+ @Nonnull
+ static EndPoint extract(@Nonnull RowRange rowRange) {
+ switch (rowRange.getEndKeyCase()) {
+ case ENDKEY_NOT_SET:
+ return new EndPoint(ByteString.EMPTY, true);
+ case END_KEY_CLOSED:
+ return new EndPoint(rowRange.getEndKeyClosed(), true);
+ case END_KEY_OPEN:
+ if (rowRange.getEndKeyOpen().isEmpty()) {
+ // Take care to normalize an open empty end key to be closed.
+ return new EndPoint(ByteString.EMPTY, true);
+ } else {
+ return new EndPoint(rowRange.getEndKeyOpen(), false);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown endKeyCase: " + rowRange.getEndKeyCase());
+ }
+ }
+
+ private EndPoint(@Nonnull ByteString value, boolean isClosed) {
+ this.value = value;
+ this.isClosed = isClosed;
+ }
+
+ /** Orders endpoints by where the range ends; the order of the three steps matters. */
+ @Override
+ public int compareTo(@Nonnull EndPoint o) {
+ return ComparisonChain.start()
+ // Empty string comes last
+ .compareFalseFirst(value.isEmpty(), o.value.isEmpty())
+ .compare(value, o.value, ByteStringComparator.INSTANCE)
+ // Open end point comes before a closed end point: [x,y) ends before [x,y].
+ .compareFalseFirst(isClosed, o.isClosed)
+ .result();
+ }
+ }
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
index b983cc3ac86..97c51856cc9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
@@ -31,23 +31,38 @@ import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.Mutation.SetCell;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.RowFilter;
+import com.google.bigtable.v2.RowRange;
+import com.google.bigtable.v2.RowSet;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.grpc.BigtableDataClient;
import com.google.cloud.bigtable.grpc.BigtableInstanceName;
import com.google.cloud.bigtable.grpc.BigtableSession;
import com.google.cloud.bigtable.grpc.BigtableTableName;
import com.google.cloud.bigtable.grpc.async.BulkMutation;
+import com.google.cloud.bigtable.grpc.scanner.FlatRow;
+import com.google.cloud.bigtable.grpc.scanner.FlatRowConverter;
import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
+import com.google.cloud.bigtable.grpc.scanner.ScanHandler;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
@@ -60,9 +75,14 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
/** Unit tests of BigtableServiceImpl. */
@RunWith(JUnit4.class)
@@ -72,6 +92,11 @@ public class BigtableServiceImplTest {
private static final String INSTANCE_ID = "instance";
private static final String TABLE_ID = "table";
+ // Row-count segment size handed to BigtableSegmentReaderImpl by the segment reader tests.
+ private static final int SEGMENT_SIZE = 10;
+ // Size in bytes of the cell value attached to each row built by generateLargeSegmentResult.
+ private static final long DEFAULT_ROW_SIZE = 1024 * 1024 * 25; // 25MB
+ // Byte segment size handed to the reader by tests that do not set their own byte limit (1000MB).
+ private static final long DEFAULT_BYTE_SEGMENT_SIZE = 1024 * 1024 * 1000;
+ // Row key prefix the tests pass to generateByteString and generateSegmentResult.
+ private static final String DEFAULT_PREFIX = "b";
+
private static final BigtableTableName TABLE_NAME =
new BigtableInstanceName(PROJECT_ID, INSTANCE_ID).toTableName(TABLE_ID);
@@ -83,6 +108,12 @@ public class BigtableServiceImplTest {
@Mock private BigtableSource mockBigtableSource;
+ @Mock private ScanHandler scanHandler;
+
+ @Mock private ServiceCallMetric mockCallMetric;
+
+ @Captor private ArgumentCaptor<ReadRowsRequest> requestCaptor;
+
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
@@ -126,6 +157,426 @@ public class BigtableServiceImplTest {
verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
}
+ /**
+ * This test ensures that protobuf creation and interactions with {@link BigtableDataClient} work
+ * as expected. The mocked client streams a single row; the reader must expose that row after
+ * {@code start()} and then report that no further rows are available.
+ *
+ * @throws Exception if the reader under test fails unexpectedly
+ */
+ @Test
+ public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+ RowSet.Builder ranges = RowSet.newBuilder();
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0), generateByteString(DEFAULT_PREFIX, 1)));
+
+ FlatRow expectedRow = FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+ .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ ranges.build(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ // Check the result of start() like the other reader tests do, so that a reader which reports
+ // "no rows" fails here instead of at the row comparison below.
+ Assert.assertTrue(underTest.start());
+ Assert.assertEquals(FlatRowConverter.convert(expectedRow), underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ Mockito.verify(mockCallMetric, Mockito.times(2)).call("ok");
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and read. This example
+ * uses a single range covering SEGMENT_SIZE * 2 rows. The mocked client answers with two
+ * segments of SEGMENT_SIZE rows each, followed by an empty segment. Range: [b00000, b00001, ...
+ * b00019, b00020)
+ *
+ * @throws IOException if the reader under test fails unexpectedly
+ */
+ @Test
+ public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+ RowSet.Builder ranges = RowSet.newBuilder();
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+ // Consecutive readFlatRows calls are answered with these segments, in this order.
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ ranges.build(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ // Drain the reader: collect the current row until advance() reports that no rows are left.
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ // The rows read must equal all stubbed segments concatenated in order.
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+ Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and read. This example
+ * uses two adjacent ranges of SEGMENT_SIZE rows each. Three responses are stubbed for
+ * readFlatRows: two segments of SEGMENT_SIZE rows and a last one that returns zero rows. The
+ * following test follows this example: FirstRange: [b00000,b00001,...,b00009,b00010)
+ * SecondRange: [b00010,b00011,...,b00019,b00020)
+ *
+ * @throws IOException if the reader under test fails unexpectedly
+ */
+ @Test
+ public void testReadMultipleRanges() throws IOException {
+ RowSet.Builder ranges = RowSet.newBuilder();
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE)));
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+ // Consecutive readFlatRows calls are answered with these segments, in this order.
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ ranges.build(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ // Drain the reader: collect the current row until advance() reports that no rows are left.
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ // The rows read must equal all stubbed segments concatenated in order.
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and read. This example
+ * uses three overlapping ranges. The logic should remove all keys that were already added to the
+ * buffer, so every row is expected exactly once. The following test follows this example:
+ * FirstRange: [b00000,b00001,...,b00009,b00010) SecondRange: [b00005,b00006,...,b00019,b00020)
+ * ThirdRange: [b00007, b00020)
+ *
+ * @throws IOException if the reader under test fails unexpectedly
+ */
+ @Test
+ public void testReadMultipleRangesOverlappingKeys() throws IOException {
+ RowSet.Builder ranges = RowSet.newBuilder();
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE)));
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE / 2),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, (int) (SEGMENT_SIZE * .7)),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+ // Consecutive readFlatRows calls are answered with these segments, in this order.
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ ranges.build(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ // Drain the reader: collect the current row until advance() reports that no rows are left.
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ // The rows read must equal all stubbed segments concatenated in order.
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and read. This example
+ * uses an empty RowSet to trigger a full table scan. The mocked client answers with three
+ * segments of SEGMENT_SIZE rows each, followed by an empty segment. Rows: [b00000, b00001, ...
+ * b00029]
+ *
+ * @throws IOException if the reader under test fails unexpectedly
+ */
+ @Test
+ public void testReadFullTableScan() throws IOException {
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+ // Consecutive readFlatRows calls are answered with these segments, in this order.
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE * 2, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ RowSet.getDefaultInstance(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ // Drain the reader: collect the current row until advance() reports that no rows are left.
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ // The rows read must equal all stubbed segments concatenated in order.
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ Mockito.verify(mockCallMetric, Mockito.times(4)).call("ok");
+ }
+
+ /**
+ * This test compares the number of RowRanges being requested after the buffer is refilled. The
+ * first request must contain all three RowRanges. After reading the first buffer, the first two
+ * RowRanges should be removed and only the RowRange containing [b00010,b00020) should be
+ * requested.
+ *
+ * @throws IOException if the reader under test fails unexpectedly
+ */
+ @Test
+ public void testReadFillBuffer() throws IOException {
+ RowSet.Builder ranges = RowSet.newBuilder();
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE / 2)));
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE / 2),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE)));
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+ // Consecutive readFlatRows calls are answered with these segments, in this order.
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ ranges.build(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ // After start() exactly one request has been issued, and it still carries all three ranges.
+ verify(mockBigtableDataClient, times(1))
+ .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+ Assert.assertEquals(3, requestCaptor.getValue().getRows().getRowRangesCount());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ // Once the reader is drained three requests have been issued in total. getValue() returns the
+ // most recently captured request, which must contain a single remaining range.
+ verify(mockBigtableDataClient, times(3))
+ .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+ Assert.assertEquals(1, requestCaptor.getValue().getRows().getRowRangesCount());
+
+ // The rows read must equal all stubbed segments concatenated in order.
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+ }
+
+ /**
+ * This test checks that the buffer will stop filling up once the byte limit is reached. The
+ * reader is expected to cancel the ScanHandler after reaching the limit (the cancellation itself
+ * is not verified here). The byte limit is DEFAULT_ROW_SIZE * (SEGMENT_SIZE / 2). The first
+ * stubbed segment holds one more large row than that limit divided by DEFAULT_ROW_SIZE, the
+ * second segment holds the remaining small rows of the range, and the third is empty. Range:
+ * [b00000, b00010)
+ *
+ * @throws IOException if the reader under test fails unexpectedly
+ */
+ @Test
+ public void testReadByteLimitBuffer() throws IOException {
+ long segmentByteLimit = DEFAULT_ROW_SIZE * (SEGMENT_SIZE / 2);
+ // One row more than the number of DEFAULT_ROW_SIZE rows that fit into the byte limit.
+ int numOfRowsInsideBuffer = (int) (segmentByteLimit / DEFAULT_ROW_SIZE) + 1;
+
+ RowRange mockRowRange =
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+ // Consecutive readFlatRows calls are answered with these segments, in this order.
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateLargeSegmentResult(DEFAULT_PREFIX, 0, numOfRowsInsideBuffer),
+ generateSegmentResult(
+ DEFAULT_PREFIX, numOfRowsInsideBuffer, SEGMENT_SIZE - numOfRowsInsideBuffer),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ RowSet.newBuilder().addRowRanges(mockRowRange).build(),
+ SEGMENT_SIZE,
+ segmentByteLimit,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ // Drain the reader: collect the current row until advance() reports that no rows are left.
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ // The rows read must equal all stubbed segments concatenated in order.
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+ }
+
+ /**
+ * This test ensures the exception handling around the scanHandler. The mocked client reports an
+ * INVALID_ARGUMENT error to the StreamObserver from a separate thread. The test checks that
+ * {@code start()} throws an IOException whose cause is a StatusRuntimeException and that the call
+ * metric records the INVALID_ARGUMENT status code once.
+ *
+ * @throws IOException declared for signature compatibility; the IOException thrown by {@code
+ *     start()} is caught and inspected by the test
+ */
+ @Test
+ public void testReadSegmentExceptionHandling() throws IOException {
+ RowSet.Builder ranges = RowSet.newBuilder();
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0), generateByteString(DEFAULT_PREFIX, 1)));
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+ .thenAnswer(
+ new Answer<ScanHandler>() {
+ @Override
+ public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+ StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+ // Deliver the failure asynchronously, as a real streaming call would.
+ new Thread() {
+ @Override
+ public void run() {
+ flatRowObserver.onError(Status.INVALID_ARGUMENT.asRuntimeException());
+ }
+ }.start();
+
+ return scanHandler;
+ }
+ });
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ ranges.build(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ IOException returnedError = null;
+ try {
+ underTest.start();
+ } catch (IOException e) {
+ returnedError = e;
+ }
+ // Fail with a clear assertion message, not a NullPointerException, if start() did not throw.
+ Assert.assertNotNull("start() should have thrown an IOException", returnedError);
+ Assert.assertTrue(returnedError.getCause() instanceof StatusRuntimeException);
+
+ Mockito.verify(mockCallMetric, Mockito.times(1))
+ .call(Status.INVALID_ARGUMENT.getCode().toString());
+ }
+
/**
* This test ensures that protobuf creation and interactions with {@link BulkMutation} work as
* expected.
@@ -135,6 +586,7 @@ public class BigtableServiceImplTest {
*/
@Test
public void testWrite() throws IOException, InterruptedException {
+ when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
BigtableService.Writer underTest =
new BigtableServiceImpl.BigtableWriterImpl(mockSession, TABLE_NAME);
@@ -153,6 +605,7 @@ public class BigtableServiceImplTest {
verify(mockBulkMutation, times(1)).add(expected);
underTest.close();
+
verify(mockBulkMutation, times(1)).flush();
}
@@ -178,4 +631,72 @@ public class BigtableServiceImplTest {
(MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer();
assertEquals(count, (long) container.getCounter(name).getCumulative());
}
+
+ /**
+ * Builds a Mockito answer for readFlatRows. When invoked, it starts a new thread that passes
+ * every given row to the StreamObserver received as the second call argument and then completes
+ * the observer. The answer itself returns the mocked ScanHandler.
+ */
+ private Answer<ScanHandler> mockReadRowsAnswer(List<FlatRow> rows) {
+ return invocation -> {
+ StreamObserver<FlatRow> observer = invocation.getArgument(1);
+ Runnable emitRows =
+ () -> {
+ for (FlatRow row : rows) {
+ observer.onNext(row);
+ }
+ observer.onCompleted();
+ };
+ new Thread(emitRows).start();
+
+ return scanHandler;
+ };
+ }
+
+ /** Creates a RowRange with a closed start key and an open end key, i.e. [start, end). */
+ private static RowRange generateRowRange(ByteString start, ByteString end) {
+ RowRange.Builder range = RowRange.newBuilder();
+ range.setStartKeyClosed(start);
+ range.setEndKeyOpen(end);
+ return range.build();
+ }
+
+ /**
+ * Generates {@code count} rows that carry only a row key, for indexes [startIndex, startIndex +
+ * count).
+ */
+ private static List<FlatRow> generateSegmentResult(String prefix, int startIndex, int count) {
+ return generateSegmentResult(prefix, startIndex, count, false);
+ }
+
+ /**
+ * Generates {@code count} rows for indexes [startIndex, startIndex + count), each with one cell
+ * whose value is DEFAULT_ROW_SIZE bytes long.
+ */
+ private static List<FlatRow> generateLargeSegmentResult(
+ String prefix, int startIndex, int count) {
+ return generateSegmentResult(prefix, startIndex, count, true);
+ }
+
+ /**
+ * Generates one row per index in [startIndex, startIndex + count). Row keys are built by
+ * generateByteString(prefix, index).
+ *
+ * @param prefix row key prefix
+ * @param startIndex index of the first generated row
+ * @param count number of rows to generate
+ * @param largeRow when true, each row gets a "Family"/"LargeMemoryRow" cell with a value of
+ *     DEFAULT_ROW_SIZE bytes; when false, rows only have a row key
+ * @return the generated rows in ascending index order
+ */
+ private static List<FlatRow> generateSegmentResult(
+ String prefix, int startIndex, int count, boolean largeRow) {
+ // Allocate the payload only when large rows are requested, and share the immutable ByteString
+ // between all rows instead of copying DEFAULT_ROW_SIZE bytes once per row.
+ final ByteString largeValue =
+ largeRow ? ByteString.copyFrom(new byte[(int) DEFAULT_ROW_SIZE]) : ByteString.EMPTY;
+ return IntStream.range(startIndex, startIndex + count)
+ .mapToObj(
+ i -> {
+ FlatRow.Builder builder = FlatRow.newBuilder();
+ builder.withRowKey(generateByteString(prefix, i));
+ if (largeRow) {
+ builder.addCell(
+ "Family",
+ ByteString.copyFromUtf8("LargeMemoryRow"),
+ System.currentTimeMillis(),
+ largeValue);
+ }
+ return builder.build();
+ })
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Chains one answer per entry of {@code results} onto {@code stub}, in list order, so that each
+ * successive stubbed call streams the next list of rows. Returns the stubbing after the last
+ * chained answer.
+ */
+ private <T> OngoingStubbing<T> expectRowResults(
+ OngoingStubbing<T> stub, List<List<FlatRow>> results) {
+ OngoingStubbing<T> chained = stub;
+ for (List<FlatRow> segment : results) {
+ chained = chained.thenAnswer(mockReadRowsAnswer(segment));
+ }
+ return chained;
+ }
+
+ /** Builds a UTF-8 row key from the prefix followed by the index zero-padded to five digits. */
+ private static ByteString generateByteString(String prefix, int index) {
+ String paddedIndex = String.format("%05d", index);
+ String rowKey = prefix + paddedIndex;
+ return ByteString.copyFromUtf8(rowKey);
+ }
}