Posted to commits@beam.apache.org by ch...@apache.org on 2022/05/27 18:54:49 UTC

[beam] branch master updated: [BEAM-14502] Fix: Splitting scans into smaller chunks to buffer reads (#16939)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ed59b1477c4 [BEAM-14502] Fix: Splitting scans into smaller chunks to buffer reads (#16939)
ed59b1477c4 is described below

commit ed59b1477c4f879f2cba5c699c4fe2a3391b779f
Author: Diego Gomez <90...@users.noreply.github.com>
AuthorDate: Fri May 27 11:54:42 2022 -0700

    [BEAM-14502] Fix: Splitting scans into smaller chunks to buffer reads (#16939)
    
    * Fix: Splitting scans into smaller chunks to buffer reads
    
    * Nit: removing unused imports
    
    * Small clean up, iteration 1
    
    * Splitting duplicate logic into helper function
    
    * Splitting logic for rows
    
    * Test for below batch size and fixing up internal logic
    
    * Adding SingleRange test above the mini batch limit
    
    * Fixing nits
    
    * Adding multiple range test with non-overlapping ranges
    
    * Added multiple range test with overlapping keys
    
    * Temp commit for easier meeting. Will clean up after and commit again
    
    * First Async change with tests working. Multiple ranges need to be checked for proper advancing and the ArgumentCaptor test must be created
    
    * Fixed tests, still need to implement Captor test
    
    * Created ArgumentCaptor test and fixed test errors
    
    * Fixed the failing emulator tests
    
    * Spotless fix
    
    * Cleaning up code
    
    * Fixing nits and added test comments
    
    * Fixing spotless
    
    * Fixing comments
    
    * Spotless
    
    * Fixing build issues
    
    * There is one error with using a non-vendored dependency. Working on it but PR is good otherwise
    
    * Fixing build error
    
    * Fixing part of the first round of comments, still need to work on a few
    
    * Removed nesting from advance
    
    * Added configurable batch limit
    
    * Fixed tests
    
    * Fixed tests for new implementation
    
    * Added Byte Limit test and responded to half of the new comments
    
    * Testing build errors, due to IDE issues
    
    * Implemented Runtime usage which caused ByteBuffer test to fail. Cleaned up some code based on comments.
    
    * Working on the byte buffer test, having issues with mocking Runtime values which would result in flakiness. Going to try to push for review
    
    * Fixed the byte buffer test and should use the machine's memory when testing
    
    * Attempting to fix ByteBufferTest
    
    * Applying Spotless and keeping debug statements to make sure test is OK
    
    * Reduced percentage of memory and removed print statements
    
    * Apply spotless
    
    * Apply spotless again
    
    * Fixing nits
    
    * Fixing up nits and comments
    
    * Refactoring multiple states into a cleaner flow, provided factory method and removed default segmentSize behavior
    
    * Fixing import
    
    * Fixed tests for refactor and cleaned up implementation
    
    * Refactored tests, added an exception test and Byte test needs to be tested manually
    
    * Responding to comments, removed some error handling
    
    * Responding to comments, fixed error handling and cleaned tests
    
    * Fixing warnings
    
    * Removed factory method inside of test and removed source dependencies. Waiting to fix CallMetric for error handling
    
    * Fixed error handling test
    
    * Responding to nits
    
    * Fixed error handling
    
    * Responding to nits
    
    * Apply Spotless
---
 .../provider/bigtable/BigtableTableFlatTest.java   |  37 ++
 .../provider/bigtable/BigtableTableTestUtils.java  |  19 +
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java       |  28 ++
 .../sdk/io/gcp/bigtable/BigtableReadOptions.java   |  13 +
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   | 370 ++++++++++++++-
 .../io/gcp/bigtable/BigtableServiceImplTest.java   | 521 +++++++++++++++++++++
 6 files changed, 964 insertions(+), 24 deletions(-)

diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
index 817570ab2cd..08082e85e0e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.Bigtable
 import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.KEY2;
 import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.TEST_FLAT_SCHEMA;
 import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.bigTableRow;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.bigTableSegmentedRows;
 import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.columnsMappingString;
 import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createFlatTableString;
 import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createReadTable;
@@ -137,6 +138,33 @@ public class BigtableTableFlatTest {
     readPipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testSegmentedInsert() {
+    final String tableId = "beamWriteTableWithSegmentedRead";
+    emulatorWrapper.createTable(tableId, FAMILY_TEST);
+    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
+    sqlEnv.executeDdl(createFlatTableString(tableId, location(tableId)));
+
+    String query =
+        "INSERT INTO beamWriteTableWithSegmentedRead(key, boolColumn, longColumn, stringColumn, doubleColumn) "
+            + "VALUES ('key0', TRUE, CAST(10 AS bigint), 'stringValue', 5.5), "
+            + "('key1', TRUE, CAST(10 AS bigint), 'stringValue', 5.5), "
+            + "('key2', TRUE, CAST(10 AS bigint), 'stringValue', 5.5), "
+            + "('key3', TRUE, CAST(10 AS bigint), 'stringValue', 5.5), "
+            + "('key4', TRUE, CAST(10 AS bigint), 'stringValue', 5.5)";
+
+    BeamSqlRelUtils.toPCollection(writePipeline, sqlEnv.parseQuery(query));
+    writePipeline.run().waitUntilFinish();
+
+    PCollection<com.google.bigtable.v2.Row> bigTableRows =
+        readPipeline
+            .apply(readTransformWithSegment(tableId))
+            .apply(MapElements.via(new ReplaceCellTimestamp()));
+
+    PAssert.that(bigTableRows).containsInAnyOrder(bigTableSegmentedRows());
+    readPipeline.run().waitUntilFinish();
+  }
+
   @Test
   public void testSimpleInsert() {
     final String tableId = "beamWriteTable";
@@ -187,4 +215,13 @@ public class BigtableTableFlatTest {
         .withTableId(table)
         .withEmulator("localhost:" + BIGTABLE_EMULATOR.getPort());
   }
+
+  private BigtableIO.Read readTransformWithSegment(String table) {
+    return BigtableIO.read()
+        .withProjectId("fakeProject")
+        .withInstanceId("fakeInstance")
+        .withTableId(table)
+        .withMaxBufferElementCount(2)
+        .withEmulator("localhost:" + BIGTABLE_EMULATOR.getPort());
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
index f318676f6ba..ef66fb89aff 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
@@ -179,6 +179,25 @@ class BigtableTableTestUtils {
         .build();
   }
 
+  static com.google.bigtable.v2.Row[] bigTableSegmentedRows() {
+    com.google.bigtable.v2.Row[] rows = new com.google.bigtable.v2.Row[5];
+    List<Column> columns =
+        ImmutableList.of(
+            column("boolColumn", booleanToByteArray(true)),
+            column("doubleColumn", doubleToByteArray(5.5)),
+            column("longColumn", Longs.toByteArray(10L)),
+            column("stringColumn", "stringValue".getBytes(UTF_8)));
+    Family family = Family.newBuilder().setName("familyTest").addAllColumns(columns).build();
+    for (int i = 0; i < 5; i++) {
+      rows[i] =
+          com.google.bigtable.v2.Row.newBuilder()
+              .setKey(byteStringUtf8("key" + i))
+              .addFamilies(family)
+              .build();
+    }
+    return rows;
+  }
+
   // There is no possibility to insert a value with fixed timestamp so we have to replace it
   // for the testing purpose.
   static com.google.bigtable.v2.Row setFixedTimestamp(com.google.bigtable.v2.Row row) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 36797092293..751ebf1e606 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -38,6 +38,8 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -409,6 +411,28 @@ public class BigtableIO {
       return withRowFilter(StaticValueProvider.of(filter));
     }
 
+    /**
+     * Returns a new {@link BigtableIO.Read} that will break up read requests into smaller batches.
+     * This function switches the base BigtableIO.Reader class to the SegmentReader. If null is
+     * passed, this behavior is disabled and the stream reader is used.
+     *
+     * <p>Does not modify this object.
+     *
+     * <p>The value starts out unset (null) when the builder is created; calling this method
+     * overrides it.
+     */
+    @Experimental(Kind.SOURCE_SINK)
+    public Read withMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
+      BigtableReadOptions bigtableReadOptions = getBigtableReadOptions();
+      return toBuilder()
+          .setBigtableReadOptions(
+              bigtableReadOptions
+                  .toBuilder()
+                  .setMaxBufferElementCount(maxBufferElementCount)
+                  .build())
+          .build();
+    }
+
     /**
      * Returns a new {@link BigtableIO.Read} that will read only rows in the specified range.
      *
@@ -1269,6 +1293,10 @@ public class BigtableIO {
       return rowFilter != null && rowFilter.isAccessible() ? rowFilter.get() : null;
     }
 
+    public @Nullable Integer getMaxBufferElementCount() {
+      return readOptions.getMaxBufferElementCount();
+    }
+
     public ValueProvider<String> getTableId() {
       return config.getTableId();
     }
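For context, a hedged usage sketch of the option added above (the pipeline name and the
project, instance, and table IDs are placeholders, not values from this commit):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
    import org.apache.beam.sdk.values.PCollection;

    Pipeline pipeline = Pipeline.create();
    PCollection<com.google.bigtable.v2.Row> rows =
        pipeline.apply(
            "SegmentedRead",
            BigtableIO.read()
                .withProjectId("my-project")    // placeholder
                .withInstanceId("my-instance")  // placeholder
                .withTableId("my-table")        // placeholder
                // Buffer reads in segments of at most 100 rows; passing null
                // instead would fall back to the streaming reader.
                .withMaxBufferElementCount(100));
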
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java
index 92dbe715bf7..3c561e237a1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java
@@ -42,6 +42,9 @@ abstract class BigtableReadOptions implements Serializable {
   /** Returns the key ranges to read. */
   abstract @Nullable ValueProvider<List<ByteKeyRange>> getKeyRanges();
 
+  /** Returns the size limit for reading segments. */
+  abstract @Nullable Integer getMaxBufferElementCount();
+
   abstract Builder toBuilder();
 
   static BigtableReadOptions.Builder builder() {
@@ -53,11 +56,17 @@ abstract class BigtableReadOptions implements Serializable {
 
     abstract Builder setRowFilter(ValueProvider<RowFilter> rowFilter);
 
+    abstract Builder setMaxBufferElementCount(@Nullable Integer maxBufferElementCount);
+
     abstract Builder setKeyRanges(ValueProvider<List<ByteKeyRange>> keyRanges);
 
     abstract BigtableReadOptions build();
   }
 
+  BigtableReadOptions setMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
+    return toBuilder().setMaxBufferElementCount(maxBufferElementCount).build();
+  }
+
   BigtableReadOptions withRowFilter(RowFilter rowFilter) {
     return toBuilder().setRowFilter(ValueProvider.StaticValueProvider.of(rowFilter)).build();
   }
@@ -80,6 +89,10 @@ abstract class BigtableReadOptions implements Serializable {
     if (getRowFilter() != null && getRowFilter().isAccessible()) {
       checkArgument(getRowFilter().get() != null, "rowFilter can not be null");
     }
+    if (getMaxBufferElementCount() != null) {
+      checkArgument(
+          getMaxBufferElementCount() > 0, "maxBufferElementCount can not be zero or negative");
+    }
 
     if (getKeyRanges() != null && getKeyRanges().isAccessible()) {
       checkArgument(getKeyRanges().get() != null, "keyRanges can not be null");
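A hedged sketch of the validation above: a non-positive value is accepted by the setter and
only rejected later, when the read options are validated (the IDs below are placeholders):

    BigtableIO.Read invalid =
        BigtableIO.read()
            .withProjectId("my-project")    // placeholder
            .withInstanceId("my-instance")  // placeholder
            .withTableId("my-table")        // placeholder
            .withMaxBufferElementCount(0);  // fails once the read options are
                                            // validated: "maxBufferElementCount
                                            // can not be zero or negative"
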
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index eb9f8c40aa4..ceb842f39bf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -25,6 +25,7 @@ import com.google.bigtable.v2.MutateRowsRequest;
 import com.google.bigtable.v2.Mutation;
 import com.google.bigtable.v2.ReadRowsRequest;
 import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.RowFilter;
 import com.google.bigtable.v2.RowRange;
 import com.google.bigtable.v2.RowSet;
 import com.google.bigtable.v2.SampleRowKeysRequest;
@@ -33,16 +34,28 @@ import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.grpc.BigtableSession;
 import com.google.cloud.bigtable.grpc.BigtableTableName;
 import com.google.cloud.bigtable.grpc.async.BulkMutation;
+import com.google.cloud.bigtable.grpc.scanner.FlatRow;
+import com.google.cloud.bigtable.grpc.scanner.FlatRowConverter;
 import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
+import com.google.cloud.bigtable.grpc.scanner.ScanHandler;
+import com.google.cloud.bigtable.util.ByteStringComparator;
 import com.google.protobuf.ByteString;
 import io.grpc.Status.Code;
 import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
 import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
 import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.runners.core.metrics.ServiceCallMetric;
@@ -51,9 +64,13 @@ import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closer;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.FutureCallback;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +83,11 @@ import org.slf4j.LoggerFactory;
 })
 class BigtableServiceImpl implements BigtableService {
   private static final Logger LOG = LoggerFactory.getLogger(BigtableServiceImpl.class);
+  // Default byte limit is a percentage of the JVM's available memory
+  private static final double DEFAULT_BYTE_LIMIT_PERCENTAGE = .1;
+  // Percentage of max number of rows allowed in the buffer
+  private static final double WATERMARK_PERCENTAGE = .1;
+  private static final long MIN_BYTE_BUFFER_SIZE = 100 * 1024 * 1024; // 100MB
 
   public BigtableServiceImpl(BigtableOptions options) {
     this.options = options;
@@ -134,28 +156,7 @@ class BigtableServiceImpl implements BigtableService {
       String tableNameSr =
           session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
 
-      HashMap<String, String> baseLabels = new HashMap<>();
-      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
-      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
-      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
-      baseLabels.put(
-          MonitoringInfoConstants.Labels.RESOURCE,
-          GcpResourceIdentifiers.bigtableResource(
-              session.getOptions().getProjectId(),
-              session.getOptions().getInstanceId(),
-              source.getTableId().get()));
-      baseLabels.put(
-          MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
-      baseLabels.put(
-          MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
-      baseLabels.put(
-          MonitoringInfoConstants.Labels.TABLE_ID,
-          GcpResourceIdentifiers.bigtableTableID(
-              session.getOptions().getProjectId(),
-              session.getOptions().getInstanceId(),
-              source.getTableId().get()));
-      ServiceCallMetric serviceCallMetric =
-          new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+      ServiceCallMetric serviceCallMetric = createCallMetric(session, source.getTableId().get());
       ReadRowsRequest.Builder requestB =
           ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
       if (source.getRowFilter() != null) {
@@ -165,7 +166,7 @@ class BigtableServiceImpl implements BigtableService {
         results = session.getDataClient().readRows(requestB.build());
         serviceCallMetric.call("ok");
       } catch (StatusRuntimeException e) {
-        serviceCallMetric.call(e.getStatus().getCode().value());
+        serviceCallMetric.call(e.getStatus().getCode().toString());
         throw e;
       }
       return advance();
@@ -210,6 +211,220 @@ class BigtableServiceImpl implements BigtableService {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    private @Nullable ReadRowsRequest nextRequest;
+    private @Nullable Row currentRow;
+    private @Nullable Future<UpstreamResults> future;
+    private final Queue<Row> buffer;
+    private final int refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
+    private ServiceCallMetric serviceCallMetric;
+
+    private static class UpstreamResults {
+      private final List<Row> rows;
+      private final @Nullable ReadRowsRequest nextRequest;
+
+      private UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    static BigtableSegmentReaderImpl create(BigtableSession session, BigtableSource source) {
+      RowSet.Builder rowSetBuilder = RowSet.newBuilder();
+      if (source.getRanges().isEmpty()) {
+        rowSetBuilder = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance());
+      } else {
+        // BigtableSource only contains ranges with a closed start key and open end key
+        for (ByteKeyRange beamRange : source.getRanges()) {
+          RowRange.Builder rangeBuilder = rowSetBuilder.addRowRangesBuilder();
+          rangeBuilder
+              .setStartKeyClosed(ByteString.copyFrom(beamRange.getStartKey().getValue()))
+              .setEndKeyOpen(ByteString.copyFrom(beamRange.getEndKey().getValue()));
+        }
+      }
+      RowSet rowSet = rowSetBuilder.build();
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), RowFilter.getDefaultInstance());
+
+      long maxSegmentByteSize =
+          (long)
+              Math.max(
+                  MIN_BYTE_BUFFER_SIZE,
+                  (Runtime.getRuntime().totalMemory() * DEFAULT_BYTE_LIMIT_PERCENTAGE));
+
+      return new BigtableSegmentReaderImpl(
+          session,
+          session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()),
+          rowSet,
+          source.getMaxBufferElementCount(),
+          maxSegmentByteSize,
+          filter,
+          createCallMetric(session, source.getTableId().get()));
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(
+        BigtableSession session,
+        String tableName,
+        RowSet rowSet,
+        int maxRowsInBuffer,
+        long maxSegmentByteSize,
+        RowFilter filter,
+        ServiceCallMetric serviceCallMetric) {
+      if (rowSet.equals(rowSet.getDefaultInstanceForType())) {
+        rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      }
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(tableName)
+              .setRows(rowSet)
+              .setFilter(filter)
+              .setRowsLimit(maxRowsInBuffer)
+              .build();
+
+      this.session = session;
+      this.nextRequest = request;
+      this.maxSegmentByteSize = maxSegmentByteSize;
+      this.serviceCallMetric = serviceCallMetric;
+      this.buffer = new ArrayDeque<>();
+      // Asynchronously refill the buffer when only 10% of the elements remain
+      this.refillSegmentWaterMark = (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE);
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      future = fetchNextSegment();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (buffer.size() < refillSegmentWaterMark && future == null) {
+        future = fetchNextSegment();
+      }
+      if (buffer.isEmpty() && future != null) {
+        waitReadRowsFuture();
+      }
+      currentRow = buffer.poll();
+      return currentRow != null;
+    }
+
+    private Future<UpstreamResults> fetchNextSegment() {
+      SettableFuture<UpstreamResults> f = SettableFuture.create();
+      // When the nextRequest is null, the last fill completed and the buffer contains the last rows
+      if (nextRequest == null) {
+        f.set(new UpstreamResults(ImmutableList.of(), null));
+        return f;
+      }
+
+      // TODO(diegomez): Remove atomic ScanHandler for simpler StreamObserver/Future implementation
+      AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<>();
+      ScanHandler handler =
+          session
+              .getDataClient()
+              .readFlatRows(
+                  nextRequest,
+                  new StreamObserver<FlatRow>() {
+                    List<Row> rows = new ArrayList<>();
+                    long currentByteSize = 0;
+                    boolean byteLimitReached = false;
+
+                    @Override
+                    public void onNext(FlatRow flatRow) {
+                      Row row = FlatRowConverter.convert(flatRow);
+                      currentByteSize += row.getSerializedSize();
+                      rows.add(row);
+
+                      if (currentByteSize > maxSegmentByteSize) {
+                        byteLimitReached = true;
+                        atomicScanHandler.get().cancel();
+                        return;
+                      }
+                    }
+
+                    @Override
+                    public void onError(Throwable e) {
+                      f.setException(e);
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                      ReadRowsRequest nextNextRequest = null;
+
+                      // If the byte limit was reached or a full segment was returned, more rows
+                      // may remain, so the request is truncated to fetch the remainder
+                      if (byteLimitReached || rows.size() == nextRequest.getRowsLimit()) {
+                        nextNextRequest =
+                            truncateRequest(nextRequest, rows.get(rows.size() - 1).getKey());
+                      }
+                      f.set(new UpstreamResults(rows, nextNextRequest));
+                    }
+                  });
+      atomicScanHandler.set(handler);
+      return f;
+    }
+
+    private void waitReadRowsFuture() throws IOException {
+      try {
+        UpstreamResults r = future.get();
+        buffer.addAll(r.rows);
+        nextRequest = r.nextRequest;
+        future = null;
+        serviceCallMetric.call("ok");
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof StatusRuntimeException) {
+          serviceCallMetric.call(((StatusRuntimeException) cause).getStatus().getCode().toString());
+        }
+        throw new IOException(cause);
+      }
+    }
+
+    private ReadRowsRequest truncateRequest(ReadRowsRequest request, ByteString lastKey) {
+      RowSet.Builder segment = RowSet.newBuilder();
+
+      for (RowRange rowRange : request.getRows().getRowRangesList()) {
+        int startCmp = StartPoint.extract(rowRange).compareTo(new StartPoint(lastKey, true));
+        int endCmp = EndPoint.extract(rowRange).compareTo(new EndPoint(lastKey, true));
+
+        if (startCmp > 0) {
+          // If the startKey is past the split point, then add the whole range
+          segment.addRowRanges(rowRange);
+        } else if (endCmp > 0) {
+          // The range straddles the split point; drop already-read rowKeys by splitting the
+          // RowSet at the last buffered Row
+          RowRange subRange = rowRange.toBuilder().setStartKeyOpen(lastKey).build();
+          segment.addRowRanges(subRange);
+        }
+      }
+      if (segment.getRowRangesCount() == 0) {
+        return null;
+      }
+
+      ReadRowsRequest.Builder requestBuilder = request.toBuilder();
+      requestBuilder.clearRows();
+      return requestBuilder.setRows(segment).build();
+    }
+
+    @Override
+    public void close() throws IOException {
+      session.close();
+    }
+
+    @Override
+    public Row getCurrentRow() throws NoSuchElementException {
+      if (currentRow == null) {
+        throw new NoSuchElementException();
+      }
+      return currentRow;
+    }
+  }
+
   @VisibleForTesting
   static class BigtableWriterImpl implements Writer {
     private BigtableSession session;
@@ -321,7 +536,11 @@ class BigtableServiceImpl implements BigtableService {
   @Override
   public Reader createReader(BigtableSource source) throws IOException {
     BigtableSession session = new BigtableSession(options);
-    return new BigtableReaderImpl(session, source);
+    if (source.getMaxBufferElementCount() != null) {
+      return BigtableSegmentReaderImpl.create(session, source);
+    } else {
+      return new BigtableReaderImpl(session, source);
+    }
   }
 
   @Override
@@ -334,4 +553,107 @@ class BigtableServiceImpl implements BigtableService {
       return session.getDataClient().sampleRowKeys(request);
     }
   }
+
+  @VisibleForTesting
+  public static ServiceCallMetric createCallMetric(BigtableSession session, String tableId) {
+    HashMap<String, String> baseLabels = new HashMap<>();
+    baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+    baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+    baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+    baseLabels.put(
+        MonitoringInfoConstants.Labels.RESOURCE,
+        GcpResourceIdentifiers.bigtableResource(
+            session.getOptions().getProjectId(), session.getOptions().getInstanceId(), tableId));
+    baseLabels.put(
+        MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+    baseLabels.put(
+        MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+    baseLabels.put(
+        MonitoringInfoConstants.Labels.TABLE_ID,
+        GcpResourceIdentifiers.bigtableTableID(
+            session.getOptions().getProjectId(), session.getOptions().getInstanceId(), tableId));
+    return new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+  }
+
+  /** Helper class to ease comparison of RowRange start points. */
+  private static final class StartPoint implements Comparable<StartPoint> {
+    private final ByteString value;
+    private final boolean isClosed;
+
+    @Nonnull
+    static StartPoint extract(@Nonnull RowRange rowRange) {
+      switch (rowRange.getStartKeyCase()) {
+        case STARTKEY_NOT_SET:
+          return new StartPoint(ByteString.EMPTY, true);
+        case START_KEY_CLOSED:
+          return new StartPoint(rowRange.getStartKeyClosed(), true);
+        case START_KEY_OPEN:
+          if (rowRange.getStartKeyOpen().isEmpty()) {
+            // Take care to normalize an open empty start key to be closed.
+            return new StartPoint(ByteString.EMPTY, true);
+          } else {
+            return new StartPoint(rowRange.getStartKeyOpen(), false);
+          }
+        default:
+          throw new IllegalArgumentException("Unknown startKeyCase: " + rowRange.getStartKeyCase());
+      }
+    }
+
+    private StartPoint(@Nonnull ByteString value, boolean isClosed) {
+      this.value = value;
+      this.isClosed = isClosed;
+    }
+
+    @Override
+    public int compareTo(@Nonnull StartPoint o) {
+      return ComparisonChain.start()
+          // Empty string comes first
+          .compareTrueFirst(value.isEmpty(), o.value.isEmpty())
+          .compare(value, o.value, ByteStringComparator.INSTANCE)
+          // Closed start point comes before an open start point: [x,y] starts before (x,y].
+          .compareTrueFirst(isClosed, o.isClosed)
+          .result();
+    }
+  }
+
+  /** Helper class to ease comparison of RowRange endpoints. */
+  private static final class EndPoint implements Comparable<EndPoint> {
+    private final ByteString value;
+    private final boolean isClosed;
+
+    @Nonnull
+    static EndPoint extract(@Nonnull RowRange rowRange) {
+      switch (rowRange.getEndKeyCase()) {
+        case ENDKEY_NOT_SET:
+          return new EndPoint(ByteString.EMPTY, true);
+        case END_KEY_CLOSED:
+          return new EndPoint(rowRange.getEndKeyClosed(), true);
+        case END_KEY_OPEN:
+          if (rowRange.getEndKeyOpen().isEmpty()) {
+            // Take care to normalize an open empty end key to be closed.
+            return new EndPoint(ByteString.EMPTY, true);
+          } else {
+            return new EndPoint(rowRange.getEndKeyOpen(), false);
+          }
+        default:
+          throw new IllegalArgumentException("Unknown endKeyCase: " + rowRange.getEndKeyCase());
+      }
+    }
+
+    private EndPoint(@Nonnull ByteString value, boolean isClosed) {
+      this.value = value;
+      this.isClosed = isClosed;
+    }
+
+    @Override
+    public int compareTo(@Nonnull EndPoint o) {
+      return ComparisonChain.start()
+          // Empty string comes last
+          .compareFalseFirst(value.isEmpty(), o.value.isEmpty())
+          .compare(value, o.value, ByteStringComparator.INSTANCE)
+          // Open end point comes before a closed end point: [x,y) ends before [x,y].
+          .compareFalseFirst(isClosed, o.isClosed)
+          .result();
+    }
+  }
 }
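To make the truncation logic in BigtableSegmentReaderImpl concrete, here is a hedged,
standalone sketch (not part of the commit) using the same protobuf builders. Given a request
covering [a, c) and [d, f) with last buffered key "b", the straddling range is reopened as
(b, c) and the later range is kept whole. Note also that create() budgets each segment at
max(100 MB, 10% of Runtime.getRuntime().totalMemory()).

    import com.google.bigtable.v2.RowRange;
    import com.google.bigtable.v2.RowSet;
    import com.google.protobuf.ByteString;

    public class TruncateSketch {
      public static void main(String[] args) {
        ByteString lastKey = ByteString.copyFromUtf8("b");
        RowRange first =   // [a, c) straddles the last buffered key "b"
            RowRange.newBuilder()
                .setStartKeyClosed(ByteString.copyFromUtf8("a"))
                .setEndKeyOpen(ByteString.copyFromUtf8("c"))
                .build();
        RowRange second =  // [d, f) starts past "b"
            RowRange.newBuilder()
                .setStartKeyClosed(ByteString.copyFromUtf8("d"))
                .setEndKeyOpen(ByteString.copyFromUtf8("f"))
                .build();
        // Mirrors truncateRequest: reopen the straddling range just after
        // lastKey, keep the later range unchanged.
        RowSet next =
            RowSet.newBuilder()
                .addRowRanges(first.toBuilder().setStartKeyOpen(lastKey).build())
                .addRowRanges(second)
                .build();
        System.out.println(next); // (b, c) and [d, f)
      }
    }
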
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
index b983cc3ac86..97c51856cc9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
@@ -31,23 +31,38 @@ import com.google.bigtable.v2.Mutation;
 import com.google.bigtable.v2.Mutation.SetCell;
 import com.google.bigtable.v2.ReadRowsRequest;
 import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.RowFilter;
+import com.google.bigtable.v2.RowRange;
+import com.google.bigtable.v2.RowSet;
 import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.grpc.BigtableDataClient;
 import com.google.cloud.bigtable.grpc.BigtableInstanceName;
 import com.google.cloud.bigtable.grpc.BigtableSession;
 import com.google.cloud.bigtable.grpc.BigtableTableName;
 import com.google.cloud.bigtable.grpc.async.BulkMutation;
+import com.google.cloud.bigtable.grpc.scanner.FlatRow;
+import com.google.cloud.bigtable.grpc.scanner.FlatRowConverter;
 import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
+import com.google.cloud.bigtable.grpc.scanner.ScanHandler;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.ByteString;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
@@ -60,9 +75,14 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
 
 /** Unit tests of BigtableServiceImpl. */
 @RunWith(JUnit4.class)
@@ -72,6 +92,11 @@ public class BigtableServiceImplTest {
   private static final String INSTANCE_ID = "instance";
   private static final String TABLE_ID = "table";
 
+  private static final int SEGMENT_SIZE = 10;
+  private static final long DEFAULT_ROW_SIZE = 1024 * 1024 * 25; // 25MB
+  private static final long DEFAULT_BYTE_SEGMENT_SIZE = 1024 * 1024 * 1000;
+  private static final String DEFAULT_PREFIX = "b";
+
   private static final BigtableTableName TABLE_NAME =
       new BigtableInstanceName(PROJECT_ID, INSTANCE_ID).toTableName(TABLE_ID);
 
@@ -83,6 +108,12 @@ public class BigtableServiceImplTest {
 
   @Mock private BigtableSource mockBigtableSource;
 
+  @Mock private ScanHandler scanHandler;
+
+  @Mock private ServiceCallMetric mockCallMetric;
+
+  @Captor private ArgumentCaptor<ReadRowsRequest> requestCaptor;
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
@@ -126,6 +157,426 @@ public class BigtableServiceImplTest {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+    RowSet.Builder ranges = RowSet.newBuilder();
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0), generateByteString(DEFAULT_PREFIX, 1)));
+
+    FlatRow expectedRow = FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            ranges.build(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedRow), underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    Mockito.verify(mockCallMetric, Mockito.times(2)).call("ok");
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses a single range with SEGMENT_SIZE*2 rows. Range: [b00000, b00001, ... b00199, b00200)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+    RowSet.Builder ranges = RowSet.newBuilder();
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+    OngoingStubbing<?> stub =
+        when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+    List<List<FlatRow>> expectedResults =
+        ImmutableList.of(
+            generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+            generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+            ImmutableList.of());
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            ranges.build(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    Assert.assertEquals(
+        expectedResults.stream()
+            .flatMap(Collection::stream)
+            .map(i -> FlatRowConverter.convert(i))
+            .collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+    Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses two ranges with SEGMENT_SIZE rows each. The buffer should be refilled twice and
+   * readFlatRows should be called three times. The last RPC call should return zero rows. The following test
+   * follows this example: FirstRange: [b00000,b00001,...,b00099,b00100) SecondRange:
+   * [b00100,b00101,...,b00199,b00200)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadMultipleRanges() throws IOException {
+    RowSet.Builder ranges = RowSet.newBuilder();
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE)));
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+    OngoingStubbing<?> stub =
+        when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+    List<List<FlatRow>> expectedResults =
+        ImmutableList.of(
+            generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+            generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+            ImmutableList.of());
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            ranges.build(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    Assert.assertEquals(
+        expectedResults.stream()
+            .flatMap(Collection::stream)
+            .map(i -> FlatRowConverter.convert(i))
+            .collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+
+    Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses three overlapping ranges. The logic should remove all keys that were already added to the
+   * buffer. The following test follows this example: FirstRange: [b00000,b00001,...,b00099,b00100)
+   * SecondRange: [b00050,b00051...b00100,b00101,...,b00199,b00200) ThirdRange: [b00070, b00200)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadMultipleRangesOverlappingKeys() throws IOException {
+    RowSet.Builder ranges = RowSet.newBuilder();
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE)));
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE / 2),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, (int) (SEGMENT_SIZE * .7)),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+    OngoingStubbing<?> stub =
+        when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+    List<List<FlatRow>> expectedResults =
+        ImmutableList.of(
+            generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+            generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+            ImmutableList.of());
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            ranges.build(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    Assert.assertEquals(
+        expectedResults.stream()
+            .flatMap(Collection::stream)
+            .map(i -> FlatRowConverter.convert(i))
+            .collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+
+    Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and read. This example
+   * uses an empty request to trigger a full table scan. RowRange: [b00000, b00001, ... b00300)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadFullTableScan() throws IOException {
+    OngoingStubbing<?> stub =
+        when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+    List<List<FlatRow>> expectedResults =
+        ImmutableList.of(
+            generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+            generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+            generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE * 2, SEGMENT_SIZE),
+            ImmutableList.of());
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            RowSet.getDefaultInstance(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    Assert.assertEquals(
+        expectedResults.stream()
+            .flatMap(Collection::stream)
+            .map(i -> FlatRowConverter.convert(i))
+            .collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+
+    Mockito.verify(mockCallMetric, Mockito.times(4)).call("ok");
+  }
+
+  /**
+   * This test compares the number of RowRanges being requested after the buffer is refilled. After
+   * reading the first buffer, the first two RowRanges should be removed and the RowRange containing
+   * [b00100,b00200) should be requested.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadFillBuffer() throws IOException {
+    RowSet.Builder ranges = RowSet.newBuilder();
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE / 2)));
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE / 2),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE)));
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+    OngoingStubbing<?> stub =
+        when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+    List<List<FlatRow>> expectedResults =
+        ImmutableList.of(
+            generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+            generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+            ImmutableList.of());
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            ranges.build(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    verify(mockBigtableDataClient, times(1))
+        .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+    Assert.assertEquals(3, requestCaptor.getValue().getRows().getRowRangesCount());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    verify(mockBigtableDataClient, times(3))
+        .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+    Assert.assertEquals(1, requestCaptor.getValue().getRows().getRowRangesCount());
+
+    Assert.assertEquals(
+        expectedResults.stream()
+            .flatMap(Collection::stream)
+            .map(i -> FlatRowConverter.convert(i))
+            .collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+
+    Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+  }
+
+  /**
+   * This test checks that the buffer stops filling once the byte limit is reached, cancelling the
+   * ScanHandler after the limit is hit. The test completes one fill and leaves one Row for after
+   * the first buffer has been consumed. It checks the JVM's currently available memory and uses a
+   * percentage of it to mimic the original behavior. Range: [b00000, b00010)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadByteLimitBuffer() throws IOException {
+    long segmentByteLimit = DEFAULT_ROW_SIZE * (SEGMENT_SIZE / 2);
+    int numOfRowsInsideBuffer = (int) (segmentByteLimit / DEFAULT_ROW_SIZE) + 1;
+
+    RowRange mockRowRange =
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE));
+
+    OngoingStubbing<?> stub =
+        when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+    List<List<FlatRow>> expectedResults =
+        ImmutableList.of(
+            generateLargeSegmentResult(DEFAULT_PREFIX, 0, numOfRowsInsideBuffer),
+            generateSegmentResult(
+                DEFAULT_PREFIX, numOfRowsInsideBuffer, SEGMENT_SIZE - numOfRowsInsideBuffer),
+            ImmutableList.of());
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            RowSet.newBuilder().addRowRanges(mockRowRange).build(),
+            SEGMENT_SIZE,
+            segmentByteLimit,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    Assert.assertEquals(
+        expectedResults.stream()
+            .flatMap(Collection::stream)
+            .map(i -> FlatRowConverter.convert(i))
+            .collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+
+    Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+  }
+
+  /**
+   * This test exercises the exception handling inside the scanHandler and checks that a
+   * StatusRuntimeException is surfaced.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSegmentExceptionHandling() throws IOException {
+    RowSet.Builder ranges = RowSet.newBuilder();
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0), generateByteString(DEFAULT_PREFIX, 1)));
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()))
+        .thenAnswer(
+            new Answer<ScanHandler>() {
+              @Override
+              public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+                StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+                new Thread() {
+                  @Override
+                  public void run() {
+                    flatRowObserver.onError(Status.INVALID_ARGUMENT.asRuntimeException());
+                  }
+                }.start();
+
+                return scanHandler;
+              }
+            });
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            ranges.build(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    IOException returnedError = null;
+    try {
+      underTest.start();
+    } catch (IOException e) {
+      returnedError = e;
+    }
+    Assert.assertTrue(returnedError.getCause() instanceof StatusRuntimeException);
+
+    Mockito.verify(mockCallMetric, Mockito.times(1))
+        .call(Status.INVALID_ARGUMENT.getCode().toString());
+  }
+
   /**
    * This test ensures that protobuf creation and interactions with {@link BulkMutation} work as
    * expected.
@@ -135,6 +586,7 @@ public class BigtableServiceImplTest {
    */
   @Test
   public void testWrite() throws IOException, InterruptedException {
+    when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
     BigtableService.Writer underTest =
         new BigtableServiceImpl.BigtableWriterImpl(mockSession, TABLE_NAME);
 
@@ -153,6 +605,7 @@ public class BigtableServiceImplTest {
     verify(mockBulkMutation, times(1)).add(expected);
 
     underTest.close();
+
     verify(mockBulkMutation, times(1)).flush();
   }
 
@@ -178,4 +631,72 @@ public class BigtableServiceImplTest {
         (MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer();
     assertEquals(count, (long) container.getCounter(name).getCumulative());
   }
+
+  private Answer<ScanHandler> mockReadRowsAnswer(List<FlatRow> rows) {
+    return new Answer<ScanHandler>() {
+      @Override
+      public ScanHandler answer(InvocationOnMock invocationOnMock) throws Throwable {
+        StreamObserver<FlatRow> flatRowObserver = invocationOnMock.getArgument(1);
+        new Thread() {
+          @Override
+          public void run() {
+            for (int i = 0; i < rows.size(); i++) {
+              flatRowObserver.onNext(rows.get(i));
+            }
+            flatRowObserver.onCompleted();
+          }
+        }.start();
+
+        return scanHandler;
+      }
+    };
+  }
+
+  private static RowRange generateRowRange(ByteString start, ByteString end) {
+    return RowRange.newBuilder().setStartKeyClosed(start).setEndKeyOpen(end).build();
+  }
+
+  private static List<FlatRow> generateSegmentResult(String prefix, int startIndex, int count) {
+    return generateSegmentResult(prefix, startIndex, count, false);
+  }
+
+  private static List<FlatRow> generateLargeSegmentResult(
+      String prefix, int startIndex, int count) {
+    return generateSegmentResult(prefix, startIndex, count, true);
+  }
+
+  private static List<FlatRow> generateSegmentResult(
+      String prefix, int startIndex, int count, boolean largeRow) {
+    byte[] largeMemory = new byte[(int) DEFAULT_ROW_SIZE];
+    return IntStream.range(startIndex, startIndex + count)
+        .mapToObj(
+            i -> {
+              FlatRow.Builder builder = FlatRow.newBuilder();
+              if (!largeRow) {
+                builder.withRowKey(generateByteString(prefix, i));
+              } else {
+                builder
+                    .withRowKey(generateByteString(prefix, i))
+                    .addCell(
+                        "Family",
+                        ByteString.copyFromUtf8("LargeMemoryRow"),
+                        System.currentTimeMillis(),
+                        ByteString.copyFrom(largeMemory));
+              }
+              return builder.build();
+            })
+        .collect(Collectors.toList());
+  }
+
+  private <T> OngoingStubbing<T> expectRowResults(
+      OngoingStubbing<T> stub, List<List<FlatRow>> results) {
+    for (List<FlatRow> result : results) {
+      stub = stub.thenAnswer(mockReadRowsAnswer(result));
+    }
+    return stub;
+  }
+
+  private static ByteString generateByteString(String prefix, int index) {
+    return ByteString.copyFromUtf8(prefix + String.format("%05d", index));
+  }
 }
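
For reference, a worked example of the arithmetic in testReadByteLimitBuffer (derived from the
constants above, not additional test code):

    segmentByteLimit      = DEFAULT_ROW_SIZE * (SEGMENT_SIZE / 2)
                          = 25 MB * 5 = 125 MB
    numOfRowsInsideBuffer = (int) (125 MB / 25 MB) + 1 = 6

so the byte limit cuts the first segment off after six large rows, and the remaining
SEGMENT_SIZE - 6 = 4 rows arrive in the second segment.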