Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/13 15:43:33 UTC

[GitHub] [beam] igorbernstein2 commented on a diff in pull request #16939: Fix: Splitting scans into smaller chunks to buffer reads

igorbernstein2 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r849592846


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -402,6 +402,32 @@ public Read withRowFilter(RowFilter filter) {
       return withRowFilter(StaticValueProvider.of(filter));
     }
 
+    /**
+     * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
+     * using the given row filter.

Review Comment:
   Please update the javadocs
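
   For illustration, the updated javadoc might read something like this (a sketch only; the exact wording is the author's call):

       /**
        * Returns a new {@link BigtableIO.Read} that will break up read requests into smaller
        * batches, buffering at most the given number of rows at a time.
        *
        * <p>Does not modify this object.
        */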



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -402,6 +402,32 @@ public Read withRowFilter(RowFilter filter) {
       return withRowFilter(StaticValueProvider.of(filter));
     }
 
+    /**
+     * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
+     * using the given row filter.
+     *
+     * <p>Does not modify this object.
+     *
+     * When we have a builder, we initialize the value. When they call the method then we override the value
+     */
+    public Read withMaxBufferElementCount(ValueProvider<Integer> maxBufferElementCount) {

Review Comment:
   Please annotate this with @Experimental
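
   For example, placement only (a sketch; assuming Beam's `org.apache.beam.sdk.annotations.Experimental` annotation and its `Kind.SOURCE_SINK` value):

       @Experimental(Kind.SOURCE_SINK)
       public Read withMaxBufferElementCount(ValueProvider<Integer> maxBufferElementCount) {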



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +234,246 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableMiniBatchReaderImpl implements Reader {
+    private BigtableSession session;
+    private final BigtableSource source;
+    private Row currentRow;
+    private Queue<FlatRow> buffer;
+    private RowSet rowSet;
+    private ServiceCallMetric serviceCallMetric;
+    private Future<List<FlatRow>> future;
+    private ByteString lastFetchedRow;
+    private boolean lastFillComplete;
+
+    private int miniBatchLimit = DEFAULT_MINI_BATCH_SIZE;
+    private long bufferSizeLimit = DEFAULT_BYTE_LIMIT;
+    private int miniBatchWaterMark;
+    private final String tableName;
+
+    @VisibleForTesting
+    BigtableMiniBatchReaderImpl(BigtableSession session, BigtableSource source) {
+      this.session = session;
+      if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+        this.miniBatchLimit = source.getMaxBufferElementCount();
+        this.bufferSizeLimit = (long) miniBatchLimit * 100 * 1024 * 1024;
+      }
+      this.miniBatchWaterMark = miniBatchLimit / 10;

Review Comment:
   This needs a comment, e.g. `asynchronously refill the buffer when only 10% of the elements are left`
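
   For example:

       // Asynchronously refill the buffer once only ~10% of the buffered elements remain.
       this.miniBatchWaterMark = miniBatchLimit / 10;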



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +234,246 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableMiniBatchReaderImpl implements Reader {
+    private BigtableSession session;
+    private final BigtableSource source;
+    private Row currentRow;
+    private Queue<FlatRow> buffer;
+    private RowSet rowSet;
+    private ServiceCallMetric serviceCallMetric;
+    private Future<List<FlatRow>> future;
+    private ByteString lastFetchedRow;
+    private boolean lastFillComplete;
+
+    private int miniBatchLimit = DEFAULT_MINI_BATCH_SIZE;
+    private long bufferSizeLimit = DEFAULT_BYTE_LIMIT;
+    private int miniBatchWaterMark;

Review Comment:
   Please add `final` modifiers for all fields that are constant
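
   A minimal sketch (making these final requires assigning each field exactly once in the constructor):

       private final int miniBatchLimit;
       private final long bufferSizeLimit;
       private final int miniBatchWaterMark;

       BigtableMiniBatchReaderImpl(BigtableSession session, BigtableSource source) {
         Integer configured = source.getMaxBufferElementCount();
         this.miniBatchLimit =
             (configured == null || configured == 0) ? DEFAULT_MINI_BATCH_SIZE : configured;
         this.bufferSizeLimit =
             (configured == null || configured == 0)
                 ? DEFAULT_BYTE_LIMIT
                 : (long) miniBatchLimit * 100 * 1024 * 1024;
         this.miniBatchWaterMark = miniBatchLimit / 10;
         // ... rest of the constructor unchanged
       }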



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +143,538 @@ public void testRead() throws IOException {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowMiniBatchLimit() throws Exception {
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+    FlatRow expectedRow = FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                flatRowObserver.onNext(expectedRow);
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        });
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableMiniBatchReaderImpl(mockSession, mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedRow),
+        underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses a single range with MINI_BATCH_ROW_LIMIT*2+1 rows. Range: [a, b1, ... b199, c)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeAboveMiniBatchLimit() throws IOException {
+    List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+    List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+
+    for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+      expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("b" + i)).build());
+      expectedSecondRangeRows.add(
+          FlatRow.newBuilder()
+              .withRowKey(ByteString.copyFromUtf8("b" + (i + MINI_BATCH_ROW_LIMIT)))
+              .build());
+    }
+    expectedFirstRangeRows.set(0, FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedFirstRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedSecondRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            flatRowObserver.onCompleted();
+            return scanHandler;
+          }
+        });
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableMiniBatchReaderImpl(mockSession, mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedFirstRangeRows.get(0)), underTest.getCurrentRow());
+    for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {
+      underTest.advance();
+    }
+    for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+      underTest.advance();
+    }
+    Assert.assertEquals(
+        FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size() - 1)), underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+
+    underTest.close();
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses two ranges with MINI_BATCH_ROW_LIMIT rows. The buffer should be refilled twice and
+   * ReadRowsAsync should be called twice. The following test follows this example: FirstRange:
+   * [a,b1,...,b99,c) SecondRange: [c,d1,...,d99,e)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadMultipleRanges() throws IOException {
+    List<FlatRow> expectedFirstRangeRows = new ArrayList<>();

Review Comment:
   Consider extracting a helper function that makes rows for you: `List<FlatRow> createRows(String prefix, int count)`
   
   Also, your current logic creates an invalid ordering of rows.
   Current key order: a, a1,...a5,...a20
   But "a20" sorts lexicographically before "a5". You can easily fix this by adding a leading-zero format specifier.
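
   A minimal sketch (the helper name and the zero-padding width are suggestions; it uses the FlatRow and ByteString types the test already imports):

       private static List<FlatRow> createRows(String prefix, int count) {
         List<FlatRow> rows = new ArrayList<>();
         for (int i = 0; i < count; i++) {
           // Zero-pad the index so that, e.g., "b005" sorts before "b020" lexicographically.
           rows.add(
               FlatRow.newBuilder()
                   .withRowKey(ByteString.copyFromUtf8(String.format("%s%03d", prefix, i)))
                   .build());
         }
         return rows;
       }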



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +234,246 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableMiniBatchReaderImpl implements Reader {
+    private BigtableSession session;
+    private final BigtableSource source;
+    private Row currentRow;
+    private Queue<FlatRow> buffer;
+    private RowSet rowSet;
+    private ServiceCallMetric serviceCallMetric;
+    private Future<List<FlatRow>> future;
+    private ByteString lastFetchedRow;
+    private boolean lastFillComplete;
+
+    private int miniBatchLimit = DEFAULT_MINI_BATCH_SIZE;
+    private long bufferSizeLimit = DEFAULT_BYTE_LIMIT;
+    private int miniBatchWaterMark;
+    private final String tableName;
+
+    @VisibleForTesting
+    BigtableMiniBatchReaderImpl(BigtableSession session, BigtableSource source) {
+      this.session = session;
+      if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+        this.miniBatchLimit = source.getMaxBufferElementCount();
+        this.bufferSizeLimit = (long) miniBatchLimit * 100 * 1024 * 1024;
+      }
+      this.miniBatchWaterMark = miniBatchLimit / 10;
+      tableName =
+          session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      buffer = new ArrayDeque<>();
+      lastFillComplete = false;
+      RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+      for (int i = 0; i < source.getRanges().size(); i++) {
+        rowRanges[i] =
+            RowRange.newBuilder()
+                .setStartKeyClosed(
+                    ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                .setEndKeyOpen(
+                    ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                .build();
+      }
+      // Presort the ranges so that future segmentation can exit early when splitting the row set
+      Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+      rowSet =
+          RowSet.newBuilder()
+              .addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+              .build();
+
+      HashMap<String, String> baseLabels = new HashMap<>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigtableResource(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.TABLE_ID,
+          GcpResourceIdentifiers.bigtableTableID(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      serviceCallMetric =
+          new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
+      //future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+      future = createFuture();

Review Comment:
   Also, please be consistent in your naming: segment vs. mini-batch



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +143,538 @@ public void testRead() throws IOException {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowMiniBatchLimit() throws Exception {
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+    FlatRow expectedRow = FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                flatRowObserver.onNext(expectedRow);
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        });
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableMiniBatchReaderImpl(mockSession, mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedRow),
+        underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses a single range with MINI_BATCH_ROW_LIMIT*2+1 rows. Range: [a, b1, ... b199, c)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeAboveMiniBatchLimit() throws IOException {
+    List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+    List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+
+    for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+      expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("b" + i)).build());
+      expectedSecondRangeRows.add(
+          FlatRow.newBuilder()
+              .withRowKey(ByteString.copyFromUtf8("b" + (i + MINI_BATCH_ROW_LIMIT)))
+              .build());
+    }
+    expectedFirstRangeRows.set(0, FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedFirstRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedSecondRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            flatRowObserver.onCompleted();
+            return scanHandler;
+          }
+        });
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableMiniBatchReaderImpl(mockSession, mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedFirstRangeRows.get(0)), underTest.getCurrentRow());
+    for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {
+      underTest.advance();
+    }
+    for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+      underTest.advance();
+    }
+    Assert.assertEquals(
+        FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size() - 1)), underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+
+    underTest.close();
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses two ranges with MINI_BATCH_ROW_LIMIT rows. The buffer should be refilled twice and
+   * ReadRowsAsync should be called twice. The following test follows this example: FirstRange:
+   * [a,b1,...,b99,c) SecondRange: [c,d1,...,d99,e)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadMultipleRanges() throws IOException {
+    List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+    expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+    List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+    expectedSecondRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("c")).build());
+
+    for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {
+      expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("b" + i)).build());
+      expectedSecondRangeRows.add(
+          FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("d" + i)).build());
+    }
+
+    ByteKey firstStart = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey sharedKeyEnd = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+    ByteKey secondEnd = ByteKey.copyFrom("e".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges())
+        .thenReturn(
+            Arrays.asList(
+                ByteKeyRange.of(firstStart, sharedKeyEnd),
+                ByteKeyRange.of(sharedKeyEnd, secondEnd)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedFirstRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedSecondRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            flatRowObserver.onCompleted();
+            return scanHandler;
+          }
+        });
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableMiniBatchReaderImpl(mockSession, mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedFirstRangeRows.get(0)), underTest.getCurrentRow());
+    for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {

Review Comment:
   nit: `for (int i = 0; i < MINI_BATCH_ROW_LIMIT - 1; i++)` to make it explicit that you are advancing n-1 times



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java:
##########
@@ -103,3 +103,8 @@ interface Reader {
    */
   List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException;
 }
+
+
+
+
+

Review Comment:
   nit, please remove extra whitespace



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +234,246 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableMiniBatchReaderImpl implements Reader {
+    private BigtableSession session;
+    private final BigtableSource source;
+    private Row currentRow;
+    private Queue<FlatRow> buffer;
+    private RowSet rowSet;
+    private ServiceCallMetric serviceCallMetric;
+    private Future<List<FlatRow>> future;
+    private ByteString lastFetchedRow;
+    private boolean lastFillComplete;
+
+    private int miniBatchLimit = DEFAULT_MINI_BATCH_SIZE;
+    private long bufferSizeLimit = DEFAULT_BYTE_LIMIT;
+    private int miniBatchWaterMark;
+    private final String tableName;
+
+    @VisibleForTesting
+    BigtableMiniBatchReaderImpl(BigtableSession session, BigtableSource source) {
+      this.session = session;
+      if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+        this.miniBatchLimit = source.getMaxBufferElementCount();
+        this.bufferSizeLimit = (long) miniBatchLimit * 100 * 1024 * 1024;
+      }
+      this.miniBatchWaterMark = miniBatchLimit / 10;
+      tableName =
+          session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      buffer = new ArrayDeque<>();
+      lastFillComplete = false;
+      RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+      for (int i = 0; i < source.getRanges().size(); i++) {
+        rowRanges[i] =
+            RowRange.newBuilder()
+                .setStartKeyClosed(
+                    ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                .setEndKeyOpen(
+                    ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                .build();
+      }
+      // Presort the ranges so that future segmentation can exit early when splitting the row set
+      Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+      rowSet =
+          RowSet.newBuilder()
+              .addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+              .build();
+
+      HashMap<String, String> baseLabels = new HashMap<>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigtableResource(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.TABLE_ID,
+          GcpResourceIdentifiers.bigtableTableID(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      serviceCallMetric =
+          new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
+      //future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+      future = createFuture();

Review Comment:
   Please remove stale comments, and rename the method to something more descriptive, e.g. `loadNextMiniBatchAsync()`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +234,246 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableMiniBatchReaderImpl implements Reader {
+    private BigtableSession session;
+    private final BigtableSource source;
+    private Row currentRow;
+    private Queue<FlatRow> buffer;
+    private RowSet rowSet;
+    private ServiceCallMetric serviceCallMetric;
+    private Future<List<FlatRow>> future;
+    private ByteString lastFetchedRow;
+    private boolean lastFillComplete;
+
+    private int miniBatchLimit = DEFAULT_MINI_BATCH_SIZE;
+    private long bufferSizeLimit = DEFAULT_BYTE_LIMIT;
+    private int miniBatchWaterMark;
+    private final String tableName;
+
+    @VisibleForTesting
+    BigtableMiniBatchReaderImpl(BigtableSession session, BigtableSource source) {
+      this.session = session;
+      if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+        this.miniBatchLimit = source.getMaxBufferElementCount();
+        this.bufferSizeLimit = (long) miniBatchLimit * 100 * 1024 * 1024;
+      }
+      this.miniBatchWaterMark = miniBatchLimit / 10;
+      tableName =
+          session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      buffer = new ArrayDeque<>();
+      lastFillComplete = false;
+      RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+      for (int i = 0; i < source.getRanges().size(); i++) {
+        rowRanges[i] =
+            RowRange.newBuilder()
+                .setStartKeyClosed(
+                    ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                .setEndKeyOpen(
+                    ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                .build();
+      }
+      // Presort the ranges so that future segmentation can exit early when splitting the row set
+      Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+      rowSet =
+          RowSet.newBuilder()
+              .addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+              .build();
+
+      HashMap<String, String> baseLabels = new HashMap<>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigtableResource(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.TABLE_ID,
+          GcpResourceIdentifiers.bigtableTableID(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      serviceCallMetric =
+          new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
+      //future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+      future = createFuture();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (buffer.size() <= miniBatchWaterMark && future == null && !lastFillComplete) {
+        startNextSegmentRead();
+      }
+      if (buffer.isEmpty()) {
+        if (future == null || lastFillComplete)
+          return false;
+        waitReadRowsFuture();
+      }
+      currentRow = FlatRowConverter.convert(buffer.remove());
+      return currentRow != null;
+    }
+
+    private SettableFuture<List<FlatRow>> createFuture() {
+      SettableFuture<List<FlatRow>> f = SettableFuture.create();
+
+      AtomicReference<ScanHandler> atomic = new AtomicReference<>();

Review Comment:
   please add a TODO to remove this hack



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +234,246 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableMiniBatchReaderImpl implements Reader {
+    private BigtableSession session;
+    private final BigtableSource source;
+    private Row currentRow;
+    private Queue<FlatRow> buffer;
+    private RowSet rowSet;
+    private ServiceCallMetric serviceCallMetric;
+    private Future<List<FlatRow>> future;
+    private ByteString lastFetchedRow;
+    private boolean lastFillComplete;
+
+    private int miniBatchLimit = DEFAULT_MINI_BATCH_SIZE;
+    private long bufferSizeLimit = DEFAULT_BYTE_LIMIT;
+    private int miniBatchWaterMark;
+    private final String tableName;
+
+    @VisibleForTesting
+    BigtableMiniBatchReaderImpl(BigtableSession session, BigtableSource source) {
+      this.session = session;
+      if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+        this.miniBatchLimit = source.getMaxBufferElementCount();
+        this.bufferSizeLimit = (long) miniBatchLimit * 100 * 1024 * 1024;

Review Comment:
   Please extract the defaults to named constants, e.g. `private static final long DEFAULT_BUFFER_SIZE_LIMIT = ...`
   
   Also, the bufferSizeLimit should be a function of memory size, with hard limits for the max & min. See the `Runtime` class for ways to inspect the available memory resources.
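
   A minimal sketch (the constant names and the 10%-of-heap / 100 MB / 1 GB policy are illustrative assumptions, not a recommendation):

       private static final long MIN_BYTE_LIMIT = 100L * 1024 * 1024;   // 100 MB floor
       private static final long MAX_BYTE_LIMIT = 1024L * 1024 * 1024;  // 1 GB ceiling

       private static long defaultByteLimit() {
         // Take ~10% of the JVM's max heap, clamped to [MIN_BYTE_LIMIT, MAX_BYTE_LIMIT].
         long tenPercentOfHeap = Runtime.getRuntime().maxMemory() / 10;
         return Math.max(MIN_BYTE_LIMIT, Math.min(MAX_BYTE_LIMIT, tenPercentOfHeap));
       }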



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -402,6 +402,32 @@ public Read withRowFilter(RowFilter filter) {
       return withRowFilter(StaticValueProvider.of(filter));
     }
 
+    /**
+     * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
+     * using the given row filter.
+     *
+     * <p>Does not modify this object.
+     *
+     * When we have a builder, we initialize the value. When they call the method then we override the value
+     */
+    public Read withMaxBufferElementCount(ValueProvider<Integer> maxBufferElementCount) {
+      checkArgument(maxBufferElementCount != null, "maxBufferElementCount can not be null");
+      BigtableReadOptions bigtableReadOptions = getBigtableReadOptions();
+      return toBuilder()
+          .setBigtableReadOptions(bigtableReadOptions.toBuilder().setMaxBufferElementCount(maxBufferElementCount).build())
+          .build();
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
+     * using the given row filter.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withMaxBufferElementCount(int maxBufferElementCount) {

Review Comment:
   Please annotate this with @Experimental



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +234,246 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableMiniBatchReaderImpl implements Reader {
+    private BigtableSession session;
+    private final BigtableSource source;
+    private Row currentRow;
+    private Queue<FlatRow> buffer;
+    private RowSet rowSet;
+    private ServiceCallMetric serviceCallMetric;
+    private Future<List<FlatRow>> future;
+    private ByteString lastFetchedRow;
+    private boolean lastFillComplete;
+
+    private int miniBatchLimit = DEFAULT_MINI_BATCH_SIZE;
+    private long bufferSizeLimit = DEFAULT_BYTE_LIMIT;
+    private int miniBatchWaterMark;
+    private final String tableName;
+
+    @VisibleForTesting
+    BigtableMiniBatchReaderImpl(BigtableSession session, BigtableSource source) {
+      this.session = session;
+      if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+        this.miniBatchLimit = source.getMaxBufferElementCount();
+        this.bufferSizeLimit = (long) miniBatchLimit * 100 * 1024 * 1024;
+      }
+      this.miniBatchWaterMark = miniBatchLimit / 10;
+      tableName =
+          session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      buffer = new ArrayDeque<>();
+      lastFillComplete = false;
+      RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+      for (int i = 0; i < source.getRanges().size(); i++) {
+        rowRanges[i] =
+            RowRange.newBuilder()
+                .setStartKeyClosed(
+                    ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                .setEndKeyOpen(
+                    ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                .build();
+      }
+      // Presort the ranges so that future segmentation can exit early when splitting the row set
+      Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+      rowSet =
+          RowSet.newBuilder()
+              .addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+              .build();
+
+      HashMap<String, String> baseLabels = new HashMap<>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigtableResource(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.TABLE_ID,
+          GcpResourceIdentifiers.bigtableTableID(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      serviceCallMetric =
+          new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
+      //future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+      future = createFuture();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (buffer.size() <= miniBatchWaterMark && future == null && !lastFillComplete) {
+        startNextSegmentRead();
+      }
+      if (buffer.isEmpty()) {
+        if (future == null || lastFillComplete)
+          return false;
+        waitReadRowsFuture();
+      }
+      currentRow = FlatRowConverter.convert(buffer.remove());
+      return currentRow != null;
+    }
+
+    private SettableFuture<List<FlatRow>> createFuture() {
+      SettableFuture<List<FlatRow>> f = SettableFuture.create();
+
+      AtomicReference<ScanHandler> atomic = new AtomicReference<>();
+      ScanHandler handler;
+
+      handler = session.getDataClient().readFlatRows(buildReadRowsRequest(),
+          new StreamObserver<FlatRow>() {
+            List<FlatRow> rows = new ArrayList<>();
+            long currentByteSize = 0;
+            @Override
+            public void onNext(FlatRow flatRow) {
+              rows.add(flatRow);
+              currentByteSize += flatRow.getRowKey().size() + flatRow.getCells().stream()
+                  .mapToLong(c -> c.getQualifier().size() + c.getValue().size()).sum();
+              if (currentByteSize > bufferSizeLimit) {
+                atomic.get().cancel();
+                return;
+              }
+            }
+
+            @Override
+            public void onError(Throwable e) {
+              if (e instanceof CancellationException) {
+                f.set(rows);
+                return;
+              }
+              f.setException(e);
+            }
+
+            @Override
+            public void onCompleted() {
+              if (rows.size() < miniBatchLimit) {
+                lastFillComplete = true;
+              }
+              f.set(rows);
+              return;
+            }
+          });
+      atomic.set(handler);
+      return f;
+    }
+
+    private ReadRowsRequest buildReadRowsRequest() {
+      ReadRowsRequest.Builder request =
+          ReadRowsRequest.newBuilder()
+              .setRows(rowSet)
+              .setRowsLimit(miniBatchLimit)
+              .setTableName(tableName);
+      if (source.getRowFilter() != null) {
+        request.setFilter(source.getRowFilter());
+      }
+      return request.build();
+    }
+
+    private void startNextSegmentRead() {

Review Comment:
   I think it would be better to inline this method and rename `createFuture` to `startNextSegmentRead`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +234,246 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableMiniBatchReaderImpl implements Reader {
+    private BigtableSession session;
+    private final BigtableSource source;
+    private Row currentRow;
+    private Queue<FlatRow> buffer;
+    private RowSet rowSet;
+    private ServiceCallMetric serviceCallMetric;
+    private Future<List<FlatRow>> future;
+    private ByteString lastFetchedRow;
+    private boolean lastFillComplete;
+
+    private int miniBatchLimit = DEFAULT_MINI_BATCH_SIZE;
+    private long bufferSizeLimit = DEFAULT_BYTE_LIMIT;
+    private int miniBatchWaterMark;
+    private final String tableName;
+
+    @VisibleForTesting
+    BigtableMiniBatchReaderImpl(BigtableSession session, BigtableSource source) {
+      this.session = session;
+      if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+        this.miniBatchLimit = source.getMaxBufferElementCount();
+        this.bufferSizeLimit = (long) miniBatchLimit * 100 * 1024 * 1024;
+      }
+      this.miniBatchWaterMark = miniBatchLimit / 10;
+      tableName =
+          session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      buffer = new ArrayDeque<>();
+      lastFillComplete = false;
+      RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+      for (int i = 0; i < source.getRanges().size(); i++) {
+        rowRanges[i] =
+            RowRange.newBuilder()
+                .setStartKeyClosed(
+                    ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                .setEndKeyOpen(
+                    ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                .build();
+      }
+      // Presort the ranges so that future segmentation can exit early when splitting the row set
+      Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+      rowSet =
+          RowSet.newBuilder()
+              .addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+              .build();
+
+      HashMap<String, String> baseLabels = new HashMap<>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigtableResource(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.TABLE_ID,
+          GcpResourceIdentifiers.bigtableTableID(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      serviceCallMetric =
+          new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
+      //future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+      future = createFuture();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (buffer.size() <= miniBatchWaterMark && future == null && !lastFillComplete) {
+        startNextSegmentRead();
+      }
+      if (buffer.isEmpty()) {
+        if (future == null || lastFillComplete)
+          return false;
+        waitReadRowsFuture();
+      }
+      currentRow = FlatRowConverter.convert(buffer.remove());
+      return currentRow != null;
+    }
+
+    private SettableFuture<List<FlatRow>> createFuture() {
+      SettableFuture<List<FlatRow>> f = SettableFuture.create();
+
+      AtomicReference<ScanHandler> atomic = new AtomicReference<>();
+      ScanHandler handler;
+
+      handler = session.getDataClient().readFlatRows(buildReadRowsRequest(),
+          new StreamObserver<FlatRow>() {
+            List<FlatRow> rows = new ArrayList<>();
+            long currentByteSize = 0;
+            @Override
+            public void onNext(FlatRow flatRow) {
+              rows.add(flatRow);
+              currentByteSize += flatRow.getRowKey().size() + flatRow.getCells().stream()
+                  .mapToLong(c -> c.getQualifier().size() + c.getValue().size()).sum();
+              if (currentByteSize > bufferSizeLimit) {
+                atomic.get().cancel();
+                return;
+              }
+            }
+
+            @Override
+            public void onError(Throwable e) {
+              if (e instanceof CancellationException) {

Review Comment:
   I'm not sure whether this is going to be Java's native `CancellationException` or gRPC's CANCELLED status exception. Please double-check.
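
   One way to cover both cases (a sketch; assumes `io.grpc.Status` is on the classpath):

       @Override
       public void onError(Throwable e) {
         // Treat both Java's CancellationException and gRPC's CANCELLED status as our own
         // cancellation triggered by hitting the buffer byte limit.
         boolean cancelled =
             e instanceof CancellationException
                 || io.grpc.Status.fromThrowable(e).getCode() == io.grpc.Status.Code.CANCELLED;
         if (cancelled) {
           f.set(rows);
         } else {
           f.setException(e);
         }
       }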



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +143,538 @@ public void testRead() throws IOException {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowMiniBatchLimit() throws Exception {
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+    FlatRow expectedRow = FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                flatRowObserver.onNext(expectedRow);
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        });
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableMiniBatchReaderImpl(mockSession, mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedRow),
+        underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses a single range with MINI_BATCH_ROW_LIMIT*2+1 rows. Range: [a, b1, ... b199, c)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeAboveMiniBatchLimit() throws IOException {
+    List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+    List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+
+    for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+      expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("b" + i)).build());
+      expectedSecondRangeRows.add(
+          FlatRow.newBuilder()
+              .withRowKey(ByteString.copyFromUtf8("b" + (i + MINI_BATCH_ROW_LIMIT)))
+              .build());
+    }
+    expectedFirstRangeRows.set(0, FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedFirstRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedSecondRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            flatRowObserver.onCompleted();
+            return scanHandler;
+          }
+        });
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableMiniBatchReaderImpl(mockSession, mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedFirstRangeRows.get(0)), underTest.getCurrentRow());
+    for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {
+      underTest.advance();
+    }
+    for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+      underTest.advance();
+    }
+    Assert.assertEquals(
+        FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size() - 1)), underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+
+    underTest.close();
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses two ranges with MINI_BATCH_ROW_LIMIT rows. The buffer should be refilled twice and
+   * ReadRowsAsync should be called twice. The following test follows this example: FirstRange:
+   * [a,b1,...,b99,c) SecondRange: [c,d1,...,d99,e)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadMultipleRanges() throws IOException {
+    List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+    expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+    List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+    expectedSecondRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("c")).build());
+
+    for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {
+      expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("b" + i)).build());
+      expectedSecondRangeRows.add(
+          FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("d" + i)).build());
+    }
+
+    ByteKey firstStart = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey sharedKeyEnd = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+    ByteKey secondEnd = ByteKey.copyFrom("e".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges())
+        .thenReturn(
+            Arrays.asList(
+                ByteKeyRange.of(firstStart, sharedKeyEnd),
+                ByteKeyRange.of(sharedKeyEnd, secondEnd)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedFirstRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedSecondRangeRows.get(i));
+                }

Review Comment:
   nit: expectedSecondRangeRows.forEach(flatRowObserver::onNext)



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +143,538 @@ public void testRead() throws IOException {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowMiniBatchLimit() throws Exception {
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+    FlatRow expectedRow = FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                flatRowObserver.onNext(expectedRow);
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        });
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableMiniBatchReaderImpl(mockSession, mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedRow),
+        underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses a single range with 2*MINI_BATCH_ROW_LIMIT rows. Range: [a, b1, ... b199, c)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeAboveMiniBatchLimit() throws IOException {
+    List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+    List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+
+    for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+      expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("b" + i)).build());
+      expectedSecondRangeRows.add(
+          FlatRow.newBuilder()
+              .withRowKey(ByteString.copyFromUtf8("b" + (i + MINI_BATCH_ROW_LIMIT)))
+              .build());
+    }
+    expectedFirstRangeRows.set(0, FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedFirstRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedSecondRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            flatRowObserver.onCompleted();
+            return scanHandler;
+          }
+        });
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableMiniBatchReaderImpl(mockSession, mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedFirstRangeRows.get(0)), underTest.getCurrentRow());
+    for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {
+      underTest.advance();
+    }
+    for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+      underTest.advance();
+    }
+    Assert.assertEquals(
+        FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size() - 1)), underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+
+    underTest.close();
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses two ranges, each with MINI_BATCH_ROW_LIMIT rows. The buffer should be refilled twice and
+   * readFlatRows should be called twice. The test uses the following ranges: FirstRange:
+   * [a,b1,...,b99,c) SecondRange: [c,d1,...,d99,e)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadMultipleRanges() throws IOException {
+    List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+    expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+    List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+    expectedSecondRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("c")).build());
+
+    for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {
+      expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("b" + i)).build());
+      expectedSecondRangeRows.add(
+          FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("d" + i)).build());
+    }
+
+    ByteKey firstStart = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey sharedKeyEnd = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+    ByteKey secondEnd = ByteKey.copyFrom("e".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges())
+        .thenReturn(
+            Arrays.asList(
+                ByteKeyRange.of(firstStart, sharedKeyEnd),
+                ByteKeyRange.of(sharedKeyEnd, secondEnd)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedFirstRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedSecondRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            flatRowObserver.onCompleted();
+            return scanHandler;
+          }
+        });
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableMiniBatchReaderImpl(mockSession, mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedFirstRangeRows.get(0)), underTest.getCurrentRow());
+    for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {
+      underTest.advance();
+    }
+    Assert.assertEquals(
+        FlatRowConverter.convert(expectedFirstRangeRows.get(expectedFirstRangeRows.size() - 1)), underTest.getCurrentRow());
+    for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+      underTest.advance();
+    }
+
+    Assert.assertEquals(
+        FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size() - 1)), underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses three overlapping ranges. The logic should remove all keys that were added to the buffer.
+   * The following test follows this example: FirstRange: [a,b1,...,b99,b100) SecondRange:
+   * [b50,b51...b100,d1,...,d199,c) ThirdRange: [b70, c)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadMultipleRangesOverlappingKeys() throws IOException {
+    List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+    expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+    List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+    expectedSecondRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("c")).build());
+
+    for (int i = 1; i < 2 * MINI_BATCH_ROW_LIMIT; i++) {
+      if (i < MINI_BATCH_ROW_LIMIT) {
+        expectedFirstRangeRows.add(
+            FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("b" + i)).build());
+      }
+      if (i > MINI_BATCH_ROW_LIMIT / 2) {
+        expectedSecondRangeRows.add(
+            FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("b" + i)).build());
+      }
+    }
+
+    ByteKey firstStart = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey firstEnd =
+        ByteKey.copyFrom(("b" + (MINI_BATCH_ROW_LIMIT)).getBytes(StandardCharsets.UTF_8));
+
+    ByteKey secondStart =
+        ByteKey.copyFrom(("b" + (MINI_BATCH_ROW_LIMIT / 2)).getBytes(StandardCharsets.UTF_8));
+    ByteKey secondEnd = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+
+    ByteKey thirdStart =
+        ByteKey.copyFrom(("b" + (MINI_BATCH_ROW_LIMIT / 2 + 20)).getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges())
+        .thenReturn(
+            Arrays.asList(
+                ByteKeyRange.of(firstStart, firstEnd),
+                ByteKeyRange.of(secondStart, secondEnd),
+                ByteKeyRange.of(thirdStart, secondEnd)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));

Review Comment:
   Can the tableId mocking be extracted into the test setup? It's repeated in every test here.
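
   i.e. something like this in the shared setup (a sketch; assumes this class already has a `@Before` that wires up `mockBigtableSource`):

   ```java
   @Before
   public void setUp() {
     // ... existing mock wiring for mockSession, mockBigtableDataClient, etc. ...

     // Every test in this class reads the same table, so stub it once here
     // rather than repeating the line in each test body.
     when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
   }
   ```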



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +234,246 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableMiniBatchReaderImpl implements Reader {
+    private BigtableSession session;
+    private final BigtableSource source;
+    private Row currentRow;
+    private Queue<FlatRow> buffer;
+    private RowSet rowSet;
+    private ServiceCallMetric serviceCallMetric;
+    private Future<List<FlatRow>> future;
+    private ByteString lastFetchedRow;
+    private boolean lastFillComplete;
+
+    private int miniBatchLimit = DEFAULT_MINI_BATCH_SIZE;
+    private long bufferSizeLimit = DEFAULT_BYTE_LIMIT;
+    private int miniBatchWaterMark;
+    private final String tableName;
+
+    @VisibleForTesting
+    BigtableMiniBatchReaderImpl(BigtableSession session, BigtableSource source) {
+      this.session = session;
+      if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+        this.miniBatchLimit = source.getMaxBufferElementCount();
+        this.bufferSizeLimit = (long) miniBatchLimit * 100 * 1024 * 1024;
+      }
+      this.miniBatchWaterMark = miniBatchLimit / 10;
+      tableName =
+          session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      buffer = new ArrayDeque<>();
+      lastFillComplete = false;
+      RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+      for (int i = 0; i < source.getRanges().size(); i++) {
+        rowRanges[i] =
+            RowRange.newBuilder()
+                .setStartKeyClosed(
+                    ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                .setEndKeyOpen(
+                    ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                .build();
+      }
+      // Presort the ranges so that future segmentation can exit early when splitting the row set
+      Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+      rowSet =
+          RowSet.newBuilder()
+              .addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+              .build();
+
+      HashMap<String, String> baseLabels = new HashMap<>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigtableResource(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.TABLE_ID,
+          GcpResourceIdentifiers.bigtableTableID(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      serviceCallMetric =
+          new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
+      //future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+      future = createFuture();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (buffer.size() <= miniBatchWaterMark && future == null && !lastFillComplete) {
+        startNextSegmentRead();
+      }
+      if (buffer.isEmpty()) {
+        if (future == null || lastFillComplete)
+          return false;
+        waitReadRowsFuture();
+      }
+      currentRow = FlatRowConverter.convert(buffer.remove());
+      return currentRow != null;
+    }
+
+    private SettableFuture<List<FlatRow>> createFuture() {
+      SettableFuture<List<FlatRow>> f = SettableFuture.create();
+
+      AtomicReference<ScanHandler> atomic = new AtomicReference<>();
+      ScanHandler handler;
+
+      handler = session.getDataClient().readFlatRows(buildReadRowsRequest(),
+          new StreamObserver<FlatRow>() {
+            List<FlatRow> rows = new ArrayList<>();
+            long currentByteSize = 0;
+            @Override
+            public void onNext(FlatRow flatRow) {
+              rows.add(flatRow);
+              currentByteSize += flatRow.getRowKey().size() + flatRow.getCells().stream()
+                  .mapToLong(c -> c.getQualifier().size() + c.getValue().size()).sum();
+              if (currentByteSize > bufferSizeLimit) {
+                atomic.get().cancel();
+                return;
+              }
+            }
+
+            @Override
+            public void onError(Throwable e) {
+              if (e instanceof CancellationException) {
+                f.set(rows);
+                return;
+              }
+              f.setException(e);
+            }
+
+            @Override
+            public void onCompleted() {
+              if (rows.size() < miniBatchLimit) {
+                lastFillComplete = true;
+              }
+              f.set(rows);
+              return;
+            }
+          });
+      atomic.set(handler);
+      return f;
+    }
+
+    private ReadRowsRequest buildReadRowsRequest() {
+      ReadRowsRequest.Builder request =
+          ReadRowsRequest.newBuilder()
+              .setRows(rowSet)
+              .setRowsLimit(miniBatchLimit)
+              .setTableName(tableName);
+      if (source.getRowFilter() != null) {
+        request.setFilter(source.getRowFilter());
+      }
+      return request.build();
+    }
+
+    private void startNextSegmentRead() {
+      if (!splitRowSet(lastFetchedRow)) {
+        return;
+      }
+      future = createFuture();
+    }
+
+    private void waitReadRowsFuture() throws IOException {
+      try {
+        List<FlatRow> results = future.get();
+        future = null;
+        serviceCallMetric.call("ok");
+        fillReadRowsBuffer(results);
+      } catch (StatusRuntimeException e) {
+        serviceCallMetric.call(e.getStatus().getCode().value());
+        throw e;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        e.printStackTrace();

Review Comment:
   Please remove this printStackTrace and propagate the failure instead.
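
   e.g. rethrow (a sketch, mirroring how the `InterruptedException` branch above wraps into `IOException`):

   ```java
   } catch (ExecutionException e) {
     // Surface the underlying failure instead of printing it and carrying on.
     throw new IOException(e.getCause());
   }
   ```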



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -402,6 +402,32 @@ public Read withRowFilter(RowFilter filter) {
       return withRowFilter(StaticValueProvider.of(filter));
     }
 
+    /**
+     * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
+     * using the given row filter.
+     *
+     * <p>Does not modify this object.
+     *
+     * When we have a builder, we initialize the value. When they call the method then we override the value
+     */
+    public Read withMaxBufferElementCount(ValueProvider<Integer> maxBufferElementCount) {

Review Comment:
   Also, I'm not sure that this value needs to be configurable after pipeline build. I think it can just be an int.
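
   Roughly (a sketch only; `toBuilder()`, `setMaxBufferElementCount`, and `checkArgument` are assumed to follow the same pattern as the other `with*` methods in this class, and `@Experimental` is per the earlier comment):

   ```java
   /**
    * Returns a new {@link BigtableIO.Read} that will break up large scans into
    * buffered segments of at most {@code maxBufferElementCount} rows.
    *
    * <p>Does not modify this object.
    */
   @Experimental
   public Read withMaxBufferElementCount(int maxBufferElementCount) {
     checkArgument(maxBufferElementCount > 0, "maxBufferElementCount must be positive");
     return toBuilder().setMaxBufferElementCount(maxBufferElementCount).build();
   }
   ```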



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +143,538 @@ public void testRead() throws IOException {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowMiniBatchLimit() throws Exception {
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+    FlatRow expectedRow = FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                flatRowObserver.onNext(expectedRow);
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        });
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableMiniBatchReaderImpl(mockSession, mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedRow),
+        underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses a single range with 2*MINI_BATCH_ROW_LIMIT rows. Range: [a, b1, ... b199, c)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeAboveMiniBatchLimit() throws IOException {
+    List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+    List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+
+    for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+      expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("b" + i)).build());
+      expectedSecondRangeRows.add(
+          FlatRow.newBuilder()
+              .withRowKey(ByteString.copyFromUtf8("b" + (i + MINI_BATCH_ROW_LIMIT)))
+              .build());
+    }
+    expectedFirstRangeRows.set(0, FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedFirstRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedSecondRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            flatRowObserver.onCompleted();
+            return scanHandler;
+          }
+        });
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableMiniBatchReaderImpl(mockSession, mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedFirstRangeRows.get(0)), underTest.getCurrentRow());
+    for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {
+      underTest.advance();
+    }
+    for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+      underTest.advance();
+    }
+    Assert.assertEquals(
+        FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size() - 1)), underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+
+    underTest.close();
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses two ranges, each with MINI_BATCH_ROW_LIMIT rows. The buffer should be refilled twice and
+   * readFlatRows should be called twice. The test uses the following ranges: FirstRange:
+   * [a,b1,...,b99,c) SecondRange: [c,d1,...,d99,e)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadMultipleRanges() throws IOException {
+    List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+    expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+    List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+    expectedSecondRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("c")).build());
+
+    for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {
+      expectedFirstRangeRows.add(FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("b" + i)).build());
+      expectedSecondRangeRows.add(
+          FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("d" + i)).build());
+    }
+
+    ByteKey firstStart = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey sharedKeyEnd = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+    ByteKey secondEnd = ByteKey.copyFrom("e".getBytes(StandardCharsets.UTF_8));
+
+    when(mockBigtableSource.getRanges())
+        .thenReturn(
+            Arrays.asList(
+                ByteKeyRange.of(firstStart, sharedKeyEnd),
+                ByteKeyRange.of(sharedKeyEnd, secondEnd)));
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedFirstRangeRows.get(i));
+                }
+                flatRowObserver.onCompleted();
+              }
+            }.start();
+
+            return scanHandler;
+          }
+        }).thenAnswer(new Answer<ScanHandler>() {
+          @Override
+          public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+            StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+            new Thread() {
+              @Override
+              public void run() {
+                for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+                  flatRowObserver.onNext(expectedSecondRangeRows.get(i));
+                }

Review Comment:
   Even better, extract a helper:
   `mockReadRowsAnswer(OngoingStubbing<ScanHandler> stubbing, List<FlatRow> rows)`
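
   A sketch of that helper (assumes `org.mockito.stubbing.OngoingStubbing` is imported and `scanHandler` is the shared mock these tests already use):

   ```java
   private OngoingStubbing<ScanHandler> mockReadRowsAnswer(
       OngoingStubbing<ScanHandler> stubbing, List<FlatRow> rows) {
     return stubbing.thenAnswer(
         invocation -> {
           StreamObserver<FlatRow> observer = invocation.getArgument(1);
           // Replay the given rows on a background thread and then complete,
           // just like the inline answers above.
           new Thread(
                   () -> {
                     rows.forEach(observer::onNext);
                     observer.onCompleted();
                   })
               .start();
           return scanHandler;
         });
   }
   ```

   Each test body would then collapse to something like:

   ```java
   OngoingStubbing<ScanHandler> stub =
       when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
   stub = mockReadRowsAnswer(stub, expectedFirstRangeRows);
   stub = mockReadRowsAnswer(stub, expectedSecondRangeRows);
   mockReadRowsAnswer(stub, Collections.emptyList());
   ```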


