You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:46:18 UTC

[hbase] 90/133: HBASE-17907 [C++] End to end Scans from Client/Table

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 74e0b08266247ffb7aa3bcbc59fbed2ac60ff1dc
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Tue Jun 6 15:16:09 2017 -0700

    HBASE-17907 [C++] End to end Scans from Client/Table
---
 hbase-native-client/bin/format-code.sh             |   4 +-
 hbase-native-client/connection/client-handler.cc   |   5 +-
 hbase-native-client/connection/request.h           |   6 +
 hbase-native-client/connection/response.h          |  22 +-
 hbase-native-client/connection/rpc-client.cc       |   2 -
 hbase-native-client/connection/rpc-client.h        |  12 +-
 hbase-native-client/core/BUCK                      |  41 +-
 hbase-native-client/core/async-client-scanner.cc   | 142 +++++++
 hbase-native-client/core/async-client-scanner.h    | 119 ++++++
 .../core/async-rpc-retrying-caller-factory.h       | 124 +++++-
 .../core/async-rpc-retrying-caller.cc              |  12 +-
 .../core/async-rpc-retrying-caller.h               |   2 -
 .../core/async-scan-rpc-retrying-caller.cc         | 447 +++++++++++++++++++++
 .../core/async-scan-rpc-retrying-caller.h          | 233 +++++++++++
 .../core/async-table-result-scanner.cc             | 161 ++++++++
 .../core/async-table-result-scanner.h              |  98 +++++
 hbase-native-client/core/cell-test.cc              |  24 ++
 hbase-native-client/core/cell.cc                   |  21 +-
 hbase-native-client/core/cell.h                    |   5 +-
 hbase-native-client/core/client-test.cc            |  13 +-
 hbase-native-client/core/delete-test.cc            |   2 +-
 hbase-native-client/core/delete.cc                 |  46 +--
 hbase-native-client/core/get.cc                    |   3 +-
 .../core/hbase-configuration-loader.h              |   5 +-
 hbase-native-client/core/hbase-rpc-controller.h    |   8 +
 hbase-native-client/core/meta-utils.cc             |  15 +-
 hbase-native-client/core/query.h                   |  19 +-
 hbase-native-client/core/raw-async-table.cc        |  27 +-
 hbase-native-client/core/raw-async-table.h         |  21 +-
 .../core/raw-scan-result-consumer.h                | 131 ++++++
 hbase-native-client/core/region-location.h         |   2 +-
 hbase-native-client/core/request-converter-test.cc |   3 +-
 hbase-native-client/core/request-converter.cc      |  96 ++++-
 hbase-native-client/core/request-converter.h       |  15 +-
 hbase-native-client/core/response-converter.cc     |  38 +-
 hbase-native-client/core/response-converter.h      |   5 +-
 .../core/{query.h => result-scanner.h}             |  28 +-
 hbase-native-client/core/result-test.cc            |  26 +-
 hbase-native-client/core/result.cc                 |  60 +--
 hbase-native-client/core/result.h                  |  18 +-
 hbase-native-client/core/scan-result-cache-test.cc | 177 ++++++++
 hbase-native-client/core/scan-result-cache.cc      | 160 ++++++++
 hbase-native-client/core/scan-result-cache.h       |  80 ++++
 hbase-native-client/core/scan-test.cc              |  15 +-
 hbase-native-client/core/scan.cc                   |  19 +-
 hbase-native-client/core/scan.h                    |  13 +-
 hbase-native-client/core/scanner-test.cc           | 368 +++++++++++++++++
 hbase-native-client/core/simple-client.cc          |  85 ++--
 hbase-native-client/core/table.cc                  |  16 +
 hbase-native-client/core/table.h                   |   5 +
 hbase-native-client/exceptions/BUCK                |  10 +-
 hbase-native-client/exceptions/exception.h         | 192 +++++----
 hbase-native-client/test-util/BUCK                 |  62 +--
 hbase-native-client/test-util/mini-cluster.cc      |  38 +-
 hbase-native-client/test-util/mini-cluster.h       |   6 +-
 hbase-native-client/test-util/test-util.cc         |   9 +
 hbase-native-client/test-util/test-util.h          |   3 +
 hbase-native-client/utils/BUCK                     |   1 +
 hbase-native-client/utils/bytes-util-test.cc       |  11 +
 hbase-native-client/utils/bytes-util.h             |  24 +-
 .../{core/query.h => utils/optional.h}             |  19 +-
 61 files changed, 3008 insertions(+), 366 deletions(-)

diff --git a/hbase-native-client/bin/format-code.sh b/hbase-native-client/bin/format-code.sh
index 8a19930..fe236d8 100755
--- a/hbase-native-client/bin/format-code.sh
+++ b/hbase-native-client/bin/format-code.sh
@@ -19,5 +19,5 @@ set -euo pipefail
 IFS=$'\n\t'
 
 
-find core connection serde utils test-util security -name "*.h" -or -name "*.cc" | xargs -P8 clang-format -i --style='{BasedOnStyle: Google, ColumnLimit: 100}'
-find core connection serde utils third-party security -name "BUCK" | xargs -P8 yapf -i --style=google
+find core connection exceptions serde utils test-util security -name "*.h" -or -name "*.cc" | xargs -P8 clang-format -i --style='{BasedOnStyle: Google, ColumnLimit: 100}'
+find core connection exceptions serde utils test-util third-party security -name "BUCK" | xargs -P8 yapf -i --style=google
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index ac16197..894ecb3 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -87,7 +87,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
 
       if (cell_block_length > 0) {
         auto cell_scanner = serde_.CreateCellScanner(std::move(buf), used_bytes, cell_block_length);
-        received->set_cell_scanner(std::move(cell_scanner));
+        received->set_cell_scanner(std::shared_ptr<CellScanner>{cell_scanner.release()});
       }
 
       received->set_resp_msg(resp_msg);
@@ -129,8 +129,7 @@ folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Re
     ctx->fireWrite(std::move(header));
   });
 
-  VLOG(3) << "Writing RPC Request with call_id:"
-          << r->call_id();  // TODO: more logging for RPC Header
+  VLOG(3) << "Writing RPC Request:" << r->DebugString();
 
   // Now store the call id to response.
   resp_msgs_->insert(r->call_id(), r->resp_msg());
diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h
index 520b380..4b652c0 100644
--- a/hbase-native-client/connection/request.h
+++ b/hbase-native-client/connection/request.h
@@ -18,6 +18,7 @@
  */
 #pragma once
 
+#include <folly/Conv.h>
 #include <google/protobuf/message.h>
 
 #include <cstdint>
@@ -64,6 +65,11 @@ class Request {
    * method type to decode. */
   std::string method() { return method_; }
 
+  std::string DebugString() {
+    return "call_id:" + folly::to<std::string>(call_id_) + ", req_msg:" +
+           req_msg_->ShortDebugString() + ", method:" + method_;
+  }
+
  private:
   uint32_t call_id_;
   std::shared_ptr<google::protobuf::Message> req_msg_ = nullptr;
diff --git a/hbase-native-client/connection/response.h b/hbase-native-client/connection/response.h
index c5472b0..38fdda0 100644
--- a/hbase-native-client/connection/response.h
+++ b/hbase-native-client/connection/response.h
@@ -18,12 +18,14 @@
  */
 #pragma once
 
+#include <folly/Conv.h>
+#include <folly/ExceptionWrapper.h>
+
 #include <cstdint>
 #include <memory>
+#include <string>
 #include <utility>
 
-#include <folly/ExceptionWrapper.h>
-
 #include "serde/cell-scanner.h"
 
 // Forward
@@ -66,20 +68,26 @@ class Response {
     resp_msg_ = std::move(response);
   }
 
-  void set_cell_scanner(std::unique_ptr<CellScanner> cell_scanner) {
-    cell_scanner_ = std::move(cell_scanner);
-  }
+  void set_cell_scanner(std::shared_ptr<CellScanner> cell_scanner) { cell_scanner_ = cell_scanner; }
 
-  const std::unique_ptr<CellScanner>& cell_scanner() const { return cell_scanner_; }
+  const std::shared_ptr<CellScanner> cell_scanner() const { return cell_scanner_; }
 
   folly::exception_wrapper exception() { return exception_; }
 
   void set_exception(folly::exception_wrapper value) { exception_ = value; }
 
+  std::string DebugString() const {
+    std::string s{"call_id:"};
+    s += folly::to<std::string>(call_id_);
+    s += ", resp_msg:";
+    s += resp_msg_->ShortDebugString();
+    return s;
+  }
+
  private:
   uint32_t call_id_;
   std::shared_ptr<google::protobuf::Message> resp_msg_;
-  std::unique_ptr<CellScanner> cell_scanner_;
+  std::shared_ptr<CellScanner> cell_scanner_;
   folly::exception_wrapper exception_;
 };
 }  // namespace hbase
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index fbbe3e7..10faa7a 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -22,8 +22,6 @@
 #include <folly/Logging.h>
 #include <unistd.h>
 #include <wangle/concurrent/IOThreadPoolExecutor.h>
-#include <memory>
-#include <string>
 
 using hbase::security::User;
 using std::chrono::nanoseconds;
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index 8b0abe7..0ecde5b 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -18,17 +18,19 @@
  */
 #pragma once
 
+#include <google/protobuf/service.h>
+
+#include <chrono>
+#include <memory>
+#include <string>
+#include <utility>
+
 #include "connection/connection-id.h"
 #include "connection/connection-pool.h"
 #include "connection/request.h"
 #include "connection/response.h"
 #include "security/user.h"
 
-#include <google/protobuf/service.h>
-
-#include <chrono>
-#include <utility>
-
 namespace hbase {
 
 class RpcClient {
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 203e00d..81fd4a7 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -19,10 +19,12 @@
 cxx_library(
     name="core",
     exported_headers=[
+        "async-client-scanner.h",
         "async-connection.h",
         "async-region-locator.h",
         "async-rpc-retrying-caller-factory.h",
         "async-rpc-retrying-caller.h",
+        "async-table-result-scanner.h",
         "client.h",
         "cell.h",
         "hbase-macros.h",
@@ -42,10 +44,14 @@ cxx_library(
         "delete.h",
         "scan.h",
         "result.h",
+        "result-scanner.h",
         "request-converter.h",
         "response-converter.h",
         "table.h",
+        "async-scan-rpc-retrying-caller.h",
         "raw-async-table.h",
+        "raw-scan-result-consumer.h",
+        "scan-result-cache.h",
         "hbase-rpc-controller.h",
         "time-range.h",
         "zk-util.h",
@@ -58,9 +64,12 @@ cxx_library(
         "async-batch-rpc-retrying-caller.h",
     ],
     srcs=[
+        "async-client-scanner.cc",
         "async-connection.cc",
         "async-rpc-retrying-caller-factory.cc",
         "async-rpc-retrying-caller.cc",
+        "async-scan-rpc-retrying-caller.cc",
+        "async-table-result-scanner.cc",
         "cell.cc",
         "client.cc",
         "hbase-rpc-controller.cc",
@@ -73,6 +82,7 @@ cxx_library(
         "put.cc",
         "delete.cc",
         "scan.cc",
+        "scan-result-cache.cc",
         "raw-async-table.cc",
         "result.cc",
         "request-converter.cc",
@@ -109,7 +119,7 @@ cxx_library(
         "configuration.cc",
         "hbase-configuration-loader.cc",
     ],
-    deps=["//third-party:folly"],
+    deps=["//utils:utils", "//third-party:folly"],
     compiler_flags=['-Weffc++', '-ggdb'],
     visibility=[
         'PUBLIC',
@@ -156,8 +166,12 @@ cxx_test(
     run_test_separately=True,)
 cxx_test(
     name="delete-test",
-    srcs=["delete-test.cc",],
-    deps=[":core",],
+    srcs=[
+        "delete-test.cc",
+    ],
+    deps=[
+        ":core",
+    ],
     run_test_separately=True,)
 cxx_test(
     name="increment-test",
@@ -257,6 +271,27 @@ cxx_test(
     ],
     run_test_separately=True,)
 cxx_test(
+    name="scan-result-cache-test",
+    srcs=[
+        "scan-result-cache-test.cc",
+    ],
+    deps=[
+        ":core",
+    ],
+    run_test_separately=True,)
+cxx_test(
+    name="scanner-test",
+    srcs=[
+        "scanner-test.cc",
+    ],
+    deps=[
+        ":core",
+        "//if:if",
+        "//serde:serde",
+        "//test-util:test-util",
+    ],
+    run_test_separately=True,)
+cxx_test(
     name="zk-util-test",
     srcs=[
         "zk-util-test.cc",
diff --git a/hbase-native-client/core/async-client-scanner.cc b/hbase-native-client/core/async-client-scanner.cc
new file mode 100644
index 0000000..720ab25
--- /dev/null
+++ b/hbase-native-client/core/async-client-scanner.cc
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-client-scanner.h"
+
+#include <algorithm>
+#include <iterator>
+#include <limits>
+#include <stdexcept>
+
+namespace hbase {
+
+AsyncClientScanner::AsyncClientScanner(
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan,
+    std::shared_ptr<pb::TableName> table_name, std::shared_ptr<RawScanResultConsumer> consumer,
+    nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos,
+    nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+    : conn_(conn),
+      scan_(scan),
+      table_name_(table_name),
+      consumer_(consumer),
+      pause_(pause),
+      max_retries_(max_retries),
+      scan_timeout_nanos_(scan_timeout_nanos),
+      rpc_timeout_nanos_(rpc_timeout_nanos),
+      start_log_errors_count_(start_log_errors_count) {
+  results_cache_ = std::make_shared<ScanResultCache>();
+  max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
+}
+
+void AsyncClientScanner::Start() { OpenScanner(); }
+
+folly::Future<std::shared_ptr<OpenScannerResponse>> AsyncClientScanner::CallOpenScanner(
+    std::shared_ptr<hbase::RpcClient> rpc_client,
+    std::shared_ptr<hbase::HBaseRpcController> controller,
+    std::shared_ptr<hbase::RegionLocation> loc) {
+  open_scanner_tries_++;
+
+  auto preq = RequestConverter::ToScanRequest(*scan_, loc->region_name(), scan_->Caching(), false);
+
+  auto self(shared_from_this());
+  VLOG(5) << "Calling RPC Client to open the scanner for region:" << loc->DebugString();
+  return rpc_client
+      ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq),
+                  security::User::defaultUser(), "ClientService")
+      .then([self, loc, controller, rpc_client](const std::unique_ptr<Response>& presp) {
+        VLOG(5) << "Scan Response:" << presp->DebugString();
+        return std::make_shared<OpenScannerResponse>(rpc_client, presp, loc, controller);
+      });
+}
+
+void AsyncClientScanner::OpenScanner() {
+  auto self(shared_from_this());
+  open_scanner_tries_ = 1;
+
+  auto caller = conn_->caller_factory()
+                    ->Single<std::shared_ptr<OpenScannerResponse>>()
+                    ->table(table_name_)
+                    ->row(scan_->StartRow())
+                    ->locate_type(GetLocateType(*scan_))
+                    ->rpc_timeout(rpc_timeout_nanos_)
+                    ->operation_timeout(scan_timeout_nanos_)
+                    ->pause(pause_)
+                    ->max_retries(max_retries_)
+                    ->start_log_errors_count(start_log_errors_count_)
+                    ->action([&](std::shared_ptr<hbase::HBaseRpcController> controller,
+                                 std::shared_ptr<hbase::RegionLocation> loc,
+                                 std::shared_ptr<hbase::RpcClient> rpc_client)
+                                 -> folly::Future<std::shared_ptr<OpenScannerResponse>> {
+                                   return CallOpenScanner(rpc_client, controller, loc);
+                                 })
+                    ->Build();
+
+  caller->Call()
+      .then([this, self](std::shared_ptr<OpenScannerResponse> resp) {
+        VLOG(3) << "Opened scanner with id:" << resp->scan_resp_->scanner_id()
+                << ", region:" << resp->region_location_->DebugString() << ", starting scan";
+        StartScan(resp);
+      })
+      .onError([this, self](const folly::exception_wrapper& e) {
+        VLOG(3) << "Open scan request received error:" << e.what();
+        consumer_->OnError(e);
+      })
+      .then([caller, self](const auto r) { return r; });
+}
+
+void AsyncClientScanner::StartScan(std::shared_ptr<OpenScannerResponse> resp) {
+  auto self(shared_from_this());
+  auto caller = conn_->caller_factory()
+                    ->Scan()
+                    ->scanner_id(resp->scan_resp_->scanner_id())
+                    ->region_location(resp->region_location_)
+                    ->scanner_lease_timeout(TimeUtil::MillisToNanos(resp->scan_resp_->ttl()))
+                    ->scan(scan_)
+                    ->rpc_client(resp->rpc_client_)
+                    ->consumer(consumer_)
+                    ->results_cache(results_cache_)
+                    ->rpc_timeout(rpc_timeout_nanos_)
+                    ->scan_timeout(scan_timeout_nanos_)
+                    ->pause(pause_)
+                    ->max_retries(max_retries_)
+                    ->start_log_errors_count(start_log_errors_count_)
+                    ->Build();
+
+  caller->Start(resp->controller_, resp->scan_resp_, resp->cell_scanner_)
+      .then([caller, self](const bool has_more) {
+        if (has_more) {
+          // open the next scanner on the next region.
+          self->OpenScanner();
+        } else {
+          self->consumer_->OnComplete();
+        }
+      })
+      .onError([caller, self](const folly::exception_wrapper& e) { self->consumer_->OnError(e); })
+      .then([caller, self](const auto r) { return r; });
+}
+
+RegionLocateType AsyncClientScanner::GetLocateType(const Scan& scan) {
+  // TODO: In C++, there is no Scan::IncludeStartRow() and Scan::IncludeStopRow().
+  // When added, this method should be modified to return other RegionLocateTypes
+  // (see ConnectionUtils.java #getLocateType())
+  // TODO: When reversed scans are implemented, return other RegionLocateTypes
+  return RegionLocateType::kCurrent;
+}
+
+}  // namespace hbase
diff --git a/hbase-native-client/core/async-client-scanner.h b/hbase-native-client/core/async-client-scanner.h
new file mode 100644
index 0000000..8663468
--- /dev/null
+++ b/hbase-native-client/core/async-client-scanner.h
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Format.h>
+#include <folly/Logging.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/HHWheelTimer.h>
+
+#include <algorithm>
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "connection/rpc-client.h"
+#include "core/async-connection.h"
+#include "core/async-rpc-retrying-caller-factory.h"
+#include "core/async-rpc-retrying-caller.h"
+#include "core/hbase-rpc-controller.h"
+#include "core/raw-scan-result-consumer.h"
+#include "core/region-location.h"
+#include "core/request-converter.h"
+#include "core/response-converter.h"
+#include "core/result.h"
+#include "core/scan-result-cache.h"
+#include "core/scan.h"
+#include "exceptions/exception.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+#include "utils/connection-util.h"
+#include "utils/sys-util.h"
+#include "utils/time-util.h"
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace hbase {
+class OpenScannerResponse {
+ public:
+  OpenScannerResponse(std::shared_ptr<hbase::RpcClient> rpc_client,
+                      const std::unique_ptr<Response>& resp,
+                      std::shared_ptr<RegionLocation> region_location,
+                      std::shared_ptr<hbase::HBaseRpcController> controller)
+      : rpc_client_(rpc_client), region_location_(region_location), controller_(controller) {
+    scan_resp_ = std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg());
+    cell_scanner_ = resp->cell_scanner();
+  }
+  std::shared_ptr<hbase::RpcClient> rpc_client_;
+  std::shared_ptr<pb::ScanResponse> scan_resp_;
+  std::shared_ptr<RegionLocation> region_location_;
+  std::shared_ptr<hbase::HBaseRpcController> controller_;
+  std::shared_ptr<CellScanner> cell_scanner_;
+};
+
+class AsyncClientScanner : public std::enable_shared_from_this<AsyncClientScanner> {
+ public:
+  template <typename... T>
+  static std::shared_ptr<AsyncClientScanner> Create(T&&... all) {
+    return std::shared_ptr<AsyncClientScanner>(new AsyncClientScanner(std::forward<T>(all)...));
+  }
+
+  void Start();
+
+ private:
+  // methods
+  AsyncClientScanner(std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan,
+                     std::shared_ptr<pb::TableName> table_name,
+                     std::shared_ptr<RawScanResultConsumer> consumer, nanoseconds pause,
+                     uint32_t max_retries, nanoseconds scan_timeout_nanos,
+                     nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
+
+  folly::Future<std::shared_ptr<OpenScannerResponse>> CallOpenScanner(
+      std::shared_ptr<hbase::RpcClient> rpc_client,
+      std::shared_ptr<hbase::HBaseRpcController> controller,
+      std::shared_ptr<hbase::RegionLocation> loc);
+
+  void OpenScanner();
+
+  void StartScan(std::shared_ptr<OpenScannerResponse> resp);
+
+  RegionLocateType GetLocateType(const Scan& scan);
+
+ private:
+  // data
+  std::shared_ptr<AsyncConnection> conn_;
+  std::shared_ptr<Scan> scan_;
+  std::shared_ptr<pb::TableName> table_name_;
+  std::shared_ptr<ScanResultCache> results_cache_;
+  std::shared_ptr<RawScanResultConsumer> consumer_;
+  nanoseconds pause_;
+  uint32_t max_retries_;
+  nanoseconds scan_timeout_nanos_;
+  nanoseconds rpc_timeout_nanos_;
+  uint32_t start_log_errors_count_;
+  uint32_t max_attempts_;
+  uint32_t open_scanner_tries_ = 0;
+};
+}  // namespace hbase
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
index 3c11f52..1af6e72 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
@@ -28,7 +28,13 @@
 #include "connection/rpc-client.h"
 #include "core/async-batch-rpc-retrying-caller.h"
 #include "core/async-rpc-retrying-caller.h"
+#include "core/async-scan-rpc-retrying-caller.h"
+#include "core/raw-scan-result-consumer.h"
+#include "core/region-location.h"
 #include "core/row.h"
+#include "core/scan-result-cache.h"
+#include "core/scan.h"
+
 #include "if/Client.pb.h"
 #include "if/HBase.pb.h"
 
@@ -54,8 +60,8 @@ class SingleRequestCallerBuilder
 
   virtual ~SingleRequestCallerBuilder() = default;
 
-  typedef SingleRequestCallerBuilder<RESP> GenenericThisType;
-  typedef std::shared_ptr<GenenericThisType> SharedThisPtr;
+  typedef SingleRequestCallerBuilder<RESP> GenericThisType;
+  typedef std::shared_ptr<GenericThisType> SharedThisPtr;
 
   SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) {
     table_name_ = table_name;
@@ -112,7 +118,7 @@ class SingleRequestCallerBuilder
 
  private:
   SharedThisPtr shared_this() {
-    return std::enable_shared_from_this<GenenericThisType>::shared_from_this();
+    return std::enable_shared_from_this<GenericThisType>::shared_from_this();
   }
 
  private:
@@ -198,6 +204,114 @@ class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilde
   std::chrono::nanoseconds rpc_timeout_nanos_;
   int32_t start_log_errors_count_ = 0;
 };
+
+class ScanCallerBuilder : public std::enable_shared_from_this<ScanCallerBuilder> {
+ public:
+  explicit ScanCallerBuilder(std::shared_ptr<AsyncConnection> conn,
+                             std::shared_ptr<folly::HHWheelTimer> retry_timer)
+      : conn_(conn),
+        retry_timer_(retry_timer),
+        rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()),
+        pause_(conn->connection_conf()->pause()),
+        scan_timeout_nanos_(conn->connection_conf()->scan_timeout()),
+        max_retries_(conn->connection_conf()->max_retries()),
+        start_log_errors_count_(conn->connection_conf()->start_log_errors_count()),
+        scanner_id_(-1) {}
+
+  virtual ~ScanCallerBuilder() = default;
+
+  typedef ScanCallerBuilder GenericThisType;
+  typedef std::shared_ptr<ScanCallerBuilder> SharedThisPtr;
+
+  SharedThisPtr rpc_client(std::shared_ptr<hbase::RpcClient> rpc_client) {
+    rpc_client_ = rpc_client;
+    return shared_this();
+  }
+
+  SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) {
+    rpc_timeout_nanos_ = rpc_timeout_nanos;
+    return shared_this();
+  }
+
+  SharedThisPtr scan_timeout(nanoseconds scan_timeout_nanos) {
+    scan_timeout_nanos_ = scan_timeout_nanos;
+    return shared_this();
+  }
+
+  SharedThisPtr scanner_lease_timeout(nanoseconds scanner_lease_timeout_nanos) {
+    scanner_lease_timeout_nanos_ = scanner_lease_timeout_nanos;
+    return shared_this();
+  }
+
+  SharedThisPtr pause(nanoseconds pause) {
+    pause_ = pause;
+    return shared_this();
+  }
+
+  SharedThisPtr max_retries(uint32_t max_retries) {
+    max_retries_ = max_retries;
+    return shared_this();
+  }
+
+  SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) {
+    start_log_errors_count_ = start_log_errors_count;
+    return shared_this();
+  }
+
+  SharedThisPtr region_location(std::shared_ptr<RegionLocation> region_location) {
+    region_location_ = region_location;
+    return shared_this();
+  }
+
+  SharedThisPtr scanner_id(int64_t scanner_id) {
+    scanner_id_ = scanner_id;
+    return shared_this();
+  }
+
+  SharedThisPtr scan(std::shared_ptr<Scan> scan) {
+    scan_ = scan;
+    return shared_this();
+  }
+
+  SharedThisPtr results_cache(std::shared_ptr<ScanResultCache> results_cache) {
+    results_cache_ = results_cache;
+    return shared_this();
+  }
+
+  SharedThisPtr consumer(std::shared_ptr<RawScanResultConsumer> consumer) {
+    consumer_ = consumer;
+    return shared_this();
+  }
+
+  std::shared_ptr<AsyncScanRpcRetryingCaller> Build() {
+    return std::make_shared<AsyncScanRpcRetryingCaller>(
+        conn_, retry_timer_, rpc_client_, scan_, scanner_id_, results_cache_, consumer_,
+        region_location_, scanner_lease_timeout_nanos_, pause_, max_retries_, scan_timeout_nanos_,
+        rpc_timeout_nanos_, start_log_errors_count_);
+  }
+
+ private:
+  SharedThisPtr shared_this() {
+    return std::enable_shared_from_this<GenericThisType>::shared_from_this();
+  }
+
+ private:
+  std::shared_ptr<AsyncConnection> conn_;
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<hbase::RpcClient> rpc_client_;
+  std::shared_ptr<Scan> scan_;
+  nanoseconds rpc_timeout_nanos_;
+  nanoseconds scan_timeout_nanos_;
+  nanoseconds scanner_lease_timeout_nanos_;
+  nanoseconds pause_;
+  uint32_t max_retries_;
+  uint32_t start_log_errors_count_;
+  std::shared_ptr<RegionLocation> region_location_;
+  int64_t scanner_id_;
+  std::shared_ptr<RawScanResultConsumer> consumer_;
+  std::shared_ptr<ScanResultCache> results_cache_;
+};  // end of ScanCallerBuilder
+
 class AsyncRpcRetryingCallerFactory {
  private:
   std::shared_ptr<AsyncConnection> conn_;
@@ -218,6 +332,10 @@ class AsyncRpcRetryingCallerFactory {
   std::shared_ptr<BatchCallerBuilder> Batch() {
     return std::make_shared<BatchCallerBuilder>(conn_, retry_timer_);
   }
+
+  std::shared_ptr<ScanCallerBuilder> Scan() {
+    return std::make_shared<ScanCallerBuilder>(conn_, retry_timer_);
+  }
 };
 
 }  // namespace hbase
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
index a2c5f0e..b9d01bb 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -111,7 +111,7 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
     Consumer<exception_wrapper> update_cached_location) {
   ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
   exceptions_->push_back(twec);
-  if (!ShouldRetry(error) || tries_ >= max_retries_) {
+  if (!ExceptionUtil::ShouldRetry(error) || tries_ >= max_retries_) {
     CompleteExceptionally();
     return;
   }
@@ -149,14 +149,6 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
 }
 
 template <typename RESP>
-bool AsyncSingleRequestRpcRetryingCaller<RESP>::ShouldRetry(const exception_wrapper& error) {
-  bool do_not_retry = false;
-  error.with_exception(
-      [&](const RemoteException& remote_ex) { do_not_retry &= remote_ex.do_not_retry(); });
-  return !do_not_retry;
-}
-
-template <typename RESP>
 void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc) {
   int64_t call_timeout_ns;
   if (operation_timeout_nanos_.count() > 0) {
@@ -220,6 +212,8 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::ResetController(
 // explicit instantiations for the linker. Otherwise, you have to #include the .cc file for the
 // templetized
 // class definitions.
+class OpenScannerResponse;
 template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<hbase::Result>>;
 template class AsyncSingleRequestRpcRetryingCaller<folly::Unit>;
+template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<OpenScannerResponse>>;
 } /* namespace hbase */
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h
index fa3c288..c7e28d0 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller.h
@@ -84,8 +84,6 @@ class AsyncSingleRequestRpcRetryingCaller {
   void OnError(const folly::exception_wrapper& error, Supplier<std::string> err_msg,
                Consumer<folly::exception_wrapper> update_cached_location);
 
-  bool ShouldRetry(const folly::exception_wrapper& error);
-
   void Call(const RegionLocation& loc);
 
   void CompleteExceptionally();
diff --git a/hbase-native-client/core/async-scan-rpc-retrying-caller.cc b/hbase-native-client/core/async-scan-rpc-retrying-caller.cc
new file mode 100644
index 0000000..fbdf17a
--- /dev/null
+++ b/hbase-native-client/core/async-scan-rpc-retrying-caller.cc
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-scan-rpc-retrying-caller.h"
+
+namespace hbase {
+
+ScanResumerImpl::ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller)
+    : caller_(caller), mutex_() {}
+
+void ScanResumerImpl::Resume() {
+  // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
+  // just return at the first if condition without loading the resp and numValidResuls field. If
+  // resume is called after suspend, then it is also safe to just reference resp and
+  // numValidResults after the synchronized block as no one will change it anymore.
+  std::shared_ptr<pb::ScanResponse> local_resp;
+  int64_t local_num_complete_rows;
+
+  {
+    std::unique_lock<std::mutex> mlock{mutex_};
+    if (state_ == ScanResumerState::kInitialized) {
+      // user calls this method before we call prepare, so just set the state to
+      // RESUMED, the implementation will just go on.
+      state_ = ScanResumerState::kResumed;
+      return;
+    }
+    if (state_ == ScanResumerState::kResumed) {
+      // already resumed, give up.
+      return;
+    }
+    state_ = ScanResumerState::kResumed;
+    local_resp = resp_;
+    local_num_complete_rows = num_complete_rows_;
+  }
+
+  caller_->CompleteOrNext(local_resp);
+}
+
+bool ScanResumerImpl::Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows) {
+  std::unique_lock<std::mutex> mlock(mutex_);
+  if (state_ == ScanResumerState::kResumed) {
+    // user calls resume before we actually suspend the scan, just continue;
+    return false;
+  }
+  state_ = ScanResumerState::kSuspended;
+  resp_ = resp;
+  num_complete_rows_ = num_complete_rows;
+
+  return true;
+}
+
+ScanControllerImpl::ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller)
+    : caller_(caller) {}
+
+std::shared_ptr<ScanResumer> ScanControllerImpl::Suspend() {
+  PreCheck();
+  state_ = ScanControllerState::kSuspended;
+  resumer_ = std::make_shared<ScanResumerImpl>(caller_);
+  return resumer_;
+}
+
+void ScanControllerImpl::Terminate() {
+  PreCheck();
+  state_ = ScanControllerState::kTerminated;
+}
+
+// return the current state, and set the state to DESTROYED.
+ScanControllerState ScanControllerImpl::Destroy() {
+  ScanControllerState state = state_;
+  state_ = ScanControllerState::kDestroyed;
+  return state;
+}
+
+void ScanControllerImpl::PreCheck() {
+  CHECK(std::this_thread::get_id() == caller_thread_id_)
+      << "The current thread is" << std::this_thread::get_id() << ", expected thread is "
+      << caller_thread_id_ << ", you should not call this method outside OnNext or OnHeartbeat";
+
+  CHECK(state_ == ScanControllerState::kInitialized) << "Invalid Stopper state "
+                                                     << DebugString(state_);
+}
+
+std::string ScanControllerImpl::DebugString(ScanControllerState state) {
+  switch (state) {
+    case ScanControllerState::kInitialized:
+      return "kInitialized";
+    case ScanControllerState::kSuspended:
+      return "kSuspended";
+    case ScanControllerState::kTerminated:
+      return "kTerminated";
+    case ScanControllerState::kDestroyed:
+      return "kDestroyed";
+    default:
+      return "UNKNOWN";
+  }
+}
+
+std::string ScanControllerImpl::DebugString(ScanResumerState state) {
+  switch (state) {
+    case ScanResumerState::kInitialized:
+      return "kInitialized";
+    case ScanResumerState::kSuspended:
+      return "kSuspended";
+    case ScanResumerState::kResumed:
+      return "kResumed";
+    default:
+      return "UNKNOWN";
+  }
+}
+
+AsyncScanRpcRetryingCaller::AsyncScanRpcRetryingCaller(
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
+    std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<Scan> scan, int64_t scanner_id,
+    std::shared_ptr<ScanResultCache> results_cache, std::shared_ptr<RawScanResultConsumer> consumer,
+    std::shared_ptr<RegionLocation> region_location, nanoseconds scanner_lease_timeout_nanos,
+    nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos,
+    nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+    : conn_(conn),
+      retry_timer_(retry_timer),
+      rpc_client_(rpc_client),
+      scan_(scan),
+      scanner_id_(scanner_id),
+      results_cache_(results_cache),
+      consumer_(consumer),
+      region_location_(region_location),
+      scanner_lease_timeout_nanos_(scanner_lease_timeout_nanos),
+      pause_(pause),
+      max_retries_(max_retries),
+      scan_timeout_nanos_(scan_timeout_nanos),
+      rpc_timeout_nanos_(rpc_timeout_nanos),
+      start_log_errors_count_(start_log_errors_count),
+      promise_(std::make_shared<folly::Promise<bool>>()),
+      tries_(1) {
+  controller_ = conn_->CreateRpcController();
+  start_ns_ = TimeUtil::GetNowNanos();
+  max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries);
+  exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>();
+}
+
+folly::Future<bool> AsyncScanRpcRetryingCaller::Start(
+    std::shared_ptr<HBaseRpcController> controller,
+    std::shared_ptr<pb::ScanResponse> open_scan_resp,
+    const std::shared_ptr<CellScanner> cell_scanner) {
+  OnComplete(controller, open_scan_resp, cell_scanner);
+  return promise_->getFuture();
+}
+
+int64_t AsyncScanRpcRetryingCaller::RemainingTimeNs() {
+  return scan_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_);
+}
+
+void AsyncScanRpcRetryingCaller::OnComplete(std::shared_ptr<HBaseRpcController> controller,
+                                            std::shared_ptr<pb::ScanResponse> resp,
+                                            const std::shared_ptr<CellScanner> cell_scanner) {
+  VLOG(5) << "Scan: OnComplete, scanner_id:" << scanner_id_;
+
+  if (controller->Failed()) {
+    OnError(controller->exception());
+    return;
+  }
+
+  bool is_heartbeat = resp->has_heartbeat_message() && resp->heartbeat_message();
+
+  int64_t num_complete_rows_before = results_cache_->num_complete_rows();
+  try {
+    auto raw_results = ResponseConverter::FromScanResponse(resp, cell_scanner);
+
+    auto results = results_cache_->AddAndGet(raw_results, is_heartbeat);
+
+    auto scan_controller = std::make_shared<ScanControllerImpl>(shared_from_this());
+
+    if (results.size() > 0) {
+      UpdateNextStartRowWhenError(*results[results.size() - 1]);
+      VLOG(5) << "Calling consumer->OnNext()";
+      consumer_->OnNext(results, scan_controller);
+    } else if (is_heartbeat) {
+      consumer_->OnHeartbeat(scan_controller);
+    }
+
+    ScanControllerState state = scan_controller->Destroy();
+    if (state == ScanControllerState::kTerminated) {
+      if (resp->has_more_results_in_region() && !resp->more_results_in_region()) {
+        // we have more results in region but user request to stop the scan, so we need to close the
+        // scanner explicitly.
+        CloseScanner();
+      }
+      CompleteNoMoreResults();
+      return;
+    }
+
+    int64_t num_complete_rows = results_cache_->num_complete_rows() - num_complete_rows_before;
+    if (state == ScanControllerState::kSuspended) {
+      if (scan_controller->resumer()->Prepare(resp, num_complete_rows)) {
+        return;
+      }
+    }
+  } catch (const std::runtime_error& e) {
+    // We can not retry here. The server has responded normally and the call sequence has been
+    // increased so a new scan with the same call sequence will cause an
+    // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
+    LOG(WARNING) << "Received exception in reading the scan response:" << e.what();
+    CompleteWhenError(true);
+    return;
+  }
+
+  CompleteOrNext(resp);
+}
+
+void AsyncScanRpcRetryingCaller::CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp) {
+  VLOG(5) << "Scan: CompleteOrNext, scanner_id" << scanner_id_
+          << ", response:" << resp->ShortDebugString();
+
+  if (resp->has_more_results() && !resp->more_results()) {
+    // RS tells us there is no more data for the whole scan
+    CompleteNoMoreResults();
+    return;
+  }
+  // TODO: Implement Scan::limit(), and check the limit here
+
+  if (resp->has_more_results_in_region() && !resp->more_results_in_region()) {
+    // TODO: check whether Scan is reversed here
+    CompleteWhenNoMoreResultsInRegion();
+    return;
+  }
+  Next();
+}
+
+void AsyncScanRpcRetryingCaller::CompleteExceptionally(bool close_scanner) {
+  VLOG(5) << "Scan: CompleteExceptionally";
+  results_cache_->Clear();
+  if (close_scanner) {
+    CloseScanner();
+  }
+  this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_));
+}
+
+void AsyncScanRpcRetryingCaller::CompleteNoMoreResults() {
+  // In master code, scanners auto-close if we have exhausted the region. It may not be the case
+  // in branch-1 code. If this is backported, make sure that the scanner is closed.
+  VLOG(5) << "Scan: CompleteNoMoreResults, scanner_id:" << scanner_id_;
+  promise_->setValue(false);
+}
+
+void AsyncScanRpcRetryingCaller::CompleteWhenNoMoreResultsInRegion() {
+  VLOG(5) << "Scan: CompleteWhenNoMoreResultsInRegion, scanner_id:" << scanner_id_;
+  // In master code, scanners auto-close if we have exhausted the region. It may not be the case
+  // in branch-1 code. If this is backported, make sure that the scanner is closed.
+  if (NoMoreResultsForScan(*scan_, region_location_->region_info())) {
+    CompleteNoMoreResults();
+  } else {
+    CompleteWithNextStartRow(region_location_->region_info().end_key(), true);
+  }
+}
+
+void AsyncScanRpcRetryingCaller::CompleteWithNextStartRow(std::string row, bool inclusive) {
+  VLOG(5) << "Scan: CompleteWithNextStartRow: region scan is complete, move to next region";
+  scan_->SetStartRow(row);
+  // TODO: set inclusive if it is reverse scans
+  promise_->setValue(true);
+}
+
+void AsyncScanRpcRetryingCaller::UpdateNextStartRowWhenError(const Result& result) {
+  next_start_row_when_error_ = optional<std::string>(result.Row());
+  include_next_start_row_when_error_ = result.Partial();
+}
+
+void AsyncScanRpcRetryingCaller::CompleteWhenError(bool close_scanner) {
+  VLOG(5) << "Scan: CompleteWhenError, scanner_id:" << scanner_id_;
+  results_cache_->Clear();
+  if (close_scanner) {
+    CloseScanner();
+  }
+  if (next_start_row_when_error_) {
+    // TODO: HBASE-17583 adds include start / stop row to the Scan. Once we rebase and implement
+    // those options in Scan , we can start using that here.
+    scan_->SetStartRow(include_next_start_row_when_error_
+                           ? *next_start_row_when_error_
+                           : BytesUtil::CreateClosestRowAfter(*next_start_row_when_error_));
+  }
+  promise_->setValue(true);
+}
+
+void AsyncScanRpcRetryingCaller::OnError(const folly::exception_wrapper& error) {
+  VLOG(5) << "Scan: OnError, scanner_id:" << scanner_id_;
+  if (tries_ > start_log_errors_count_ || VLOG_IS_ON(5)) {
+    LOG(WARNING) << "Call to " << region_location_->server_name().ShortDebugString()
+                 << " for scanner id = " << scanner_id_ << " for "
+                 << region_location_->region_info().ShortDebugString()
+                 << " failed, , tries = " << tries_ << ", maxAttempts = " << max_attempts_
+                 << ", timeout = " << TimeUtil::ToMillis(scan_timeout_nanos_).count()
+                 << " ms, time elapsed = " << TimeUtil::ElapsedMillis(start_ns_) << " ms"
+                 << error.what().toStdString();
+  }
+
+  bool scanner_closed = ExceptionUtil::IsScannerClosed(error);
+  ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
+  exceptions_->push_back(twec);
+  if (tries_ >= max_retries_) {
+    CompleteExceptionally(!scanner_closed);
+    return;
+  }
+
+  int64_t delay_ns;
+  if (scan_timeout_nanos_.count() > 0) {
+    int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+    if (max_delay_ns <= 0) {
+      CompleteExceptionally(!scanner_closed);
+      return;
+    }
+    delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
+  } else {
+    delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
+  }
+
+  if (scanner_closed) {
+    CompleteWhenError(false);
+    return;
+  }
+
+  if (ExceptionUtil::IsScannerOutOfOrder(error)) {
+    CompleteWhenError(true);
+    return;
+  }
+  if (!ExceptionUtil::ShouldRetry(error)) {
+    CompleteExceptionally(true);
+    return;
+  }
+  tries_++;
+
+  auto self(shared_from_this());
+  conn_->retry_executor()->add([&]() {
+    retry_timer_->scheduleTimeoutFn(
+        [self]() { self->conn_->cpu_executor()->add([&]() { self->Call(); }); },
+        std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns)));
+  });
+}
+
+bool AsyncScanRpcRetryingCaller::NoMoreResultsForScan(const Scan& scan,
+                                                      const pb::RegionInfo& info) {
+  if (BytesUtil::IsEmptyStopRow(info.end_key())) {
+    return true;
+  }
+  if (BytesUtil::IsEmptyStopRow(scan.StopRow())) {
+    return false;
+  }
+  int32_t c = BytesUtil::CompareTo(info.end_key(), scan.StopRow());
+  // 1. if our stop row is less than the endKey of the region
+  // 2. if our stop row is equal to the endKey of the region and we do not include the stop row
+  // for scan.
+  return c > 0 ||
+         (c == 0 /* && !scan.IncludeStopRow()*/);  // TODO: Scans always exclude StopRow for now.
+}
+
+void AsyncScanRpcRetryingCaller::Next() {
+  VLOG(5) << "Scan: Next";
+  next_call_seq_++;
+  tries_ = 1;
+  exceptions_->clear();
+  start_ns_ = TimeUtil::GetNowNanos();
+  Call();
+}
+
+void AsyncScanRpcRetryingCaller::Call() {
+  VLOG(5) << "Scan: Call";
+  auto self(shared_from_this());
+  // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
+  // less than the scan timeout. If the server does not respond in time(usually this will not
+  // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
+  // resending the next request and the only way to fix this is to close the scanner and open a
+  // new one.
+  int64_t call_timeout_nanos;
+  if (scan_timeout_nanos_.count() > 0) {
+    int64_t remaining_nanos = scan_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_);
+    if (remaining_nanos <= 0) {
+      CompleteExceptionally(true);
+      return;
+    }
+    call_timeout_nanos = remaining_nanos;
+  } else {
+    call_timeout_nanos = 0L;
+  }
+
+  ResetController(controller_, call_timeout_nanos);
+
+  auto req =
+      RequestConverter::ToScanRequest(scanner_id_, scan_->Caching(), false, next_call_seq_, false);
+
+  // do the RPC call
+  rpc_client_
+      ->AsyncCall(region_location_->server_name().host_name(),
+                  region_location_->server_name().port(), std::move(req),
+                  security::User::defaultUser(), "ClientService")
+      .then([self, this](const std::unique_ptr<Response>& resp) {
+        auto scan_resp = std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg());
+        return OnComplete(controller_, scan_resp, resp->cell_scanner());
+      })
+      .onError([self, this](const folly::exception_wrapper& e) { OnError(e); });
+}
+
+void AsyncScanRpcRetryingCaller::CloseScanner() {
+  auto self(shared_from_this());
+  ResetController(controller_, rpc_timeout_nanos_.count());
+
+  VLOG(5) << "Closing scanner with scanner_id:" << folly::to<std::string>(scanner_id_);
+
+  // Do a close scanner RPC. Fire and forget.
+  auto req = RequestConverter::ToScanRequest(scanner_id_, 0, true);
+  rpc_client_
+      ->AsyncCall(region_location_->server_name().host_name(),
+                  region_location_->server_name().port(), std::move(req),
+                  security::User::defaultUser(), "ClientService")
+      .onError([self, this](const folly::exception_wrapper& e) -> std::unique_ptr<Response> {
+        LOG(WARNING) << "Call to " + region_location_->server_name().ShortDebugString() +
+                            " for closing scanner_id = " + folly::to<std::string>(scanner_id_) +
+                            " for " + region_location_->region_info().ShortDebugString() +
+                            " failed, ignore, probably already closed. Exception:" +
+                            e.what().toStdString();
+        return nullptr;
+      });
+}
+
+void AsyncScanRpcRetryingCaller::ResetController(std::shared_ptr<HBaseRpcController> controller,
+                                                 const int64_t& timeout_nanos) {
+  controller->Reset();
+  if (timeout_nanos >= 0) {
+    controller->set_call_timeout(
+        milliseconds(std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_nanos))));
+  }
+}
+
+}  // namespace hbase
diff --git a/hbase-native-client/core/async-scan-rpc-retrying-caller.h b/hbase-native-client/core/async-scan-rpc-retrying-caller.h
new file mode 100644
index 0000000..9555e80
--- /dev/null
+++ b/hbase-native-client/core/async-scan-rpc-retrying-caller.h
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Conv.h>
+#include <folly/ExceptionWrapper.h>
+#include <folly/Format.h>
+#include <folly/Logging.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/HHWheelTimer.h>
+
+#include <algorithm>
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "connection/rpc-client.h"
+#include "core/async-connection.h"
+#include "core/hbase-rpc-controller.h"
+#include "core/raw-scan-result-consumer.h"
+#include "core/region-location.h"
+#include "core/request-converter.h"
+#include "core/response-converter.h"
+#include "core/result.h"
+#include "core/scan-result-cache.h"
+#include "core/scan.h"
+#include "exceptions/exception.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+#include "utils/bytes-util.h"
+#include "utils/connection-util.h"
+#include "utils/optional.h"
+#include "utils/sys-util.h"
+#include "utils/time-util.h"
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+class AsyncScanRpcRetryingCaller;
+
+// The resume method is allowed to be called in another thread so here we also use the
+// ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
+// from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
+// and when user calls resume method, we will change the state to RESUMED. But the resume method
+// could be called in other thread, and in fact, user could just do this:
+// controller.suspend().resume()
+// This is strange but valid. This means the scan could be resumed before we call the prepare
+// method to do the actual suspend work. So in the resume method, we will check if the state is
+// INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
+// method, if the state is RESUMED already, we will just return an let the scan go on.
+// Notice that, the public methods of this class is supposed to be called by upper layer only, and
+// package private methods can only be called within the implementation of
+// AsyncScanSingleRegionRpcRetryingCaller.
+// TODO: Unlike the Java counter part, we do not do scan lease renewals in a background thread.
+// Since there is also no async scan API exposed to the users, only ScanResultConsumer is the
+// AsyncTableResultScanner which will only pause the scanner if the result cache is maxed. The
+// application is expected to consume the scan results before the scanner lease timeout.
+class ScanResumerImpl : public ScanResumer {
+ public:
+  explicit ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
+
+  virtual ~ScanResumerImpl() = default;
+
+  /**
+   * Resume the scan. You are free to call it multiple time but only the first call will take
+   * effect.
+   */
+  void Resume() override;
+
+  // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
+  // for more details.
+  bool Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows);
+
+ private:
+  // INITIALIZED -> SUSPENDED -> RESUMED
+  // INITIALIZED -> RESUMED
+  ScanResumerState state_ = ScanResumerState::kInitialized;
+  std::mutex mutex_;
+  std::shared_ptr<pb::ScanResponse> resp_ = nullptr;
+  int64_t num_complete_rows_ = 0;
+  std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
+};
+
+class ScanControllerImpl : public ScanController {
+ public:
+  virtual ~ScanControllerImpl() = default;
+
+  explicit ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
+
+  /**
+   * Suspend the scan.
+   * <p>
+   * This means we will stop fetching data in background, i.e., will not call onNext any more
+   * before you resume the scan.
+   * @return A resumer used to resume the scan later.
+   */
+  std::shared_ptr<ScanResumer> Suspend();
+
+  /**
+   * Terminate the scan.
+   * <p>
+   * This is useful when you have got enough results and want to stop the scan in onNext method,
+   * or you want to stop the scan in onHeartbeat method because it has spent too many time.
+   */
+  void Terminate();
+
+  // return the current state, and set the state to DESTROYED.
+  ScanControllerState Destroy();
+
+  std::shared_ptr<ScanResumerImpl> resumer() { return resumer_; }
+
+ private:
+  void PreCheck();
+
+  std::string DebugString(ScanControllerState state);
+
+  std::string DebugString(ScanResumerState state);
+
+ private:
+  // Make sure the methods are only called in this thread.
+  std::thread::id caller_thread_id_ = std::this_thread::get_id();
+  // INITIALIZED -> SUSPENDED -> DESTROYED
+  // INITIALIZED -> TERMINATED -> DESTROYED
+  // INITIALIZED -> DESTROYED
+  // If the state is incorrect we will throw IllegalStateException.
+  ScanControllerState state_ = ScanControllerState::kInitialized;
+  std::shared_ptr<ScanResumerImpl> resumer_ = nullptr;
+  std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
+};
+
+class AsyncScanRpcRetryingCaller : public std::enable_shared_from_this<AsyncScanRpcRetryingCaller> {
+ public:
+  AsyncScanRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
+                             std::shared_ptr<folly::HHWheelTimer> retry_timer,
+                             std::shared_ptr<hbase::RpcClient> rpc_client,
+                             std::shared_ptr<Scan> scan, int64_t scanner_id,
+                             std::shared_ptr<ScanResultCache> results_cache,
+                             std::shared_ptr<RawScanResultConsumer> consumer,
+                             std::shared_ptr<RegionLocation> region_location,
+                             nanoseconds scanner_lease_timeout_nanos, nanoseconds pause,
+                             uint32_t max_retries, nanoseconds scan_timeout_nanos,
+                             nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
+
+  folly::Future<bool> Start(std::shared_ptr<HBaseRpcController> controller,
+                            std::shared_ptr<pb::ScanResponse> open_scan_resp,
+                            const std::shared_ptr<CellScanner> cell_scanner);
+
+ private:
+  int64_t RemainingTimeNs();
+  void OnComplete(std::shared_ptr<HBaseRpcController> controller,
+                  std::shared_ptr<pb::ScanResponse> resp,
+                  const std::shared_ptr<CellScanner> cell_scanner);
+
+  void CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp);
+
+  void CompleteExceptionally(bool close_scanner);
+
+  void CompleteNoMoreResults();
+
+  void CompleteWhenNoMoreResultsInRegion();
+
+  void CompleteWithNextStartRow(std::string row, bool inclusive);
+
+  void UpdateNextStartRowWhenError(const Result& result);
+
+  void CompleteWhenError(bool close_scanner);
+
+  void OnError(const folly::exception_wrapper& e);
+
+  bool NoMoreResultsForScan(const Scan& scan, const pb::RegionInfo& info);
+
+  void Next();
+
+  void Call();
+
+  void CloseScanner();
+
+  void ResetController(std::shared_ptr<HBaseRpcController> controller,
+                       const int64_t& timeout_nanos);
+
+ private:
+  std::shared_ptr<AsyncConnection> conn_;
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<hbase::RpcClient> rpc_client_;
+  std::shared_ptr<Scan> scan_;
+  int64_t scanner_id_;
+  std::shared_ptr<ScanResultCache> results_cache_;
+  std::shared_ptr<RawScanResultConsumer> consumer_;
+  std::shared_ptr<RegionLocation> region_location_;
+  nanoseconds scanner_lease_timeout_nanos_;
+  nanoseconds pause_;
+  uint32_t max_retries_;
+  nanoseconds scan_timeout_nanos_;
+  nanoseconds rpc_timeout_nanos_;
+  uint32_t start_log_errors_count_;
+  std::shared_ptr<folly::Promise<bool>> promise_;
+  std::shared_ptr<HBaseRpcController> controller_;
+  optional<std::string> next_start_row_when_error_ = optional<std::string>();
+  bool include_next_start_row_when_error_ = true;
+  uint64_t start_ns_;
+  uint32_t tries_;
+  std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;
+  uint32_t max_attempts_;
+  int64_t next_call_seq_ = -1L;
+
+  friend class ScanResumerImpl;
+  friend class ScanControllerImpl;
+};
+
+}  // namespace hbase
diff --git a/hbase-native-client/core/async-table-result-scanner.cc b/hbase-native-client/core/async-table-result-scanner.cc
new file mode 100644
index 0000000..b1935ae
--- /dev/null
+++ b/hbase-native-client/core/async-table-result-scanner.cc
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-table-result-scanner.h"
+
+#include <vector>
+
+namespace hbase {
+AsyncTableResultScanner::AsyncTableResultScanner(int64_t max_cache_size)
+    : max_cache_size_(max_cache_size) {
+  closed_ = false;
+  cache_size_ = 0;
+}
+
+AsyncTableResultScanner::~AsyncTableResultScanner() { Close(); }
+
+void AsyncTableResultScanner::Close() {
+  std::unique_lock<std::mutex> mlock(mutex_);
+  closed_ = true;
+  while (!queue_.empty()) {
+    queue_.pop();
+  }
+  cache_size_ = 0;
+  if (resumer_ != nullptr) {
+    resumer_->Resume();
+  }
+  cond_.notify_all();
+}
+
+std::shared_ptr<Result> AsyncTableResultScanner::Next() {
+  VLOG(5) << "AsyncTableResultScanner: Next()";
+
+  std::shared_ptr<Result> result = nullptr;
+  std::shared_ptr<ScanResumer> local_resumer = nullptr;
+  {
+    std::unique_lock<std::mutex> mlock(mutex_);
+    while (queue_.empty()) {
+      if (closed_) {
+        return nullptr;
+      }
+      if (error_) {
+        throw error_;
+      }
+      cond_.wait(mlock);
+    }
+    result = queue_.front();
+    queue_.pop();
+
+    cache_size_ -= EstimatedSizeWithSharedPtr(result);
+    if (resumer_ != nullptr && cache_size_ <= max_cache_size_ / 2) {
+      VLOG(1) << std::this_thread::get_id() << " resume scan prefetching";
+      local_resumer = resumer_;
+      resumer_ = nullptr;
+    }
+  }
+
+  // Need to call ScanResumer::Resume() outside of the scope of the mutex. The reason is that
+  // folly/wangle event loop might end up running the attached logic(.then()) at the Scan RPC
+  // in the same event thread before returning from the previous call. This seems like the
+  // wrong thing to do(â„¢), but we cannot fix that now. Since the call back can end up calling
+  // this::OnNext(), we should unlock the mutex.
+  if (local_resumer != nullptr) {
+    local_resumer->Resume();
+  }
+  return result;
+}
+
+void AsyncTableResultScanner::AddToCache(const std::vector<std::shared_ptr<Result>> &results) {
+  VLOG(5) << "AsyncTableResultScanner: AddToCache()";
+  for (const auto r : results) {
+    queue_.push(r);
+    cache_size_ += EstimatedSizeWithSharedPtr(r);
+  }
+}
+
+template <typename T>
+inline size_t AsyncTableResultScanner::EstimatedSizeWithSharedPtr(std::shared_ptr<T> t) {
+  return t->EstimatedSize() + sizeof(std::shared_ptr<T>);
+}
+
+void AsyncTableResultScanner::OnNext(const std::vector<std::shared_ptr<Result>> &results,
+                                     std::shared_ptr<ScanController> controller) {
+  VLOG(5) << "AsyncTableResultScanner: OnNext()";
+  {
+    std::unique_lock<std::mutex> mlock(mutex_);
+    if (closed_) {
+      controller->Terminate();
+      return;
+    }
+    AddToCache(results);
+
+    if (cache_size_ >= max_cache_size_) {
+      StopPrefetch(controller);
+    }
+  }
+  cond_.notify_all();
+}
+
+void AsyncTableResultScanner::StopPrefetch(std::shared_ptr<ScanController> controller) {
+  VLOG(1) << std::this_thread::get_id()
+          << ": stop prefetching when scanning as the cache size " +
+                 folly::to<std::string>(cache_size_) + " is greater than the max_cache_size " +
+                 folly::to<std::string>(max_cache_size_);
+
+  resumer_ = controller->Suspend();
+  num_prefetch_stopped_++;
+}
+
+/**
+ * Indicate that there is an heartbeat message but we have not cumulated enough cells to call
+ * onNext.
+ * <p>
+ * This method give you a chance to terminate a slow scan operation.
+ * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
+ *          instance is only valid within the scope of onHeartbeat method. You can only call its
+ *          method in onHeartbeat, do NOT store it and call it later outside onHeartbeat.
+ */
+void AsyncTableResultScanner::OnHeartbeat(std::shared_ptr<ScanController> controller) {
+  std::unique_lock<std::mutex> mlock(mutex_);
+  if (closed_) {
+    controller->Terminate();
+  }
+}
+
+/**
+ * Indicate that we hit an unrecoverable error and the scan operation is terminated.
+ * <p>
+ * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
+ */
+void AsyncTableResultScanner::OnError(const folly::exception_wrapper &error) {
+  LOG(WARNING) << "Scanner received error" << error.what();
+  std::unique_lock<std::mutex> mlock(mutex_);
+  error_ = error;
+  cond_.notify_all();
+}
+
+/**
+ * Indicate that the scan operation is completed normally.
+ */
+void AsyncTableResultScanner::OnComplete() {
+  std::unique_lock<std::mutex> mlock(mutex_);
+  closed_ = true;
+  cond_.notify_all();
+}
+}  // namespace hbase
diff --git a/hbase-native-client/core/async-table-result-scanner.h b/hbase-native-client/core/async-table-result-scanner.h
new file mode 100644
index 0000000..dcdf871
--- /dev/null
+++ b/hbase-native-client/core/async-table-result-scanner.h
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Conv.h>
+#include <folly/ExceptionWrapper.h>
+#include <folly/Logging.h>
+#include <chrono>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <string>
+#include <vector>
+
+#include "core/raw-scan-result-consumer.h"
+#include "core/result-scanner.h"
+#include "core/result.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+
+namespace hbase {
+
+class AsyncTableResultScanner : public ResultScanner, public RawScanResultConsumer {
+ public:
+  explicit AsyncTableResultScanner(int64_t max_cache_size);
+
+  virtual ~AsyncTableResultScanner();
+
+  void Close() override;
+
+  std::shared_ptr<Result> Next() override;
+
+  void OnNext(const std::vector<std::shared_ptr<Result>> &results,
+              std::shared_ptr<ScanController> controller) override;
+
+  /**
+   * Indicate that there is an heartbeat message but we have not cumulated enough cells to call
+   * onNext.
+   * <p>
+   * This method give you a chance to terminate a slow scan operation.
+   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
+   *          instance is only valid within the scope of onHeartbeat method. You can only call its
+   *          method in onHeartbeat, do NOT store it and call it later outside onHeartbeat.
+   */
+  void OnHeartbeat(std::shared_ptr<ScanController> controller) override;
+
+  /**
+   * Indicate that we hit an unrecoverable error and the scan operation is terminated.
+   * <p>
+   * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
+   */
+  void OnError(const folly::exception_wrapper &error) override;
+
+  /**
+   * Indicate that the scan operation is completed normally.
+   */
+  void OnComplete() override;
+
+  // For testing
+  uint32_t num_prefetch_stopped() { return num_prefetch_stopped_; }
+
+ private:
+  void AddToCache(const std::vector<std::shared_ptr<Result>> &results);
+
+  template <typename T>
+  inline size_t EstimatedSizeWithSharedPtr(std::shared_ptr<T> t);
+
+  void StopPrefetch(std::shared_ptr<ScanController> controller);
+
+ private:
+  std::queue<std::shared_ptr<Result>> queue_;
+  std::mutex mutex_;
+  std::condition_variable cond_;
+  folly::exception_wrapper error_;
+  int64_t cache_size_;
+  int64_t max_cache_size_;
+  bool closed_;
+  std::shared_ptr<ScanResumer> resumer_ = nullptr;
+  uint32_t num_prefetch_stopped_ = 0;
+};
+}  // namespace hbase
diff --git a/hbase-native-client/core/cell-test.cc b/hbase-native-client/core/cell-test.cc
index efb835d..4611473 100644
--- a/hbase-native-client/core/cell-test.cc
+++ b/hbase-native-client/core/cell-test.cc
@@ -169,3 +169,27 @@ TEST(CellTest, CellDebugString) {
   LOG(INFO) << cell2.DebugString();
   EXPECT_EQ("row/column/42/DELETE/vlen=5/seqid=0", cell2.DebugString());
 }
+
+TEST(CellTest, CellEstimatedSize) {
+  CellType cell_type = CellType::PUT;
+  int64_t timestamp = std::numeric_limits<int64_t>::max();
+
+  Cell empty{"a", "a", "", timestamp, "", cell_type};
+  Cell cell1{"aa", "a", "", timestamp, "", cell_type};
+  Cell cell2{"a", "aa", "", timestamp, "", cell_type};
+  Cell cell3{"a", "a", "a", timestamp, "", cell_type};
+  Cell cell4{"a", "a", "", timestamp, "a", cell_type};
+  Cell cell5{"a", "a", "", timestamp, "a", CellType::DELETE};
+  Cell cell6{"aaaaaa", "a", "", timestamp, "a", cell_type};
+
+  LOG(INFO) << empty.EstimatedSize();
+  LOG(INFO) << cell1.EstimatedSize();
+
+  EXPECT_TRUE(empty.EstimatedSize() > sizeof(Cell));
+  EXPECT_TRUE(cell1.EstimatedSize() > empty.EstimatedSize());
+  EXPECT_EQ(cell1.EstimatedSize(), cell2.EstimatedSize());
+  EXPECT_EQ(cell2.EstimatedSize(), cell3.EstimatedSize());
+  EXPECT_EQ(cell3.EstimatedSize(), cell4.EstimatedSize());
+  EXPECT_EQ(cell4.EstimatedSize(), cell5.EstimatedSize());
+  EXPECT_TRUE(cell6.EstimatedSize() > cell1.EstimatedSize());
+}
diff --git a/hbase-native-client/core/cell.cc b/hbase-native-client/core/cell.cc
index 24788ab..e475d49 100644
--- a/hbase-native-client/core/cell.cc
+++ b/hbase-native-client/core/cell.cc
@@ -94,21 +94,30 @@ std::string Cell::DebugString() const {
 
 const char *Cell::TypeToString(CellType type) {
   switch (type) {
-    case MINIMUM:
+    case CellType::MINIMUM:
       return "MINIMUM";
-    case PUT:
+    case CellType::PUT:
       return "PUT";
-    case DELETE:
+    case CellType::DELETE:
       return "DELETE";
-    case DELETE_COLUMN:
+    case CellType::DELETE_COLUMN:
       return "DELETE_COLUMN";
-    case DELETE_FAMILY:
+    case CellType::DELETE_FAMILY:
       return "DELETE_FAMILY";
-    case MAXIMUM:
+    case CellType::MAXIMUM:
       return "MAXIMUM";
     default:
       return "UNKNOWN";
   }
 }
 
+size_t Cell::EstimatedSize() const {
+  size_t s = sizeof(Cell);
+  s += row_.capacity();
+  s += family_.capacity();
+  s += qualifier_.capacity();
+  s += value_.capacity();
+  return s;
+}
+
 } /* namespace hbase */
diff --git a/hbase-native-client/core/cell.h b/hbase-native-client/core/cell.h
index acedd96..7a62a9b 100644
--- a/hbase-native-client/core/cell.h
+++ b/hbase-native-client/core/cell.h
@@ -24,7 +24,7 @@
 
 namespace hbase {
 
-enum CellType {
+enum class CellType {
   MINIMUM = 0,
   PUT = 4,
   DELETE = 8,
@@ -49,6 +49,9 @@ class Cell {
   CellType Type() const;
   int64_t SequenceId() const;
   std::string DebugString() const;
+  /** Returns estimated size of the Cell object including deep heap space usage
+    * of its data. Notice that this is a very rough estimate. */
+  size_t EstimatedSize() const;
 
  private:
   std::string row_;
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index f3c5150..743e928 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -161,23 +161,23 @@ TEST_F(ClientTest, PutGetDelete) {
   table->Delete(hbase::Delete{row}.AddColumn("d", "1"));
   result = table->Get(get);
   ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
-  ASSERT_TRUE(result->Value("d", "1") == nullptr) << "Column 1 should be gone";
+  ASSERT_FALSE(result->Value("d", "1")) << "Column 1 should be gone";
   EXPECT_EQ(valExtra, *(result->Value("d", "extra")));
 
   // delete cell from column "extra" with timestamp tsExtra
   table->Delete(hbase::Delete{row}.AddColumn("d", "extra", tsExtra));
   result = table->Get(get);
   ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
-  ASSERT_TRUE(result->Value("d", "1") == nullptr) << "Column 1 should be gone";
-  ASSERT_TRUE(result->Value("d", "extra") != nullptr) << "Column extra should have value";
+  ASSERT_FALSE(result->Value("d", "1")) << "Column 1 should be gone";
+  ASSERT_TRUE(result->Value("d", "extra")) << "Column extra should have value";
   EXPECT_EQ(valExt, *(result->Value("d", "ext"))) << "Column ext should have value";
 
   // delete all cells from "extra" column
   table->Delete(hbase::Delete{row}.AddColumns("d", "extra"));
   result = table->Get(get);
   ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
-  ASSERT_TRUE(result->Value("d", "1") == nullptr) << "Column 1 should be gone";
-  ASSERT_TRUE(result->Value("d", "extra") == nullptr) << "Column extra should be gone";
+  ASSERT_FALSE(result->Value("d", "1")) << "Column 1 should be gone";
+  ASSERT_FALSE(result->Value("d", "extra")) << "Column extra should be gone";
   EXPECT_EQ(valExt, *(result->Value("d", "ext"))) << "Column ext should have value";
 
   // Delete the row and verify that subsequent Get returns nothing
@@ -249,6 +249,7 @@ TEST_F(ClientTest, PutGet) {
   table->Close();
   client.Close();
 }
+
 TEST_F(ClientTest, GetForNonExistentTable) {
   // Create TableName and Row to be fetched from HBase
   auto tn = folly::to<hbase::pb::TableName>("t_not_exists");
@@ -377,7 +378,7 @@ TEST_F(ClientTest, MultiGets) {
     ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row()
                                         << " must not be empty";
     EXPECT_EQ("test" + std::to_string(i), results[i]->Row());
-    EXPECT_EQ("value" + std::to_string(i), *results[i]->Value("d", std::to_string(i)).get());
+    EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value());
   }
   // We are inserting test2 twice so the below test should pass
   ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row() << " must not be empty";
diff --git a/hbase-native-client/core/delete-test.cc b/hbase-native-client/core/delete-test.cc
index 19e844b..ec1e3a9 100644
--- a/hbase-native-client/core/delete-test.cc
+++ b/hbase-native-client/core/delete-test.cc
@@ -19,8 +19,8 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "core/mutation.h"
 #include "core/delete.h"
+#include "core/mutation.h"
 #include "utils/time-util.h"
 
 using hbase::Delete;
diff --git a/hbase-native-client/core/delete.cc b/hbase-native-client/core/delete.cc
index b44971f..5f48782 100644
--- a/hbase-native-client/core/delete.cc
+++ b/hbase-native-client/core/delete.cc
@@ -47,14 +47,14 @@ Delete& Delete::AddColumn(const std::string& family, const std::string& qualifie
  *  @param timestamp version timestamp
  */
 Delete& Delete::AddColumn(const std::string& family, const std::string& qualifier,
-        int64_t timestamp) {
+                          int64_t timestamp) {
   if (timestamp < 0) {
     throw std::runtime_error("Timestamp cannot be negative. ts=" +
                              folly::to<std::string>(timestamp));
   }
 
-  return Add(std::make_unique<Cell>(row_, family, qualifier, timestamp, "",
-          hbase::CellType::DELETE));
+  return Add(
+      std::make_unique<Cell>(row_, family, qualifier, timestamp, "", hbase::CellType::DELETE));
 }
 /**
  * Delete all versions of the specified column.
@@ -62,7 +62,7 @@ Delete& Delete::AddColumn(const std::string& family, const std::string& qualifie
  * @param qualifier column qualifier
  */
 Delete& Delete::AddColumns(const std::string& family, const std::string& qualifier) {
-    return AddColumns(family, qualifier, timestamp_);
+  return AddColumns(family, qualifier, timestamp_);
 }
 /**
  * Delete all versions of the specified column with a timestamp less than
@@ -72,14 +72,14 @@ Delete& Delete::AddColumns(const std::string& family, const std::string& qualifi
  * @param timestamp maximum version timestamp
  */
 Delete& Delete::AddColumns(const std::string& family, const std::string& qualifier,
-        int64_t timestamp) {
-    if (timestamp < 0) {
-      throw std::runtime_error("Timestamp cannot be negative. ts=" +
-                               folly::to<std::string>(timestamp));
-    }
+                           int64_t timestamp) {
+  if (timestamp < 0) {
+    throw std::runtime_error("Timestamp cannot be negative. ts=" +
+                             folly::to<std::string>(timestamp));
+  }
 
-    return Add(std::make_unique<Cell>(row_, family, qualifier, timestamp, "",
-            hbase::CellType::DELETE_COLUMN));
+  return Add(std::make_unique<Cell>(row_, family, qualifier, timestamp, "",
+                                    hbase::CellType::DELETE_COLUMN));
 }
 /**
  * Delete all versions of all columns of the specified family.
@@ -88,9 +88,7 @@ Delete& Delete::AddColumns(const std::string& family, const std::string& qualifi
  * specified family.
  * @param family family name
  */
-Delete& Delete::AddFamily(const std::string& family) {
-    return AddFamily(family, timestamp_);
-}
+Delete& Delete::AddFamily(const std::string& family) { return AddFamily(family, timestamp_); }
 
 /**
  * Delete all columns of the specified family with a timestamp less than
@@ -102,14 +100,14 @@ Delete& Delete::AddFamily(const std::string& family) {
  * @param timestamp maximum version timestamp
  */
 Delete& Delete::AddFamily(const std::string& family, int64_t timestamp) {
-    const auto &it = family_map_.find(family);
-    if (family_map_.end() != it) {
-        it->second.clear();
-    } else {
-        family_map_[family];
-    }
-    return Add(std::make_unique<Cell>(row_, family, "", timestamp, "",
-            hbase::CellType::DELETE_FAMILY));
+  const auto& it = family_map_.find(family);
+  if (family_map_.end() != it) {
+    it->second.clear();
+  } else {
+    family_map_[family];
+  }
+  return Add(
+      std::make_unique<Cell>(row_, family, "", timestamp, "", hbase::CellType::DELETE_FAMILY));
 }
 /**
  * Delete all columns of the specified family with a timestamp equal to
@@ -118,8 +116,8 @@ Delete& Delete::AddFamily(const std::string& family, int64_t timestamp) {
  * @param timestamp version timestamp
  */
 Delete& Delete::AddFamilyVersion(const std::string& family, int64_t timestamp) {
-    return Add(std::make_unique<Cell>(row_, family, "", timestamp, "",
-            hbase::CellType::DELETE_FAMILY_VERSION));
+  return Add(std::make_unique<Cell>(row_, family, "", timestamp, "",
+                                    hbase::CellType::DELETE_FAMILY_VERSION));
 }
 Delete& Delete::Add(std::unique_ptr<Cell> cell) {
   if (cell->Row() != row_) {
diff --git a/hbase-native-client/core/get.cc b/hbase-native-client/core/get.cc
index 52b2ec5..bc0d446 100644
--- a/hbase-native-client/core/get.cc
+++ b/hbase-native-client/core/get.cc
@@ -28,7 +28,7 @@ Get::~Get() {}
 
 Get::Get(const std::string &row) : Row(row) {}
 
-Get::Get(const Get &get) {
+Get::Get(const Get &get) : Query(get) {
   row_ = get.row_;
   max_versions_ = get.max_versions_;
   cache_blocks_ = get.cache_blocks_;
@@ -39,6 +39,7 @@ Get::Get(const Get &get) {
 }
 
 Get &Get::operator=(const Get &get) {
+  Query::operator=(get);
   row_ = get.row_;
   max_versions_ = get.max_versions_;
   cache_blocks_ = get.cache_blocks_;
diff --git a/hbase-native-client/core/hbase-configuration-loader.h b/hbase-native-client/core/hbase-configuration-loader.h
index a1c1d3f..95b2541 100644
--- a/hbase-native-client/core/hbase-configuration-loader.h
+++ b/hbase-native-client/core/hbase-configuration-loader.h
@@ -24,15 +24,12 @@
 #include <vector>
 
 #include <boost/optional.hpp>
-#include <experimental/optional>
 
 #include "core/configuration.h"
+#include "utils/optional.h"
 
 namespace hbase {
 
-template <class T>
-using optional = std::experimental::optional<T>;
-
 class HBaseConfigurationLoader {
  public:
   HBaseConfigurationLoader();
diff --git a/hbase-native-client/core/hbase-rpc-controller.h b/hbase-native-client/core/hbase-rpc-controller.h
index 9bb89b9..33f552b 100644
--- a/hbase-native-client/core/hbase-rpc-controller.h
+++ b/hbase-native-client/core/hbase-rpc-controller.h
@@ -18,6 +18,7 @@
  */
 #pragma once
 
+#include <folly/ExceptionWrapper.h>
 #include <google/protobuf/service.h>
 #include <chrono>
 #include <string>
@@ -37,6 +38,10 @@ class HBaseRpcController : public google::protobuf::RpcController {
 
   bool Failed() const override { return false; }
 
+  folly::exception_wrapper exception() { return exception_; }
+
+  void set_exception(const folly::exception_wrapper& exception) { exception_ = exception; }
+
   std::string ErrorText() const override { return ""; }
 
   void StartCancel() override {}
@@ -46,6 +51,9 @@ class HBaseRpcController : public google::protobuf::RpcController {
   bool IsCanceled() const override { return false; }
 
   void NotifyOnCancel(google::protobuf::Closure* callback) override {}
+
+ private:
+  folly::exception_wrapper exception_;
 };
 
 } /* namespace hbase */
diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc
index 0577bfc..8efecc8 100644
--- a/hbase-native-client/core/meta-utils.cc
+++ b/hbase-native-client/core/meta-utils.cc
@@ -92,19 +92,12 @@ std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp) {
     throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" +
                              std::to_string(results.size()));
   }
-
   auto result = *results[0];
-  // VLOG(1) << "Creating RegionLocation from received Response " << *result; TODO
-
-  std::shared_ptr<std::string> region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN);
-  std::shared_ptr<std::string> server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN);
 
-  if (region_info_str == nullptr) {
-    throw std::runtime_error("regioninfo column null for location");
-  }
-  if (server_str == nullptr) {
-    throw std::runtime_error("server column null for location");
-  }
+  auto region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN);
+  auto server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN);
+  CHECK(region_info_str);
+  CHECK(server_str);
 
   auto row = result.Row();
   auto region_info = folly::to<RegionInfo>(*region_info_str);
diff --git a/hbase-native-client/core/query.h b/hbase-native-client/core/query.h
index b706303..301f448 100644
--- a/hbase-native-client/core/query.h
+++ b/hbase-native-client/core/query.h
@@ -19,6 +19,8 @@
 
 #pragma once
 
+#include <memory>
+
 #include "core/filter.h"
 
 namespace hbase {
@@ -28,14 +30,25 @@ namespace hbase {
  */
 class Query {
  public:
+  Query() = default;
+  Query(const Query &query) {
+    // filter can be a custom subclass of Filter, so we do not do a deep copy here.
+    filter_ = query.filter_;
+  }
+
+  Query &operator=(const Query &query) {
+    filter_ = query.filter_;
+    return *this;
+  }
+
   virtual ~Query() {}
 
-  void SetFilter(std::unique_ptr<Filter> filter) { filter_ = std::move(filter); }
+  void SetFilter(std::shared_ptr<Filter> filter) { filter_ = filter; }
 
-  const std::unique_ptr<Filter>& filter() const { return filter_; }
+  const std::shared_ptr<Filter> filter() const { return filter_; }
 
  protected:
-  std::unique_ptr<Filter> filter_ = nullptr;
+  std::shared_ptr<Filter> filter_ = nullptr;
 };
 
 }  // namespace hbase
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index f71fbba..998e2f1 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -111,9 +111,10 @@ folly::Future<folly::Unit> RawAsyncTable::Put(const hbase::Put& put) {
 folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) {
   auto caller =
       CreateCallerBuilder<folly::Unit>(del.row(), connection_conf_->write_rpc_timeout())
-          ->action([=, &del](std::shared_ptr<hbase::HBaseRpcController> controller,
-                             std::shared_ptr<hbase::RegionLocation> loc,
-                             std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> {
+          ->action([=, &del](
+                       std::shared_ptr<hbase::HBaseRpcController> controller,
+                       std::shared_ptr<hbase::RegionLocation> loc,
+                       std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> {
             return Call<hbase::Delete, hbase::Request, hbase::Response, folly::Unit>(
                 rpc_client, controller, loc, del, &hbase::RequestConverter::DeleteToMutateRequest,
                 [](const Response& r) -> folly::Unit { return folly::unit; });
@@ -143,4 +144,24 @@ folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::B
 
   return caller->Call().then([caller](auto r) { return r; });
 }
+
+void RawAsyncTable::Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer) {
+  auto scanner = AsyncClientScanner::Create(
+      connection_, SetDefaultScanConfig(scan), table_name_, consumer, connection_conf_->pause(),
+      connection_conf_->max_retries(), connection_conf_->scan_timeout(),
+      connection_conf_->rpc_timeout(), connection_conf_->start_log_errors_count());
+  scanner->Start();
+}
+
+std::shared_ptr<hbase::Scan> RawAsyncTable::SetDefaultScanConfig(const hbase::Scan& scan) {
+  // always create a new scan object as we may reset the start row later.
+  auto new_scan = std::make_shared<hbase::Scan>(scan);
+  if (new_scan->Caching() <= 0) {
+    new_scan->SetCaching(default_scanner_caching_);
+  }
+  if (new_scan->MaxResultSize() <= 0) {
+    new_scan->SetMaxResultSize(default_scanner_max_result_size_);
+  }
+  return new_scan;
+}
 }  // namespace hbase
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index c8e9f2f..8c40dae 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -24,7 +24,9 @@
 #include <memory>
 #include <string>
 #include <vector>
+
 #include "core/async-batch-rpc-retrying-caller.h"
+#include "core/async-client-scanner.h"
 #include "core/async-connection.h"
 #include "core/async-rpc-retrying-caller-factory.h"
 #include "core/async-rpc-retrying-caller.h"
@@ -34,6 +36,7 @@
 #include "core/increment.h"
 #include "core/put.h"
 #include "core/result.h"
+#include "core/scan.h"
 
 namespace hbase {
 
@@ -48,14 +51,22 @@ class RawAsyncTable {
       : connection_(connection),
         connection_conf_(connection->connection_conf()),
         table_name_(table_name),
-        rpc_client_(connection->rpc_client()) {}
+        rpc_client_(connection->rpc_client()) {
+    default_scanner_caching_ = connection_conf_->scanner_caching();
+    default_scanner_max_result_size_ = connection_conf_->scanner_max_result_size();
+  }
   virtual ~RawAsyncTable() = default;
 
   folly::Future<std::shared_ptr<Result>> Get(const hbase::Get& get);
 
-	folly::Future<folly::Unit> Delete(const hbase::Delete& del);
-	folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment);
+  folly::Future<folly::Unit> Delete(const hbase::Delete& del);
+
+  folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment);
+
   folly::Future<folly::Unit> Put(const hbase::Put& put);
+
+  void Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer);
+
   void Close() {}
 
   folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Get(
@@ -69,6 +80,8 @@ class RawAsyncTable {
   std::shared_ptr<ConnectionConfiguration> connection_conf_;
   std::shared_ptr<pb::TableName> table_name_;
   std::shared_ptr<RpcClient> rpc_client_;
+  int32_t default_scanner_caching_;
+  int64_t default_scanner_max_result_size_;
 
   /* Methods */
   template <typename REQ, typename PREQ, typename PRESP, typename RESP>
@@ -81,5 +94,7 @@ class RawAsyncTable {
   template <typename RESP>
   std::shared_ptr<SingleRequestCallerBuilder<RESP>> CreateCallerBuilder(
       std::string row, std::chrono::nanoseconds rpc_timeout);
+
+  std::shared_ptr<hbase::Scan> SetDefaultScanConfig(const hbase::Scan& scan);
 };
 }  // namespace hbase
diff --git a/hbase-native-client/core/raw-scan-result-consumer.h b/hbase-native-client/core/raw-scan-result-consumer.h
new file mode 100644
index 0000000..b7c3c48
--- /dev/null
+++ b/hbase-native-client/core/raw-scan-result-consumer.h
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/ExceptionWrapper.h>
+#include <folly/Logging.h>
+#include <chrono>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "core/result.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+
+namespace hbase {
+
+enum class ScanControllerState { kInitialized, kSuspended, kTerminated, kDestroyed };
+
+enum class ScanResumerState { kInitialized, kSuspended, kResumed };
+
+/**
+ * Used to resume a scan.
+ */
+class ScanResumer {
+ public:
+  virtual ~ScanResumer() = default;
+
+  /**
+   * Resume the scan. You are free to call it multiple time but only the first call will take
+   * effect.
+   */
+  virtual void Resume() = 0;
+};
+
+/**
+ * Used to suspend or stop a scan.
+ * <p>
+ * Notice that, you should only call the methods below inside onNext or onHeartbeat method. A
+ * IllegalStateException will be thrown if you call them at other places.
+ * <p>
+ * You can only call one of the methods below, i.e., call suspend or terminate(of course you are
+ * free to not call them both), and the methods are not reentrant. A IllegalStateException will be
+ * thrown if you have already called one of the methods.
+ */
+class ScanController {
+ public:
+  virtual ~ScanController() = default;
+
+  /**
+   * Suspend the scan.
+   * <p>
+   * This means we will stop fetching data in background, i.e., will not call onNext any more
+   * before you resume the scan.
+   * @return A resumer used to resume the scan later.
+   */
+  virtual std::shared_ptr<ScanResumer> Suspend() = 0;
+
+  /**
+   * Terminate the scan.
+   * <p>
+   * This is useful when you have got enough results and want to stop the scan in onNext method,
+   * or you want to stop the scan in onHeartbeat method because it has spent too many time.
+   */
+  virtual void Terminate() = 0;
+};
+
+/**
+ * Receives {@link Result} for an asynchronous scan.
+ * <p>
+ * Notice that, the {@link #onNext(Result[], ScanController)} method will be called in the thread
+ * which we send request to HBase service. So if you want the asynchronous scanner fetch data from
+ * HBase in background while you process the returned data, you need to move the processing work to
+ * another thread to make the {@code onNext} call return immediately. And please do NOT do any time
+ * consuming tasks in all methods below unless you know what you are doing.
+ */
+class RawScanResultConsumer {
+ public:
+  virtual ~RawScanResultConsumer() = default;
+
+  /**
+   * Indicate that we have receive some data.
+   * @param results the data fetched from HBase service.
+   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
+   *          instance is only valid within scope of onNext method. You can only call its method in
+   *          onNext, do NOT store it and call it later outside onNext.
+   */
+  virtual void OnNext(const std::vector<std::shared_ptr<Result>> &results,
+                      std::shared_ptr<ScanController> controller) {}
+
+  /**
+   * Indicate that there is an heartbeat message but we have not cumulated enough cells to call
+   * onNext.
+   * <p>
+   * This method give you a chance to terminate a slow scan operation.
+   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
+   *          instance is only valid within the scope of onHeartbeat method. You can only call its
+   *          method in onHeartbeat, do NOT store it and call it later outside onHeartbeat.
+   */
+  virtual void OnHeartbeat(std::shared_ptr<ScanController> controller) {}
+
+  /**
+   * Indicate that we hit an unrecoverable error and the scan operation is terminated.
+   * <p>
+   * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
+   */
+  virtual void OnError(const folly::exception_wrapper &error) {}
+
+  /**
+   * Indicate that the scan operation is completed normally.
+   */
+  virtual void OnComplete() {}
+};
+}  // namespace hbase
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index 4087d94..d5d9d67 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -26,7 +26,7 @@
 
 namespace hbase {
 
-enum RegionLocateType { kBefore, kCurrent, kAfter };
+enum class RegionLocateType { kBefore, kCurrent, kAfter };
 
 /**
  * @brief class to hold where a region is located.
diff --git a/hbase-native-client/core/request-converter-test.cc b/hbase-native-client/core/request-converter-test.cc
index c01f16c..6c07a19 100644
--- a/hbase-native-client/core/request-converter-test.cc
+++ b/hbase-native-client/core/request-converter-test.cc
@@ -83,7 +83,6 @@ TEST(RequestConverter, ToScan) {
   scan.SetReversed(true);
   scan.SetStartRow(start_row);
   scan.SetStopRow(stop_row);
-  scan.SetSmall(true);
   scan.SetCaching(3);
   scan.SetConsistency(hbase::pb::Consistency::TIMELINE);
   scan.SetCacheBlocks(true);
@@ -105,7 +104,7 @@ TEST(RequestConverter, ToScan) {
   EXPECT_TRUE(msg->scan().reversed());
   EXPECT_EQ(msg->scan().start_row(), start_row);
   EXPECT_EQ(msg->scan().stop_row(), stop_row);
-  EXPECT_TRUE(msg->scan().small());
+  EXPECT_FALSE(msg->scan().small());
   EXPECT_EQ(msg->scan().caching(), 3);
   EXPECT_EQ(msg->scan().consistency(), hbase::pb::Consistency::TIMELINE);
   EXPECT_TRUE(msg->scan().cache_blocks());
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index a1e63fe..6eb2f04 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -53,19 +53,11 @@ std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get,
   return pb_req;
 }
 
-std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
-                                                         const std::string &region_name) {
-  auto pb_req = Request::scan();
-
-  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
-
-  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
-
-  auto pb_scan = pb_msg->mutable_scan();
+std::unique_ptr<hbase::pb::Scan> RequestConverter::ToScan(const Scan &scan) {
+  auto pb_scan = std::make_unique<hbase::pb::Scan>();
   pb_scan->set_max_versions(scan.MaxVersions());
   pb_scan->set_cache_blocks(scan.CacheBlocks());
   pb_scan->set_reversed(scan.IsReversed());
-  pb_scan->set_small(scan.IsSmall());
   pb_scan->set_caching(scan.Caching());
   pb_scan->set_start_row(scan.StartRow());
   pb_scan->set_stop_row(scan.StopRow());
@@ -94,12 +86,78 @@ std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
     pb_scan->set_allocated_filter(Filter::ToProto(*(scan.filter())).release());
   }
 
-  // TODO We will change this later.
+  return std::move(pb_scan);
+}
+
+std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
+                                                         const std::string &region_name) {
+  auto pb_req = Request::scan();
+  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
+
+  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
+
+  pb_msg->set_allocated_scan(ToScan(scan).release());
+
+  SetCommonScanRequestFields(pb_msg, false);
+
+  return pb_req;
+}
+
+std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
+                                                         const std::string &region_name,
+                                                         int32_t num_rows, bool close_scanner) {
+  auto pb_req = Request::scan();
+  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
+
+  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
+
+  pb_msg->set_allocated_scan(ToScan(scan).release());
+
+  pb_msg->set_number_of_rows(num_rows);
+  pb_msg->set_close_scanner(close_scanner);
+
+  SetCommonScanRequestFields(pb_msg, false);
+
+  return pb_req;
+}
+
+std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                         bool close_scanner) {
+  auto pb_req = Request::scan();
+  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
+
+  pb_msg->set_number_of_rows(num_rows);
+  pb_msg->set_close_scanner(close_scanner);
+  pb_msg->set_scanner_id(scanner_id);
+
+  SetCommonScanRequestFields(pb_msg, false);
+
+  return pb_req;
+}
+
+std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                         bool close_scanner,
+                                                         int64_t next_call_seq_id, bool renew) {
+  auto pb_req = Request::scan();
+  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
+
+  pb_msg->set_number_of_rows(num_rows);
+  pb_msg->set_close_scanner(close_scanner);
+  pb_msg->set_scanner_id(scanner_id);
+  pb_msg->set_next_call_seq(next_call_seq_id);
+
+  SetCommonScanRequestFields(pb_msg, renew);
+  return pb_req;
+}
+
+void RequestConverter::SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest> pb_msg,
+                                                  bool renew) {
+  // TODO We will change these later when we implement partial results and heartbeats, etc
   pb_msg->set_client_handles_partials(false);
   pb_msg->set_client_handles_heartbeats(false);
   pb_msg->set_track_scan_metrics(false);
-
-  return pb_req;
+  pb_msg->set_renew(renew);
+  // TODO: set scan limit
 }
 
 std::unique_ptr<Request> RequestConverter::ToMultiRequest(
@@ -123,7 +181,6 @@ std::unique_ptr<Request> RequestConverter::ToMultiRequest(
     }
   }
 
-  VLOG(3) << "Multi Req:-" << pb_req->req_msg()->ShortDebugString();
   return pb_req;
 }
 
@@ -190,13 +247,13 @@ std::unique_ptr<MutationProto> RequestConverter::ToMutation(const MutationType t
 
 DeleteType RequestConverter::ToDeleteType(const CellType type) {
   switch (type) {
-    case DELETE:
+    case CellType::DELETE:
       return pb::MutationProto_DeleteType_DELETE_ONE_VERSION;
-    case DELETE_COLUMN:
+    case CellType::DELETE_COLUMN:
       return pb::MutationProto_DeleteType_DELETE_MULTIPLE_VERSIONS;
-    case DELETE_FAMILY:
+    case CellType::DELETE_FAMILY:
       return pb::MutationProto_DeleteType_DELETE_FAMILY;
-    case DELETE_FAMILY_VERSION:
+    case CellType::DELETE_FAMILY_VERSION:
       return pb::MutationProto_DeleteType_DELETE_FAMILY_VERSION;
     default:
       throw std::runtime_error("Unknown delete type: " + folly::to<std::string>(type));
@@ -216,12 +273,11 @@ std::unique_ptr<Request> RequestConverter::ToMutateRequest(const Put &put,
   pb_msg->set_allocated_mutation(
       ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release());
 
-  VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString();
   return pb_req;
 }
 
 std::unique_ptr<Request> RequestConverter::DeleteToMutateRequest(const Delete &del,
-                                                           const std::string &region_name) {
+                                                                 const std::string &region_name) {
   auto pb_req = Request::mutate();
   auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg());
   RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h
index 6d57161..c807f45 100644
--- a/hbase-native-client/core/request-converter.h
+++ b/hbase-native-client/core/request-converter.h
@@ -66,9 +66,20 @@ class RequestConverter {
    */
   static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string &region_name);
 
+  static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string &region_name,
+                                                int32_t num_rows, bool close_scanner);
+
+  static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                bool close_scanner);
+
+  static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                bool close_scanner, int64_t next_call_seq_id,
+                                                bool renew);
+
   static std::unique_ptr<Request> ToMultiRequest(const ActionsByRegion &region_requests);
 
-  static std::unique_ptr<Request> DeleteToMutateRequest(const Delete &del,const std::string &region_name);
+  static std::unique_ptr<Request> DeleteToMutateRequest(const Delete &del,
+                                                        const std::string &region_name);
 
   static std::unique_ptr<Request> ToMutateRequest(const Put &put, const std::string &region_name);
 
@@ -91,8 +102,10 @@ class RequestConverter {
    */
   static void SetRegion(const std::string &region_name, pb::RegionSpecifier *region_specifier);
   static std::unique_ptr<hbase::pb::Get> ToGet(const Get &get);
+  static std::unique_ptr<hbase::pb::Scan> ToScan(const Scan &scan);
   static DeleteType ToDeleteType(const CellType type);
   static bool IsDelete(const CellType type);
+  static void SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest>, bool renew);
 };
 
 } /* namespace hbase */
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index 94b7875..d2d719b 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -54,7 +54,7 @@ std::shared_ptr<Result> ResponseConverter::FromMutateResponse(const Response& re
 }
 
 std::shared_ptr<Result> ResponseConverter::ToResult(
-    const hbase::pb::Result& result, const std::unique_ptr<CellScanner>& cell_scanner) {
+    const hbase::pb::Result& result, const std::shared_ptr<CellScanner> cell_scanner) {
   std::vector<std::shared_ptr<Cell>> vcells;
   for (auto cell : result.cell()) {
     std::shared_ptr<Cell> pcell =
@@ -82,34 +82,38 @@ std::shared_ptr<Result> ResponseConverter::ToResult(
 
 std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) {
   auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg());
-  VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString();
-  int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size()
-                                                   : scan_resp->results_size();
+  return FromScanResponse(scan_resp, resp.cell_scanner());
+}
+
+std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(
+    const std::shared_ptr<ScanResponse> scan_resp, std::shared_ptr<CellScanner> cell_scanner) {
+  VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString()
+          << " cell_scanner:" << (cell_scanner == nullptr);
+  int num_results =
+      cell_scanner != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size();
 
   std::vector<std::shared_ptr<Result>> results{static_cast<size_t>(num_results)};
   for (int i = 0; i < num_results; i++) {
-    if (resp.cell_scanner() != nullptr) {
+    if (cell_scanner != nullptr) {
       // Cells are out in cellblocks.  Group them up again as Results.  How many to read at a
       // time will be found in getCellsLength -- length here is how many Cells in the i'th Result
       int num_cells = scan_resp->cells_per_result(i);
 
       std::vector<std::shared_ptr<Cell>> vcells;
-      while (resp.cell_scanner()->Advance()) {
-        vcells.push_back(resp.cell_scanner()->Current());
-      }
-      // TODO: check associated cell count?
-
-      if (vcells.size() != num_cells) {
-        std::string msg = "Results sent from server=" + std::to_string(num_results) +
-                          ". But only got " + std::to_string(i) +
-                          " results completely at client. Resetting the scanner to scan again.";
-        LOG(ERROR) << msg;
-        throw std::runtime_error(msg);
+      for (int j = 0; j < num_cells; j++) {
+        if (!cell_scanner->Advance()) {
+          std::string msg = "Results sent from server=" + std::to_string(num_results) +
+                            ". But only got " + std::to_string(i) +
+                            " results completely at client. Resetting the scanner to scan again.";
+          LOG(ERROR) << msg;
+          throw std::runtime_error(msg);
+        }
+        vcells.push_back(cell_scanner->Current());
       }
       // TODO: handle partial results per Result by checking partial_flag_per_result
       results[i] = std::make_shared<Result>(vcells, false, scan_resp->stale(), false);
     } else {
-      results[i] = ToResult(scan_resp->results(i), resp.cell_scanner());
+      results[i] = ToResult(scan_resp->results(i), cell_scanner);
     }
   }
 
diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h
index 0fdde89..b518d1c 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -39,7 +39,7 @@ class ResponseConverter {
   ~ResponseConverter();
 
   static std::shared_ptr<Result> ToResult(const hbase::pb::Result& result,
-                                          const std::unique_ptr<CellScanner>& cell_scanner);
+                                          const std::shared_ptr<CellScanner> cell_scanner);
 
   /**
    * @brief Returns a Result object created by PB Message in passed Response object.
@@ -51,6 +51,9 @@ class ResponseConverter {
 
   static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp);
 
+  static std::vector<std::shared_ptr<Result>> FromScanResponse(
+      const std::shared_ptr<pb::ScanResponse> resp, std::shared_ptr<CellScanner> cell_scanner);
+
   static std::unique_ptr<hbase::MultiResponse> GetResults(std::shared_ptr<Request> req,
                                                           const Response& resp);
 
diff --git a/hbase-native-client/core/query.h b/hbase-native-client/core/result-scanner.h
similarity index 66%
copy from hbase-native-client/core/query.h
copy to hbase-native-client/core/result-scanner.h
index b706303..9460521 100644
--- a/hbase-native-client/core/query.h
+++ b/hbase-native-client/core/result-scanner.h
@@ -19,23 +19,29 @@
 
 #pragma once
 
-#include "core/filter.h"
+#include <functional>
+#include <iterator>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "core/cell.h"
+#include "core/result.h"
 
 namespace hbase {
 
 /**
- * Base class for read RPC calls (Get / Scan).
+ * Interface for client-side scanning. Use Table to obtain instances.
  */
-class Query {
- public:
-  virtual ~Query() {}
+class ResultScanner {
+  // TODO: should we implement forward iterators?
 
-  void SetFilter(std::unique_ptr<Filter> filter) { filter_ = std::move(filter); }
+ public:
+  virtual ~ResultScanner() {}
 
-  const std::unique_ptr<Filter>& filter() const { return filter_; }
+  virtual void Close() = 0;
 
- protected:
-  std::unique_ptr<Filter> filter_ = nullptr;
+  virtual std::shared_ptr<Result> Next() = 0;
 };
-
-}  // namespace hbase
+} /* namespace hbase */
diff --git a/hbase-native-client/core/result-test.cc b/hbase-native-client/core/result-test.cc
index 520f4b9..3299ffc 100644
--- a/hbase-native-client/core/result-test.cc
+++ b/hbase-native-client/core/result-test.cc
@@ -17,6 +17,7 @@
  *
  */
 
+#include <glog/logging.h>
 #include <gtest/gtest.h>
 #include <limits>
 #include <memory>
@@ -71,7 +72,7 @@ void PopulateCells(std::vector<std::shared_ptr<Cell> > &cells) {
       }
       default: {
         cells.push_back(std::make_shared<Cell>(
-            row, family, column, std::numeric_limits<long>::max(), value, CellType::PUT));
+            row, family, column, std::numeric_limits<int64_t>::max(), value, CellType::PUT));
       }
     }
   }
@@ -255,7 +256,7 @@ TEST(Result, FilledResult) {
             break;
           }
           default: {
-            EXPECT_EQ(std::numeric_limits<long>::max(), version_map.first);
+            EXPECT_EQ(std::numeric_limits<int64_t>::max(), version_map.first);
             EXPECT_EQ(value, version_map.second);
           }
         }
@@ -297,3 +298,24 @@ TEST(Result, FilledResult) {
     EXPECT_EQ("value-9", qual_val_map.second);
   }
 }
+
+TEST(Result, ResultEstimatedSize) {
+  CellType cell_type = CellType::PUT;
+  int64_t timestamp = std::numeric_limits<int64_t>::max();
+  std::vector<std::shared_ptr<Cell> > cells;
+  Result empty(cells, true, false, false);
+
+  EXPECT_EQ(empty.EstimatedSize(), sizeof(Result));
+
+  cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type));
+  Result result1(cells, true, false, false);
+  EXPECT_TRUE(result1.EstimatedSize() > empty.EstimatedSize());
+
+  cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type));
+  Result result2(cells, true, false, false);
+  EXPECT_TRUE(result2.EstimatedSize() > result1.EstimatedSize());
+
+  LOG(INFO) << empty.EstimatedSize();
+  LOG(INFO) << result1.EstimatedSize();
+  LOG(INFO) << result2.EstimatedSize();
+}
diff --git a/hbase-native-client/core/result.cc b/hbase-native-client/core/result.cc
index 9d9ddb3..44b4c86 100644
--- a/hbase-native-client/core/result.cc
+++ b/hbase-native-client/core/result.cc
@@ -25,13 +25,7 @@ Result::~Result() {}
 
 Result::Result(const std::vector<std::shared_ptr<Cell> > &cells, bool exists, bool stale,
                bool partial)
-    : exists_(exists), stale_(stale), partial_(partial) {
-  for (const auto &cell : cells) {
-    cells_.push_back(cell);
-    // We create the map when cells are added. unlike java where map is created
-    // when result.getMap() is called
-    result_map_[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value();
-  }
+    : exists_(exists), stale_(stale), partial_(partial), cells_(cells) {
   row_ = (cells_.size() == 0 ? "" : cells_[0]->Row());
 }
 
@@ -43,10 +37,10 @@ Result::Result(const Result &result) {
   if (!result.cells_.empty()) {
     for (const auto &cell : result.cells_) {
       cells_.push_back(cell);
-      result_map_[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value();
     }
   }
 }
+
 const std::vector<std::shared_ptr<Cell> > &Result::Cells() const { return cells_; }
 
 std::vector<std::shared_ptr<Cell> > Result::ColumnCells(const std::string &family,
@@ -74,13 +68,12 @@ const std::shared_ptr<Cell> Result::ColumnLatestCell(const std::string &family,
   return nullptr;
 }
 
-std::shared_ptr<std::string> Result::Value(const std::string &family,
-                                           const std::string &qualifier) const {
+optional<std::string> Result::Value(const std::string &family, const std::string &qualifier) const {
   std::shared_ptr<Cell> latest_cell(ColumnLatestCell(family, qualifier));
   if (latest_cell.get()) {
-    return std::make_shared<std::string>(latest_cell->Value());
+    return optional<std::string>(latest_cell->Value());
   }
-  return nullptr;
+  return optional<std::string>();
 }
 
 bool Result::IsEmpty() const { return cells_.empty(); }
@@ -89,24 +82,33 @@ const std::string &Result::Row() const { return row_; }
 
 int Result::Size() const { return cells_.size(); }
 
-const ResultMap &Result::Map() const { return result_map_; }
+ResultMap Result::Map() const {
+  ResultMap result_map;
+  for (const auto &cell : cells_) {
+    result_map[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value();
+  }
+  return result_map;
+}
 
-const std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const {
+std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const {
   std::map<std::string, std::string> family_map;
   if (!IsEmpty()) {
-    for (auto itr = result_map_.begin(); itr != result_map_.end(); ++itr) {
-      if (family == itr->first) {
-        for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) {
-          for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) {
-            // We break after inserting the first value. Result.java takes only
-            // the first value
-            family_map[qitr->first] = vitr->second;
-            break;
-          }
-        }
+    auto result_map = Map();
+    auto itr = result_map.find(family);
+    if (itr == result_map.end()) {
+      return family_map;
+    }
+
+    for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) {
+      for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) {
+        // We break after inserting the first value. Result.java takes only
+        // the first value
+        family_map[qitr->first] = vitr->second;
+        break;
       }
     }
   }
+
   return family_map;
 }
 
@@ -131,4 +133,14 @@ std::string Result::DebugString() const {
   return ret;
 }
 
+size_t Result::EstimatedSize() const {
+  size_t s = sizeof(Result);
+  s += row_.capacity();
+  for (const auto c : cells_) {
+    s += sizeof(std::shared_ptr<Cell>);
+    s + c->EstimatedSize();
+  }
+  return s;
+}
+
 } /* namespace hbase */
diff --git a/hbase-native-client/core/result.h b/hbase-native-client/core/result.h
index 627d161..f18071b 100644
--- a/hbase-native-client/core/result.h
+++ b/hbase-native-client/core/result.h
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include "core/cell.h"
+#include "utils/optional.h"
 
 namespace hbase {
 
@@ -79,7 +80,7 @@ class Result {
    * @param family - column family
    * @param qualifier - column qualifier
    */
-  std::shared_ptr<std::string> Value(const std::string &family, const std::string &qualifier) const;
+  optional<std::string> Value(const std::string &family, const std::string &qualifier) const;
 
   /**
    * @brief Returns if the underlying Cell vector is empty or not
@@ -104,23 +105,32 @@ class Result {
    * All other map returning methods make use of this map internally
    * The Map is created when the Result instance is created
    */
-  const ResultMap &Map() const;
+  ResultMap Map() const;
 
   /**
    * @brief Map of qualifiers to values.
    * Returns a Map of the form: Map<qualifier,value>
    * @param family - column family to get
    */
-  const std::map<std::string, std::string> FamilyMap(const std::string &family) const;
+  std::map<std::string, std::string> FamilyMap(const std::string &family) const;
 
   std::string DebugString() const;
 
+  bool Exists() const { return exists_; }
+
+  bool Stale() const { return stale_; }
+
+  bool Partial() const { return partial_; }
+
+  /** Returns estimated size of the Result object including deep heap space usage
+   * of its Cells and data. Notice that this is a very rough estimate. */
+  size_t EstimatedSize() const;
+
  private:
   bool exists_ = false;
   bool stale_ = false;
   bool partial_ = false;
   std::string row_ = "";
   std::vector<std::shared_ptr<Cell> > cells_;
-  ResultMap result_map_;
 };
 } /* namespace hbase */
diff --git a/hbase-native-client/core/scan-result-cache-test.cc b/hbase-native-client/core/scan-result-cache-test.cc
new file mode 100644
index 0000000..0bf83ce
--- /dev/null
+++ b/hbase-native-client/core/scan-result-cache-test.cc
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include "core/cell.h"
+#include "core/result.h"
+#include "core/scan-result-cache.h"
+
+using hbase::ScanResultCache;
+using hbase::Result;
+using hbase::Cell;
+using hbase::CellType;
+
+using ResultVector = std::vector<std::shared_ptr<Result>>;
+
+std::shared_ptr<Cell> CreateCell(const int32_t &key, const std::string &family,
+                                 const std::string &column) {
+  auto row = folly::to<std::string>(key);
+  return std::make_shared<Cell>(row, family, column, std::numeric_limits<int64_t>::max(), row,
+                                CellType::PUT);
+}
+
+std::shared_ptr<Result> CreateResult(std::shared_ptr<Cell> cell, bool partial) {
+  return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{cell}, false, false, partial);
+}
+
+TEST(ScanResultCacheTest, NoPartial) {
+  ScanResultCache cache;
+  ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, false));
+  ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, true));
+  int32_t count = 10;
+  ResultVector results{};
+  for (int32_t i = 0; i < count; i++) {
+    results.push_back(CreateResult(CreateCell(i, "cf", "cq1"), false));
+  }
+  ASSERT_EQ(results, cache.AddAndGet(results, false));
+}
+
+TEST(ScanResultCacheTest, Combine1) {
+  ScanResultCache cache;
+  auto prev_result = CreateResult(CreateCell(0, "cf", "cq1"), true);
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true);
+  auto results = cache.AddAndGet(ResultVector{prev_result, result1}, false);
+  ASSERT_EQ(1L, results.size());
+  ASSERT_EQ(prev_result, results[0]);
+
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{}, true).size());
+
+  results = cache.AddAndGet(ResultVector{}, false);
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(3, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3")));
+}
+
+TEST(ScanResultCacheTest, Combine2) {
+  ScanResultCache cache;
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true);
+
+  auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true);
+  auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq2"), false);
+
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size());
+
+  auto results = cache.AddAndGet(ResultVector{next_result1}, false);
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(3, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3")));
+
+  results = cache.AddAndGet(ResultVector{next_to_next_result1}, false);
+  ASSERT_EQ(2, results.size());
+  ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(1, results[0]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(3, folly::to<int32_t>(results[1]->Row()));
+  ASSERT_EQ(1, results[1]->Cells().size());
+  ASSERT_EQ(3, folly::to<int32_t>(*results[1]->Value("cf", "cq2")));
+}
+
+TEST(ScanResultCacheTest, Combine3) {
+  ScanResultCache cache;
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), false);
+  auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq1"), true);
+
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size());
+
+  auto results = cache.AddAndGet(ResultVector{next_result1, next_to_next_result1}, false);
+
+  ASSERT_EQ(2, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(2, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+  ASSERT_EQ(2, folly::to<int32_t>(results[1]->Row()));
+  ASSERT_EQ(1, results[1]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*results[1]->Value("cf", "cq1")));
+
+  results = cache.AddAndGet(ResultVector{}, false);
+
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(3, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(1, results[0]->Cells().size());
+  ASSERT_EQ(3, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+}
+
+TEST(ScanResultCacheTest, Combine4) {
+  ScanResultCache cache;
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), false);
+  auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true);
+  auto next_result2 = CreateResult(CreateCell(2, "cf", "cq2"), false);
+
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size());
+
+  auto results = cache.AddAndGet(ResultVector{result2, next_result1}, false);
+
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(2, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+
+  results = cache.AddAndGet(ResultVector{next_result2}, false);
+
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(2, results[0]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+}
+
+TEST(ScanResultCacheTest, SizeOf) {
+  std::string e{""};
+  std::string f{"f"};
+  std::string foo{"foo"};
+
+  LOG(INFO) << sizeof(e) << " " << e.capacity();
+  LOG(INFO) << sizeof(f) << " " << f.capacity();
+  LOG(INFO) << sizeof(foo) << " " << foo.capacity();
+}
diff --git a/hbase-native-client/core/scan-result-cache.cc b/hbase-native-client/core/scan-result-cache.cc
new file mode 100644
index 0000000..62a51e0
--- /dev/null
+++ b/hbase-native-client/core/scan-result-cache.cc
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/scan-result-cache.h"
+#include <algorithm>
+#include <iterator>
+#include <limits>
+#include <stdexcept>
+
+namespace hbase {
+/**
+ * Add the given results to cache and get valid results back.
+ * @param results the results of a scan next. Must not be null.
+ * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response.
+ * @return valid results, never null.
+ */
+std::vector<std::shared_ptr<Result>> ScanResultCache::AddAndGet(
+    const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat) {
+  // If no results were returned it indicates that either we have the all the partial results
+  // necessary to construct the complete result or the server had to send a heartbeat message
+  // to the client to keep the client-server connection alive
+  if (results.empty()) {
+    // If this response was an empty heartbeat message, then we have not exhausted the region
+    // and thus there may be more partials server side that still need to be added to the partial
+    // list before we form the complete Result
+    if (!partial_results_.empty() && !is_hearthbeat) {
+      return UpdateNumberOfCompleteResultsAndReturn(
+          std::vector<std::shared_ptr<Result>>{Combine()});
+    }
+    return std::vector<std::shared_ptr<Result>>{};
+  }
+  // In every RPC response there should be at most a single partial result. Furthermore, if
+  // there is a partial result, it is guaranteed to be in the last position of the array.
+  auto last = results[results.size() - 1];
+  if (last->Partial()) {
+    if (partial_results_.empty()) {
+      partial_results_.push_back(last);
+      std::vector<std::shared_ptr<Result>> new_results;
+      std::copy_n(results.begin(), results.size() - 1, std::back_inserter(new_results));
+      return UpdateNumberOfCompleteResultsAndReturn(new_results);
+    }
+    // We have only one result and it is partial
+    if (results.size() == 1) {
+      // check if there is a row change
+      if (partial_results_.at(0)->Row() == last->Row()) {
+        partial_results_.push_back(last);
+        return std::vector<std::shared_ptr<Result>>{};
+      }
+      auto complete_result = Combine();
+      partial_results_.push_back(last);
+      return UpdateNumberOfCompleteResultsAndReturn(complete_result);
+    }
+    // We have some complete results
+    auto results_to_return = PrependCombined(results, results.size() - 1);
+    partial_results_.push_back(last);
+    return UpdateNumberOfCompleteResultsAndReturn(results_to_return);
+  }
+  if (!partial_results_.empty()) {
+    return UpdateNumberOfCompleteResultsAndReturn(PrependCombined(results, results.size()));
+  }
+  return UpdateNumberOfCompleteResultsAndReturn(results);
+}
+
+void ScanResultCache::Clear() { partial_results_.clear(); }
+
+std::shared_ptr<Result> ScanResultCache::CreateCompleteResult(
+    const std::vector<std::shared_ptr<Result>> &partial_results) {
+  if (partial_results.empty()) {
+    return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{}, false, false, false);
+  }
+  std::vector<std::shared_ptr<Cell>> cells{};
+  bool stale = false;
+  std::string prev_row = "";
+  std::string current_row = "";
+  size_t i = 0;
+  for (const auto &r : partial_results) {
+    current_row = r->Row();
+    if (i != 0 && prev_row != current_row) {
+      throw new std::runtime_error(
+          "Cannot form complete result. Rows of partial results do not match.");
+    }
+    // Ensure that all Results except the last one are marked as partials. The last result
+    // may not be marked as a partial because Results are only marked as partials when
+    // the scan on the server side must be stopped due to reaching the maxResultSize.
+    // Visualizing it makes it easier to understand:
+    // maxResultSize: 2 cells
+    // (-x-) represents cell number x in a row
+    // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
+    // How row1 will be returned by the server as partial Results:
+    // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
+    // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
+    // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
+    if (i != partial_results.size() - 1 && !r->Partial()) {
+      throw new std::runtime_error("Cannot form complete result. Result is missing partial flag.");
+    }
+    prev_row = current_row;
+    stale = stale || r->Stale();
+    for (const auto &c : r->Cells()) {
+      cells.push_back(c);
+    }
+    i++;
+  }
+
+  return std::make_shared<Result>(cells, false, stale, false);
+}
+
+std::shared_ptr<Result> ScanResultCache::Combine() {
+  auto result = CreateCompleteResult(partial_results_);
+  partial_results_.clear();
+  return result;
+}
+
+std::vector<std::shared_ptr<Result>> ScanResultCache::PrependCombined(
+    const std::vector<std::shared_ptr<Result>> &results, int length) {
+  if (length == 0) {
+    return std::vector<std::shared_ptr<Result>>{Combine()};
+  }
+  // the last part of a partial result may not be marked as partial so here we need to check if
+  // there is a row change.
+  size_t start;
+  if (partial_results_[0]->Row() == results[0]->Row()) {
+    partial_results_.push_back(results[0]);
+    start = 1;
+    length--;
+  } else {
+    start = 0;
+  }
+  std::vector<std::shared_ptr<Result>> prepend_results{};
+  prepend_results.push_back(Combine());
+  std::copy_n(results.begin() + start, length, std::back_inserter(prepend_results));
+  return prepend_results;
+}
+
+std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn(
+    const std::shared_ptr<Result> &result) {
+  return UpdateNumberOfCompleteResultsAndReturn(std::vector<std::shared_ptr<Result>>{result});
+}
+
+std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn(
+    const std::vector<std::shared_ptr<Result>> &results) {
+  num_complete_rows_ += results.size();
+  return results;
+}
+}  // namespace hbase
diff --git a/hbase-native-client/core/scan-result-cache.h b/hbase-native-client/core/scan-result-cache.h
new file mode 100644
index 0000000..5d3d0ab
--- /dev/null
+++ b/hbase-native-client/core/scan-result-cache.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Logging.h>
+#include <algorithm>
+#include <chrono>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "core/result.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+
+namespace hbase {
+
+class ScanResultCache {
+  // In Java, there are 3 different implementations for this. We are not doing partial results,
+  // or scan batching in native code for now, so this version is simpler and
+  // only deals with giving back complete rows as Result. It is more or less implementation
+  // of CompleteScanResultCache.java
+
+ public:
+  /**
+   * Add the given results to cache and get valid results back.
+   * @param results the results of a scan next. Must not be null.
+   * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response.
+   * @return valid results, never null.
+   */
+  std::vector<std::shared_ptr<Result>> AddAndGet(
+      const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat);
+
+  void Clear();
+
+  int64_t num_complete_rows() const { return num_complete_rows_; }
+
+ private:
+  /**
+     * Forms a single result from the partial results in the partialResults list. This method is
+     * useful for reconstructing partial results on the client side.
+     * @param partial_results list of partial results
+     * @return The complete result that is formed by combining all of the partial results together
+     */
+  static std::shared_ptr<Result> CreateCompleteResult(
+      const std::vector<std::shared_ptr<Result>> &partial_results);
+
+  std::shared_ptr<Result> Combine();
+
+  std::vector<std::shared_ptr<Result>> PrependCombined(
+      const std::vector<std::shared_ptr<Result>> &results, int length);
+
+  std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn(
+      const std::shared_ptr<Result> &result);
+
+  std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn(
+      const std::vector<std::shared_ptr<Result>> &results);
+
+ private:
+  std::vector<std::shared_ptr<Result>> partial_results_;
+  int64_t num_complete_rows_ = 0;
+};
+}  // namespace hbase
diff --git a/hbase-native-client/core/scan-test.cc b/hbase-native-client/core/scan-test.cc
index 73fb6df..0ee054c 100644
--- a/hbase-native-client/core/scan-test.cc
+++ b/hbase-native-client/core/scan-test.cc
@@ -17,13 +17,13 @@
  *
  */
 
-#include "core/scan.h"
-
+#include <gtest/gtest.h>
 #include <limits>
 
-#include <gtest/gtest.h>
+#include "core/scan.h"
 
-using namespace hbase;
+using hbase::Get;
+using hbase::Scan;
 
 void CheckFamilies(Scan &scan) {
   EXPECT_EQ(false, scan.HasFamilies());
@@ -74,7 +74,7 @@ void CheckFamilies(Scan &scan) {
   EXPECT_EQ(it, scan.FamilyMap().end());
 }
 
-void CheckFamiliesAfterCopy(Scan &scan) {
+void CheckFamiliesAfterCopy(const Scan &scan) {
   EXPECT_EQ(true, scan.HasFamilies());
   EXPECT_EQ(3, scan.FamilyMap().size());
   int i = 1;
@@ -116,11 +116,6 @@ void ScanMethods(Scan &scan) {
   scan.SetStopRow(stop_row);
   EXPECT_EQ(stop_row, scan.StopRow());
 
-  scan.SetSmall(true);
-  EXPECT_EQ(true, scan.IsSmall());
-  scan.SetSmall(false);
-  EXPECT_EQ(false, scan.IsSmall());
-
   scan.SetCaching(3);
   EXPECT_EQ(3, scan.Caching());
 
diff --git a/hbase-native-client/core/scan.cc b/hbase-native-client/core/scan.cc
index 5f315ec..6dcc51b 100644
--- a/hbase-native-client/core/scan.cc
+++ b/hbase-native-client/core/scan.cc
@@ -38,7 +38,7 @@ Scan::Scan(const std::string &start_row, const std::string &stop_row)
   CheckRow(stop_row_);
 }
 
-Scan::Scan(const Scan &scan) {
+Scan::Scan(const Scan &scan) : Query(scan) {
   start_row_ = scan.start_row_;
   stop_row_ = scan.stop_row_;
   max_versions_ = scan.max_versions_;
@@ -47,7 +47,6 @@ Scan::Scan(const Scan &scan) {
   cache_blocks_ = scan.cache_blocks_;
   load_column_families_on_demand_ = scan.load_column_families_on_demand_;
   reversed_ = scan.reversed_;
-  small_ = scan.small_;
   allow_partial_results_ = scan.allow_partial_results_;
   consistency_ = scan.consistency_;
   tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp()));
@@ -55,6 +54,7 @@ Scan::Scan(const Scan &scan) {
 }
 
 Scan &Scan::operator=(const Scan &scan) {
+  Query::operator=(scan);
   start_row_ = scan.start_row_;
   stop_row_ = scan.stop_row_;
   max_versions_ = scan.max_versions_;
@@ -63,7 +63,6 @@ Scan &Scan::operator=(const Scan &scan) {
   cache_blocks_ = scan.cache_blocks_;
   load_column_families_on_demand_ = scan.load_column_families_on_demand_;
   reversed_ = scan.reversed_;
-  small_ = scan.small_;
   allow_partial_results_ = scan.allow_partial_results_;
   consistency_ = scan.consistency_;
   tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp()));
@@ -109,24 +108,14 @@ void Scan::SetReversed(bool reversed) { reversed_ = reversed; }
 
 bool Scan::IsReversed() const { return reversed_; }
 
-void Scan::SetStartRow(const std::string &start_row) {
-  CheckRow(start_row);
-  start_row_ = start_row;
-}
+void Scan::SetStartRow(const std::string &start_row) { start_row_ = start_row; }
 
 const std::string &Scan::StartRow() const { return start_row_; }
 
-void Scan::SetStopRow(const std::string &stop_row) {
-  CheckRow(stop_row);
-  stop_row_ = stop_row;
-}
+void Scan::SetStopRow(const std::string &stop_row) { stop_row_ = stop_row; }
 
 const std::string &Scan::StopRow() const { return stop_row_; }
 
-void Scan::SetSmall(bool small) { small_ = small; }
-
-bool Scan::IsSmall() const { return small_; }
-
 void Scan::SetCaching(int caching) { caching_ = caching; }
 
 int Scan::Caching() const { return caching_; }
diff --git a/hbase-native-client/core/scan.h b/hbase-native-client/core/scan.h
index fb302b7..1085c4b 100644
--- a/hbase-native-client/core/scan.h
+++ b/hbase-native-client/core/scan.h
@@ -119,16 +119,6 @@ class Scan : public Query {
   const std::string &StopRow() const;
 
   /**
-   * @brief Set whether this scan is a small scan.
-   */
-  void SetSmall(bool small);
-
-  /**
-   * @brief Returns if the scan is a small scan.
-   */
-  bool IsSmall() const;
-
-  /**
    * @brief Set the number of rows for caching that will be passed to scanners.
    * Higher caching values will enable faster scanners but will use more memory.
    * @param caching - the number of rows for caching.
@@ -258,12 +248,11 @@ class Scan : public Query {
   std::string start_row_ = "";
   std::string stop_row_ = "";
   uint32_t max_versions_ = 1;
-  int caching_ = -1;
+  int32_t caching_ = -1;
   int64_t max_result_size_ = -1;
   bool cache_blocks_ = true;
   bool load_column_families_on_demand_ = false;
   bool reversed_ = false;
-  bool small_ = false;
   bool allow_partial_results_ = false;
   hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG;
   std::unique_ptr<TimeRange> tr_ = std::make_unique<TimeRange>();
diff --git a/hbase-native-client/core/scanner-test.cc b/hbase-native-client/core/scanner-test.cc
new file mode 100644
index 0000000..1ecbd02
--- /dev/null
+++ b/hbase-native-client/core/scanner-test.cc
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <thread>
+#include <vector>
+
+#include "core/async-client-scanner.h"
+#include "core/async-table-result-scanner.h"
+#include "core/cell.h"
+#include "core/client.h"
+#include "core/configuration.h"
+#include "core/filter.h"
+#include "core/get.h"
+#include "core/hbase-configuration-loader.h"
+#include "core/put.h"
+#include "core/result.h"
+#include "core/row.h"
+#include "core/table.h"
+#include "if/Comparator.pb.h"
+#include "if/Filter.pb.h"
+#include "serde/table-name.h"
+#include "test-util/test-util.h"
+#include "utils/time-util.h"
+
+using hbase::Cell;
+using hbase::ComparatorFactory;
+using hbase::Comparator;
+using hbase::Configuration;
+using hbase::Get;
+using hbase::Put;
+using hbase::Result;
+using hbase::Scan;
+using hbase::Table;
+using hbase::TestUtil;
+using hbase::TimeUtil;
+using hbase::AsyncClientScanner;
+using hbase::AsyncTableResultScanner;
+using hbase::FilterFactory;
+using hbase::pb::CompareType;
+
+class ScannerTest : public ::testing::Test {
+ public:
+  static std::unique_ptr<hbase::TestUtil> test_util;
+  static const uint32_t num_rows;
+
+  static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
+    test_util = std::make_unique<hbase::TestUtil>();
+    test_util->StartMiniCluster(2);
+  }
+};
+std::unique_ptr<hbase::TestUtil> ScannerTest::test_util = nullptr;
+const uint32_t ScannerTest::num_rows = 1000;
+
+std::string Family(uint32_t i) { return "f" + folly::to<std::string>(i); }
+
+std::string Row(uint32_t i, int width) {
+  std::ostringstream s;
+  s.fill('0');
+  s.width(width);
+  s << i;
+  return "row" + s.str();
+}
+
+std::string Row(uint32_t i) { return Row(i, 3); }
+
+std::unique_ptr<Put> MakePut(const std::string &row, uint32_t num_families) {
+  auto put = std::make_unique<Put>(row);
+
+  for (uint32_t i = 0; i < num_families; i++) {
+    put->AddColumn(Family(i), "q1", row);
+    put->AddColumn(Family(i), "q2", row + "-" + row);
+  }
+
+  return std::move(put);
+}
+
+void CheckResult(const Result &r, std::string expected_row, uint32_t num_families) {
+  VLOG(1) << r.DebugString();
+  auto row = r.Row();
+  ASSERT_EQ(row, expected_row);
+  ASSERT_EQ(r.Cells().size(), num_families * 2);
+  for (uint32_t i = 0; i < num_families; i++) {
+    ASSERT_EQ(*r.Value(Family(i), "q1"), row);
+    ASSERT_EQ(*r.Value(Family(i), "q2"), row + "-" + row);
+  }
+}
+
+void CreateTable(std::string table_name, uint32_t num_families, uint32_t num_rows,
+                 int32_t num_regions) {
+  LOG(INFO) << "Creating the table " << table_name
+            << " with num_regions:" << folly::to<std::string>(num_regions);
+  std::vector<std::string> families;
+  for (uint32_t i = 0; i < num_families; i++) {
+    families.push_back(Family(i));
+  }
+  if (num_regions <= 1) {
+    ScannerTest::test_util->CreateTable(table_name, families);
+  } else {
+    std::vector<std::string> keys;
+    for (int32_t i = 0; i < num_regions - 1; i++) {
+      keys.push_back(Row(i * (num_rows / (num_regions - 1))));
+      LOG(INFO) << "Split key:" << keys[keys.size() - 1];
+    }
+    ScannerTest::test_util->CreateTable(table_name, families, keys);
+  }
+}
+
+std::unique_ptr<hbase::Client> CreateTableAndWriteData(std::string table_name,
+                                                       uint32_t num_families, uint32_t num_rows,
+                                                       int32_t num_regions) {
+  CreateTable(table_name, num_families, num_rows, num_regions);
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf());
+  auto table = client->Table(tn);
+
+  LOG(INFO) << "Writing data to the table, num_rows:" << num_rows;
+  // Perform Puts
+  for (uint32_t i = 0; i < num_rows; i++) {
+    table->Put(*MakePut(Row(i), num_families));
+  }
+  return std::move(client);
+}
+
+void TestScan(const Scan &scan, uint32_t num_families, int32_t start, int32_t num_rows,
+              Table *table) {
+  LOG(INFO) << "Starting scan for the test with start:" << scan.StartRow()
+            << ", stop:" << scan.StopRow() << " expected_num_rows:" << num_rows;
+  auto scanner = table->Scan(scan);
+
+  uint32_t i = start;
+  auto r = scanner->Next();
+  while (r != nullptr) {
+    CheckResult(*r, Row(i++), num_families);
+    r = scanner->Next();
+  }
+  ASSERT_EQ(i - start, num_rows);
+}
+
+void TestScan(const Scan &scan, int32_t start, int32_t num_rows, Table *table) {
+  TestScan(scan, 1, start, num_rows, table);
+}
+
+void TestScan(uint32_t num_families, int32_t start, int32_t stop, int32_t num_rows, Table *table) {
+  Scan scan{};
+  if (start >= 0) {
+    scan.SetStartRow(Row(start));
+  } else {
+    start = 0;  // neded for below logic
+  }
+  if (stop >= 0) {
+    scan.SetStopRow(Row(stop));
+  }
+
+  TestScan(scan, num_families, start, num_rows, table);
+}
+
+void TestScan(int32_t start, int32_t stop, int32_t num_rows, Table *table) {
+  TestScan(1, start, stop, num_rows, table);
+}
+
+void TestScan(uint32_t num_families, std::string start, std::string stop, int32_t num_rows,
+              Table *table) {
+  Scan scan{};
+
+  scan.SetStartRow(start);
+  scan.SetStopRow(stop);
+
+  LOG(INFO) << "Starting scan for the test with start:" << start << ", stop:" << stop
+            << " expected_num_rows:" << num_rows;
+  auto scanner = table->Scan(scan);
+
+  uint32_t i = 0;
+  auto r = scanner->Next();
+  while (r != nullptr) {
+    VLOG(1) << r->DebugString();
+    i++;
+    ASSERT_EQ(r->Map().size(), num_families);
+    r = scanner->Next();
+  }
+  ASSERT_EQ(i, num_rows);
+}
+
+void TestScan(std::string start, std::string stop, int32_t num_rows, Table *table) {
+  TestScan(1, start, stop, num_rows, table);
+}
+
+void TestScanCombinations(Table *table, uint32_t num_families) {
+  // full table
+  TestScan(num_families, -1, -1, 1000, table);
+  TestScan(num_families, -1, 999, 999, table);
+  TestScan(num_families, 0, -1, 1000, table);
+  TestScan(num_families, 0, 999, 999, table);
+  TestScan(num_families, 10, 990, 980, table);
+  TestScan(num_families, 1, 998, 997, table);
+
+  TestScan(num_families, 123, 345, 222, table);
+  TestScan(num_families, 234, 456, 222, table);
+  TestScan(num_families, 345, 567, 222, table);
+  TestScan(num_families, 456, 678, 222, table);
+
+  // single results
+  TestScan(num_families, 111, 111, 1, table);  // split keys are like 111, 222, 333, etc
+  TestScan(num_families, 111, 112, 1, table);
+  TestScan(num_families, 332, 332, 1, table);
+  TestScan(num_families, 332, 333, 1, table);
+  TestScan(num_families, 333, 333, 1, table);
+  TestScan(num_families, 333, 334, 1, table);
+  TestScan(num_families, 42, 42, 1, table);
+  TestScan(num_families, 921, 921, 1, table);
+  TestScan(num_families, 0, 0, 1, table);
+  TestScan(num_families, 0, 1, 1, table);
+  TestScan(num_families, 999, 999, 1, table);
+
+  // few results
+  TestScan(num_families, 0, 0, 1, table);
+  TestScan(num_families, 0, 2, 2, table);
+  TestScan(num_families, 0, 5, 5, table);
+  TestScan(num_families, 10, 15, 5, table);
+  TestScan(num_families, 105, 115, 10, table);
+  TestScan(num_families, 111, 221, 110, table);
+  TestScan(num_families, 111, 222, 111, table);  // crossing region boundary 111-222
+  TestScan(num_families, 111, 223, 112, table);
+  TestScan(num_families, 111, 224, 113, table);
+  TestScan(num_families, 990, 999, 9, table);
+  TestScan(num_families, 900, 998, 98, table);
+
+  // empty results
+  TestScan(num_families, "a", "a", 0, table);
+  TestScan(num_families, "a", "r", 0, table);
+  TestScan(num_families, "", "r", 0, table);
+  TestScan(num_families, "s", "", 0, table);
+  TestScan(num_families, "s", "z", 0, table);
+  TestScan(num_families, Row(110) + "a", Row(111), 0, table);
+  TestScan(num_families, Row(111) + "a", Row(112), 0, table);
+  TestScan(num_families, Row(123) + "a", Row(124), 0, table);
+
+  // custom
+  TestScan(num_families, Row(111, 3), Row(1111, 4), 1, table);
+  TestScan(num_families, Row(0, 3), Row(0, 4), 1, table);
+  TestScan(num_families, Row(999, 3), Row(9999, 4), 1, table);
+  TestScan(num_families, Row(111, 3), Row(1111, 4), 1, table);
+  TestScan(num_families, Row(0, 3), Row(9999, 4), 1000, table);
+  TestScan(num_families, "a", "z", 1000, table);
+}
+
+// some of these tests are from TestAsyncTableScan* and some from TestFromClientSide* and
+// TestScannersFromClientSide*
+
+TEST_F(ScannerTest, SingleRegionScan) {
+  auto client = CreateTableAndWriteData("t_single_region_scan", 1, num_rows, 1);
+  auto table = client->Table(folly::to<hbase::pb::TableName>("t_single_region_scan"));
+
+  TestScanCombinations(table.get(), 1);
+}
+
+TEST_F(ScannerTest, MultiRegionScan) {
+  auto client = CreateTableAndWriteData("t_multi_region_scan", 1, num_rows, 10);
+  auto table = client->Table(folly::to<hbase::pb::TableName>("t_multi_region_scan"));
+
+  TestScanCombinations(table.get(), 1);
+}
+
+TEST_F(ScannerTest, ScanWithPauses) {
+  auto max_result_size =
+      ScannerTest::test_util->conf()->GetInt("hbase.client.scanner.max.result.size", 2097152);
+  ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", 100);
+  auto client = CreateTableAndWriteData("t_multi_region_scan", 1, num_rows, 5);
+  auto table = client->Table(folly::to<hbase::pb::TableName>("t_multi_region_scan"));
+
+  VLOG(1) << "Starting scan for the test";
+  Scan scan{};
+  scan.SetCaching(100);
+  auto scanner = table->Scan(scan);
+
+  uint32_t i = 0;
+  auto r = scanner->Next();
+  while (r != nullptr) {
+    CheckResult(*r, Row(i++), 1);
+    r = scanner->Next();
+    std::this_thread::sleep_for(TimeUtil::MillisToNanos(10));
+  }
+
+  auto s = static_cast<AsyncTableResultScanner *>(scanner.get());
+  ASSERT_GT(s->num_prefetch_stopped(), 0);
+
+  ASSERT_EQ(i, num_rows);
+  ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", max_result_size);
+}
+
+TEST_F(ScannerTest, ScanWithFilters) {
+  auto client = CreateTableAndWriteData("t_scan_with_filters", 1, num_rows, 1);
+  auto table = client->Table(folly::to<hbase::pb::TableName>("t_scan_with_filters"));
+
+  Scan scan{};
+  scan.SetFilter(FilterFactory::ValueFilter(CompareType::GREATER_OR_EQUAL,
+                                            *ComparatorFactory::BinaryComparator(Row(800))));
+
+  TestScan(scan, 800, 200, table.get());
+}
+
+TEST_F(ScannerTest, ScanMultiFamily) {
+  auto client = CreateTableAndWriteData("t_scan_multi_family", 3, num_rows, 1);
+  auto table = client->Table(folly::to<hbase::pb::TableName>("t_scan_multi_family"));
+
+  TestScanCombinations(table.get(), 3);
+}
+
+TEST_F(ScannerTest, ScanNullQualifier) {
+  std::string table_name{"t_scan_null_qualifier"};
+  std::string row{"row"};
+  CreateTable(table_name, 1, 1, 1);
+
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf());
+  auto table = client->Table(tn);
+
+  // Perform Puts
+  Put put{row};
+  put.AddColumn(Family(0), "q1", row);
+  put.AddColumn(Family(0), "", row);
+  table->Put(put);
+
+  Scan scan1{};
+  scan1.AddColumn(Family(0), "");
+  auto scanner1 = table->Scan(scan1);
+  auto r1 = scanner1->Next();
+  ASSERT_EQ(r1->Cells().size(), 1);
+  ASSERT_EQ(scanner1->Next(), nullptr);
+
+  Scan scan2{};
+  scan2.AddFamily(Family(0));
+  auto scanner2 = table->Scan(scan2);
+  auto r2 = scanner2->Next();
+  ASSERT_EQ(r2->Cells().size(), 2);
+  ASSERT_EQ(scanner2->Next(), nullptr);
+}
+
+TEST_F(ScannerTest, ScanNoResults) {
+  std::string table_name{"t_scan_no_results"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 3);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  Scan scan{};
+  scan.AddColumn(Family(0), "non_existing_qualifier");
+
+  TestScan(scan, 0, 0, table.get());
+}
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 3a7d62b..f79d848 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -30,6 +30,7 @@
 #include "core/client.h"
 #include "core/get.h"
 #include "core/put.h"
+#include "core/scan.h"
 #include "core/table.h"
 #include "serde/server-name.h"
 #include "serde/table-name.h"
@@ -38,6 +39,7 @@
 using hbase::Client;
 using hbase::Configuration;
 using hbase::Get;
+using hbase::Scan;
 using hbase::Put;
 using hbase::Table;
 using hbase::pb::TableName;
@@ -48,6 +50,10 @@ DEFINE_string(table, "test_table", "What table to do the reads or writes");
 DEFINE_string(row, "row_", "row prefix");
 DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
 DEFINE_uint64(num_rows, 10000, "How many rows to write and read");
+DEFINE_bool(puts, true, "Whether to perform puts");
+DEFINE_bool(gets, true, "Whether to perform gets");
+DEFINE_bool(multigets, true, "Whether to perform multi-gets");
+DEFINE_bool(scans, true, "Whether to perform scans");
 DEFINE_bool(display_results, false, "Whether to display the Results from Gets");
 DEFINE_int32(threads, 6, "How many cpu threads");
 
@@ -86,41 +92,72 @@ int main(int argc, char *argv[]) {
   auto start_ns = TimeUtil::GetNowNanos();
 
   // Do the Put requests
-  for (uint64_t i = 0; i < num_puts; i++) {
-    table->Put(*MakePut(Row(FLAGS_row, i)));
-  }
+  if (FLAGS_puts) {
+    LOG(INFO) << "Sending put requests";
+    for (uint64_t i = 0; i < num_puts; i++) {
+      table->Put(*MakePut(Row(FLAGS_row, i)));
+    }
 
-  LOG(INFO) << "Successfully sent  " << num_puts << " Put requests in "
-            << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+    LOG(INFO) << "Successfully sent  " << num_puts << " Put requests in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+  }
 
   // Do the Get requests
-  start_ns = TimeUtil::GetNowNanos();
-  for (uint64_t i = 0; i < num_puts; i++) {
-    auto result = table->Get(Get{Row(FLAGS_row, i)});
-    if (FLAGS_display_results) {
-      LOG(INFO) << result->DebugString();
+  if (FLAGS_gets) {
+    LOG(INFO) << "Sending get requests";
+    start_ns = TimeUtil::GetNowNanos();
+    for (uint64_t i = 0; i < num_puts; i++) {
+      auto result = table->Get(Get{Row(FLAGS_row, i)});
+      if (FLAGS_display_results) {
+        LOG(INFO) << result->DebugString();
+      }
     }
-  }
 
-  LOG(INFO) << "Successfully sent  " << num_puts << " Get requests in "
-            << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+    LOG(INFO) << "Successfully sent  " << num_puts << " Get requests in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+  }
 
   // Do the Multi-Gets
-  std::vector<hbase::Get> gets;
-  for (uint64_t i = 0; i < num_puts; ++i) {
-    hbase::Get get(Row(FLAGS_row, i));
-    gets.push_back(get);
-  }
+  if (FLAGS_multigets) {
+    std::vector<hbase::Get> gets;
+    for (uint64_t i = 0; i < num_puts; ++i) {
+      hbase::Get get(Row(FLAGS_row, i));
+      gets.push_back(get);
+    }
+
+    LOG(INFO) << "Sending multi-get requests";
+    start_ns = TimeUtil::GetNowNanos();
+    auto results = table->Get(gets);
 
-  start_ns = TimeUtil::GetNowNanos();
-  auto results = table->Get(gets);
+    if (FLAGS_display_results) {
+      for (const auto &result : results) LOG(INFO) << result->DebugString();
+    }
 
-  if (FLAGS_display_results) {
-    for (const auto &result : results) LOG(INFO) << result->DebugString();
+    LOG(INFO) << "Successfully sent  " << gets.size() << " Multi-Get requests in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
   }
 
-  LOG(INFO) << "Successfully sent  " << gets.size() << " Multi-Get requests in "
-            << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+  // Do the Scan
+  if (FLAGS_scans) {
+    LOG(INFO) << "Starting scanner";
+    start_ns = TimeUtil::GetNowNanos();
+    Scan scan{};
+    auto scanner = table->Scan(scan);
+
+    uint64_t i = 0;
+    auto r = scanner->Next();
+    while (r != nullptr) {
+      if (FLAGS_display_results) {
+        LOG(INFO) << r->DebugString();
+      }
+      r = scanner->Next();
+      i++;
+    }
+
+    LOG(INFO) << "Successfully iterated over  " << i << " Scan results in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+    scanner->Close();
+  }
 
   table->Close();
   client->Close();
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index aa51989..9cdd8b0 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "core/async-connection.h"
+#include "core/async-table-result-scanner.h"
 #include "core/request-converter.h"
 #include "core/response-converter.h"
 #include "if/Client.pb.h"
@@ -52,6 +53,21 @@ std::shared_ptr<hbase::Result> Table::Get(const hbase::Get &get) {
   return context.get(operation_timeout());
 }
 
+std::shared_ptr<ResultScanner> Table::Scan(const hbase::Scan &scan) {
+  auto max_cache_size = ResultSize2CacheSize(
+      scan.MaxResultSize() > 0 ? scan.MaxResultSize()
+                               : async_connection_->connection_conf()->scanner_max_result_size());
+  auto scanner = std::make_shared<AsyncTableResultScanner>(max_cache_size);
+  async_table_->Scan(scan, scanner);
+  return scanner;
+}
+
+int64_t Table::ResultSize2CacheSize(int64_t max_results_size) const {
+  // * 2 if possible
+  return max_results_size > (std::numeric_limits<int64_t>::max() / 2) ? max_results_size
+                                                                      : max_results_size * 2;
+}
+
 void Table::Put(const hbase::Put &put) {
   auto future = async_table_->Put(put);
   future.get(operation_timeout());
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index 81ddc8e..1f6d9b7 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -32,6 +32,7 @@
 #include "core/location-cache.h"
 #include "core/put.h"
 #include "core/raw-async-table.h"
+#include "core/result-scanner.h"
 #include "core/result.h"
 #include "serde/table-name.h"
 
@@ -74,6 +75,8 @@ class Table {
   std::shared_ptr<hbase::Result> Increment(const hbase::Increment &increment);
   // TODO: Batch Puts
 
+  std::shared_ptr<ResultScanner> Scan(const hbase::Scan &scan);
+
   /**
    * @brief - Close the client connection.
    */
@@ -92,5 +95,7 @@ class Table {
 
  private:
   std::chrono::milliseconds operation_timeout() const;
+
+  int64_t ResultSize2CacheSize(int64_t max_results_size) const;
 };
 } /* namespace hbase */
diff --git a/hbase-native-client/exceptions/BUCK b/hbase-native-client/exceptions/BUCK
index eef4437..07ffeaf 100644
--- a/hbase-native-client/exceptions/BUCK
+++ b/hbase-native-client/exceptions/BUCK
@@ -17,8 +17,12 @@
 
 cxx_library(
     name="exceptions",
-    exported_headers=["exception.h",],
+    exported_headers=[
+        "exception.h",
+    ],
     srcs=[],
-    deps=["//third-party:folly",],
+    deps=[
+        "//third-party:folly",
+    ],
     compiler_flags=['-Weffc++'],
-    visibility=['//core/...','//connection//...'],)
\ No newline at end of file
+    visibility=['//core/...', '//connection//...'],)
diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h
index f25fbea..9cbd7ae 100644
--- a/hbase-native-client/exceptions/exception.h
+++ b/hbase-native-client/exceptions/exception.h
@@ -18,25 +18,22 @@
  */
 #pragma once
 
+#include <folly/ExceptionWrapper.h>
+#include <folly/io/IOBuf.h>
 #include <exception>
 #include <string>
 #include <vector>
-#include <folly/io/IOBuf.h>
-#include <folly/ExceptionWrapper.h>
 
 namespace hbase {
 
 class ThrowableWithExtraContext {
-public:
-  ThrowableWithExtraContext(folly::exception_wrapper cause,
-      const long& when) :
-      cause_(cause), when_(when), extras_("") {
-  }
+ public:
+  ThrowableWithExtraContext(folly::exception_wrapper cause, const long& when)
+      : cause_(cause), when_(when), extras_("") {}
 
-  ThrowableWithExtraContext(folly::exception_wrapper cause,
-      const long& when, const std::string& extras) :
-      cause_(cause), when_(when), extras_(extras) {
-  }
+  ThrowableWithExtraContext(folly::exception_wrapper cause, const long& when,
+                            const std::string& extras)
+      : cause_(cause), when_(when), extras_(extras) {}
 
   virtual std::string ToString() {
     // TODO:
@@ -44,55 +41,45 @@ public:
     return extras_ + ", " + cause_.what().toStdString();
   }
 
-  virtual folly::exception_wrapper cause() {
-    return cause_;
-  }
-private:
+  virtual folly::exception_wrapper cause() { return cause_; }
+
+ private:
   folly::exception_wrapper cause_;
   long when_;
   std::string extras_;
 };
 
-class IOException: public std::logic_error {
-public:
+class IOException : public std::logic_error {
+ public:
   IOException() : logic_error("") {}
 
-  IOException(
-        const std::string& what) :
-        logic_error(what) {}
-  IOException(
-      const std::string& what,
-	  folly::exception_wrapper cause) :
-      logic_error(what), cause_(cause) {}
+  IOException(const std::string& what) : logic_error(what) {}
+  IOException(const std::string& what, folly::exception_wrapper cause)
+      : logic_error(what), cause_(cause) {}
   virtual ~IOException() = default;
 
-  virtual folly::exception_wrapper cause() {
-    return cause_;
-  }
-private:
+  virtual folly::exception_wrapper cause() { return cause_; }
+
+ private:
   folly::exception_wrapper cause_;
 };
 
-class RetriesExhaustedException: public IOException {
-public:
-  RetriesExhaustedException(
-      const int& num_retries,
-      std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) :
-        IOException(
-            GetMessage(num_retries, exceptions),
-            exceptions->empty() ? folly::exception_wrapper{}
-              : (*exceptions)[exceptions->size() - 1].cause()){
-  }
+class RetriesExhaustedException : public IOException {
+ public:
+  RetriesExhaustedException(const int& num_retries,
+                            std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions)
+      : IOException(GetMessage(num_retries, exceptions),
+                    exceptions->empty() ? folly::exception_wrapper{}
+                                        : (*exceptions)[exceptions->size() - 1].cause()) {}
   virtual ~RetriesExhaustedException() = default;
 
-private:
-  std::string GetMessage(
-      const int& num_retries,
-      std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) {
+ private:
+  std::string GetMessage(const int& num_retries,
+                         std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) {
     std::string buffer("Failed after attempts=");
     buffer.append(std::to_string(num_retries + 1));
     buffer.append(", exceptions:\n");
-    for (auto it = exceptions->begin();  it != exceptions->end(); it++) {
+    for (auto it = exceptions->begin(); it != exceptions->end(); it++) {
       buffer.append(it->ToString());
       buffer.append("\n");
     }
@@ -100,74 +87,141 @@ private:
   }
 };
 
-class HBaseIOException : public IOException {
-};
+class HBaseIOException : public IOException {};
 
 class RemoteException : public IOException {
-public:
-
+ public:
   RemoteException() : port_(0), do_not_retry_(false) {}
 
-  RemoteException(const std::string& what) :
-      IOException(what), port_(0), do_not_retry_(false) {}
+  RemoteException(const std::string& what) : IOException(what), port_(0), do_not_retry_(false) {}
 
-  RemoteException(
-      const std::string& what,
-      folly::exception_wrapper cause) :
-      IOException(what, cause), port_(0), do_not_retry_(false) {}
+  RemoteException(const std::string& what, folly::exception_wrapper cause)
+      : IOException(what, cause), port_(0), do_not_retry_(false) {}
 
   virtual ~RemoteException() = default;
 
-  std::string exception_class_name() const {
-    return exception_class_name_;
-  }
+  std::string exception_class_name() const { return exception_class_name_; }
 
   RemoteException* set_exception_class_name(const std::string& value) {
     exception_class_name_ = value;
     return this;
   }
 
-  std::string stack_trace() const {
-    return stack_trace_;
-  }
+  std::string stack_trace() const { return stack_trace_; }
 
   RemoteException* set_stack_trace(const std::string& value) {
     stack_trace_ = value;
     return this;
   }
 
-  std::string hostname() const {
-    return hostname_;
-  }
+  std::string hostname() const { return hostname_; }
 
   RemoteException* set_hostname(const std::string& value) {
     hostname_ = value;
     return this;
   }
 
-  int port() const {
-    return port_;
-  }
+  int port() const { return port_; }
 
   RemoteException* set_port(int value) {
     port_ = value;
     return this;
   }
 
-  bool do_not_retry() const {
-    return do_not_retry_;
-  }
+  bool do_not_retry() const { return do_not_retry_; }
 
   RemoteException* set_do_not_retry(bool value) {
     do_not_retry_ = value;
     return this;
   }
 
-private:
+ private:
   std::string exception_class_name_;
   std::string stack_trace_;
   std::string hostname_;
   int port_;
   bool do_not_retry_;
 };
+
+/**
+ * List of known exceptions from Java side, and Java-specific exception logic
+ */
+class ExceptionUtil {
+ private:
+  // unknown scanner and sub-classes
+  static constexpr const char* kUnknownScannerException =
+      "org.apache.hadoop.hbase.UnknownScannerException";
+
+  // not serving region and sub-classes
+  static constexpr const char* kNotServingRegionException =
+      "org.apache.hadoop.hbase.NotServingRegionException";
+  static constexpr const char* kRegionInRecoveryException =
+      "org.apache.hadoop.hbase.exceptions.RegionInRecoveryException";
+  static constexpr const char* kRegionOpeningException =
+      "org.apache.hadoop.hbase.exceptions.RegionOpeningException";
+  static constexpr const char* kRegionMovedException =
+      "org.apache.hadoop.hbase.exceptions.RegionMovedException";
+
+  // Region server stopped and sub-classes
+  static constexpr const char* kRegionServerStoppedException =
+      "org.apache.hadoop.hbase.regionserver.RegionServerStoppedException";
+  static constexpr const char* kRegionServerAbortedException =
+      "org.apache.hadoop.hbase.regionserver.RegionServerAbortedException";
+
+  // other scanner related exceptions
+  static constexpr const char* kOutOfOrderScannerNextException =
+      "org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException";
+  static constexpr const char* kScannerResetException =
+      "org.apache.hadoop.hbase.exceptions.ScannerResetException";
+
+ public:
+  /**
+   * Returns whether or not the exception should be retried by looking at the
+   * remote exception.
+   */
+  static bool ShouldRetry(const folly::exception_wrapper& error) {
+    bool do_not_retry = false;
+    error.with_exception(
+        [&](const RemoteException& remote_ex) { do_not_retry = remote_ex.do_not_retry(); });
+    return !do_not_retry;
+  }
+
+  /**
+   * Returns whether the scanner is closed when the client received the
+   * remote exception.
+   * Ok, here is a nice detail about the java exceptions. In the java side, we
+   * just have a hierarchy of Exception classes that we use both client side and
+   * server side. On the client side, we rethrow the server side exception by
+   * un-wrapping the exception from a RemoteException or a ServiceException
+   * (see ConnectionUtils.translateException() in Java).
+   * Since this object-hierarchy info is not available in C++ side, we are doing a
+   * very fragile catch-all list of all exception types in Java that extend these
+   * three base classes: UnknownScannerException, NotServingRegionException,
+   * RegionServerStoppedException
+   */
+  static bool IsScannerClosed(const folly::exception_wrapper& exception) {
+    bool scanner_closed = false;
+    exception.with_exception([&](const RemoteException& remote_ex) {
+      auto java_class = remote_ex.exception_class_name();
+      if (java_class == kUnknownScannerException || java_class == kNotServingRegionException ||
+          java_class == kRegionInRecoveryException || java_class == kRegionOpeningException ||
+          java_class == kRegionMovedException || java_class == kRegionServerStoppedException ||
+          java_class == kRegionServerAbortedException) {
+        scanner_closed = true;
+      }
+    });
+    return scanner_closed;
+  }
+
+  static bool IsScannerOutOfOrder(const folly::exception_wrapper& exception) {
+    bool scanner_out_of_order = false;
+    exception.with_exception([&](const RemoteException& remote_ex) {
+      auto java_class = remote_ex.exception_class_name();
+      if (java_class == kOutOfOrderScannerNextException || java_class == kScannerResetException) {
+        scanner_out_of_order = true;
+      }
+    });
+    return scanner_out_of_order;
+  }
+};
 }  // namespace hbase
diff --git a/hbase-native-client/test-util/BUCK b/hbase-native-client/test-util/BUCK
index 4998614..7c92841 100644
--- a/hbase-native-client/test-util/BUCK
+++ b/hbase-native-client/test-util/BUCK
@@ -16,33 +16,37 @@
 # limitations under the License.
 import os
 
-cxx_library(name="test-util",
-  exported_headers=[
-    "test-util.h","mini-cluster.h"
-  ],
-  srcs=["test-util.cc","mini-cluster.cc"],
-  deps=[
-    "//third-party:folly",
-    "//core:core",
+cxx_library(
+    name="test-util",
+    exported_headers=["test-util.h", "mini-cluster.h"],
+    srcs=["test-util.cc", "mini-cluster.cc"],
+    deps=[
+        "//third-party:folly",
+        "//core:core",
     ],
-    preprocessor_flags= [
-    '-I' + os.environ['JAVA_HOME'] + '/include',
-    '-I' + os.environ['JAVA_HOME'] + '/include/darwin',
-    '-I' + os.environ['JAVA_HOME'] + '/include/linux'],
-    exported_preprocessor_flags= [
-    '-I' + os.environ['JAVA_HOME'] + '/include',
-    '-I' + os.environ['JAVA_HOME'] + '/include/darwin',
-    '-I' + os.environ['JAVA_HOME'] + '/include/linux'],
-    compiler_flags = [
-    '-I' + os.environ['JAVA_HOME'] + '/include',
-    '-I' + os.environ['JAVA_HOME'] + '/include/darwin',
-    '-I' + os.environ['JAVA_HOME'] + '/include/linux', '-ggdb'],
-    linker_flags = ['-ljvm',
-    '-L' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server', '-ggdb'],
-    exported_linker_flags = ['-ljvm',
-    '-L' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server',
-    '-Wl,-rpath=' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server'],
-              visibility=[
-                'PUBLIC',
-            ],
-    )
+    preprocessor_flags=[
+        '-I' + os.environ['JAVA_HOME'] + '/include',
+        '-I' + os.environ['JAVA_HOME'] + '/include/darwin',
+        '-I' + os.environ['JAVA_HOME'] + '/include/linux'
+    ],
+    exported_preprocessor_flags=[
+        '-I' + os.environ['JAVA_HOME'] + '/include',
+        '-I' + os.environ['JAVA_HOME'] + '/include/darwin',
+        '-I' + os.environ['JAVA_HOME'] + '/include/linux'
+    ],
+    compiler_flags=[
+        '-I' + os.environ['JAVA_HOME'] + '/include',
+        '-I' + os.environ['JAVA_HOME'] + '/include/darwin',
+        '-I' + os.environ['JAVA_HOME'] + '/include/linux', '-ggdb'
+    ],
+    linker_flags=[
+        '-ljvm', '-L' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server',
+        '-ggdb'
+    ],
+    exported_linker_flags=[
+        '-ljvm', '-L' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server',
+        '-Wl,-rpath=' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server'
+    ],
+    visibility=[
+        'PUBLIC',
+    ],)
diff --git a/hbase-native-client/test-util/mini-cluster.cc b/hbase-native-client/test-util/mini-cluster.cc
index 34da54c..688ea8e 100644
--- a/hbase-native-client/test-util/mini-cluster.cc
+++ b/hbase-native-client/test-util/mini-cluster.cc
@@ -119,6 +119,9 @@ void MiniCluster::Setup() {
         env_->GetMethodID(testing_util_class_, "createTable",
                           "(Lorg/apache/hadoop/hbase/TableName;Ljava/lang/String;)Lorg/"
                           "apache/hadoop/hbase/client/Table;");
+    create_table_families_mid_ = env_->GetMethodID(testing_util_class_, "createTable",
+                                                   "(Lorg/apache/hadoop/hbase/TableName;[[B)Lorg/"
+                                                   "apache/hadoop/hbase/client/Table;");
     create_table_with_split_mid_ = env_->GetMethodID(
         testing_util_class_, "createTable",
         "(Lorg/apache/hadoop/hbase/TableName;[[B[[B)Lorg/apache/hadoop/hbase/client/Table;");
@@ -198,20 +201,45 @@ jobject MiniCluster::CreateTable(const std::string &table, const std::string &fa
   return table_obj;
 }
 
+jobject MiniCluster::CreateTable(const std::string &table,
+                                 const std::vector<std::string> &families) {
+  jstring table_name_str = env_->NewStringUTF(table.c_str());
+  jobject table_name =
+      env_->CallStaticObjectMethod(table_name_class_, tbl_name_value_of_mid_, table_name_str);
+  jclass array_element_type = env_->FindClass("[B");
+  jobjectArray family_array = env_->NewObjectArray(families.size(), array_element_type, nullptr);
+  int i = 0;
+  for (auto family : families) {
+    env_->SetObjectArrayElement(family_array, i++, StrToByteChar(family));
+  }
+  jobject table_obj =
+      env_->CallObjectMethod(htu_, create_table_families_mid_, table_name, family_array);
+  return table_obj;
+}
+
 jobject MiniCluster::CreateTable(const std::string &table, const std::string &family,
                                  const std::vector<std::string> &keys) {
+  std::vector<std::string> families{};
+  families.push_back(std::string{family});
+  return CreateTable(table, families, keys);
+}
+
+jobject MiniCluster::CreateTable(const std::string &table, const std::vector<std::string> &families,
+                                 const std::vector<std::string> &keys) {
   jstring table_name_str = env_->NewStringUTF(table.c_str());
   jobject table_name =
       env_->CallStaticObjectMethod(table_name_class_, tbl_name_value_of_mid_, table_name_str);
   jclass array_element_type = env_->FindClass("[B");
 
-  jobjectArray family_array = env_->NewObjectArray(1, array_element_type, env_->NewByteArray(1));
-  env_->SetObjectArrayElement(family_array, 0, StrToByteChar(family));
+  int i = 0;
+  jobjectArray family_array = env_->NewObjectArray(families.size(), array_element_type, nullptr);
+  for (auto family : families) {
+    env_->SetObjectArrayElement(family_array, i++, StrToByteChar(family));
+  }
 
-  jobjectArray key_array =
-      env_->NewObjectArray(keys.size(), array_element_type, env_->NewByteArray(1));
+  jobjectArray key_array = env_->NewObjectArray(keys.size(), array_element_type, nullptr);
 
-  int i = 0;
+  i = 0;
   for (auto key : keys) {
     env_->SetObjectArrayElement(key_array, i++, StrToByteChar(key));
   }
diff --git a/hbase-native-client/test-util/mini-cluster.h b/hbase-native-client/test-util/mini-cluster.h
index 4119cb5..b8ac391 100644
--- a/hbase-native-client/test-util/mini-cluster.h
+++ b/hbase-native-client/test-util/mini-cluster.h
@@ -18,9 +18,9 @@
  */
 #pragma once
 
+#include <jni.h>
 #include <string>
 #include <vector>
-#include "jni.h"
 
 namespace hbase {
 
@@ -29,8 +29,11 @@ class MiniCluster {
   jobject StartCluster(int32_t num_region_servers);
   void StopCluster();
   jobject CreateTable(const std::string &table, const std::string &family);
+  jobject CreateTable(const std::string &table, const std::vector<std::string> &families);
   jobject CreateTable(const std::string &table, const std::string &family,
                       const std::vector<std::string> &keys);
+  jobject CreateTable(const std::string &table, const std::vector<std::string> &families,
+                      const std::vector<std::string> &keys);
   jobject StopRegionServer(int idx);
 
   // moves region to server
@@ -51,6 +54,7 @@ class MiniCluster {
   jmethodID set_conf_mid_;
   jmethodID tbl_name_value_of_mid_;
   jmethodID create_table_mid_;
+  jmethodID create_table_families_mid_;
   jmethodID create_table_with_split_mid_;
   jmethodID put_mid_;
   jmethodID put_ctor_;
diff --git a/hbase-native-client/test-util/test-util.cc b/hbase-native-client/test-util/test-util.cc
index c4e6ed2..26862d8 100644
--- a/hbase-native-client/test-util/test-util.cc
+++ b/hbase-native-client/test-util/test-util.cc
@@ -65,11 +65,20 @@ void TestUtil::CreateTable(const std::string &table, const std::string &family)
   mini_->CreateTable(table, family);
 }
 
+void TestUtil::CreateTable(const std::string &table, const std::vector<std::string> &families) {
+  mini_->CreateTable(table, families);
+}
+
 void TestUtil::CreateTable(const std::string &table, const std::string &family,
                            const std::vector<std::string> &keys) {
   mini_->CreateTable(table, family, keys);
 }
 
+void TestUtil::CreateTable(const std::string &table, const std::vector<std::string> &families,
+                           const std::vector<std::string> &keys) {
+  mini_->CreateTable(table, families, keys);
+}
+
 void TestUtil::StartStandAloneInstance() {
   auto p = temp_dir_.path().string();
   auto cmd = std::string{"bin/start-local-hbase.sh " + p};
diff --git a/hbase-native-client/test-util/test-util.h b/hbase-native-client/test-util/test-util.h
index 5729674..e26558b 100644
--- a/hbase-native-client/test-util/test-util.h
+++ b/hbase-native-client/test-util/test-util.h
@@ -59,8 +59,11 @@ class TestUtil {
 
   void StopMiniCluster();
   void CreateTable(const std::string &table, const std::string &family);
+  void CreateTable(const std::string &table, const std::vector<std::string> &families);
   void CreateTable(const std::string &table, const std::string &family,
                    const std::vector<std::string> &keys);
+  void CreateTable(const std::string &table, const std::vector<std::string> &families,
+                   const std::vector<std::string> &keys);
 
   void StartStandAloneInstance();
   void StopStandAloneInstance();
diff --git a/hbase-native-client/utils/BUCK b/hbase-native-client/utils/BUCK
index ed8e114..788056b 100644
--- a/hbase-native-client/utils/BUCK
+++ b/hbase-native-client/utils/BUCK
@@ -20,6 +20,7 @@ cxx_library(
     exported_headers=[
         "bytes-util.h",
         "connection-util.h",
+        "optional.h",
         "sys-util.h",
         "time-util.h",
         "user-util.h",
diff --git a/hbase-native-client/utils/bytes-util-test.cc b/hbase-native-client/utils/bytes-util-test.cc
index ca64a21..16af021 100644
--- a/hbase-native-client/utils/bytes-util-test.cc
+++ b/hbase-native-client/utils/bytes-util-test.cc
@@ -47,6 +47,7 @@ TEST(TestBytesUtil, TestToStringBinary) {
   EXPECT_EQ("foo_\\x00\\xFF_bar",
             BytesUtil::ToStringBinary("foo_" + std::string{zero} + std::string{max} + "_bar"));
 }
+
 TEST(TestBytesUtil, TestToStringToInt64) {
   int64_t num = 761235;
   EXPECT_EQ(num, BytesUtil::ToInt64(BytesUtil::ToString(num)));
@@ -57,3 +58,13 @@ TEST(TestBytesUtil, TestToStringToInt64) {
   num = 0;
   EXPECT_EQ(num, BytesUtil::ToInt64(BytesUtil::ToString(num)));
 }
+
+TEST(TestBytesUtil, TestCreateClosestRowAfter) {
+  std::string empty{""};
+  EXPECT_EQ(BytesUtil::CreateClosestRowAfter(empty), std::string{'\0'});
+
+  std::string foo{"foo"};
+  EXPECT_EQ(BytesUtil::CreateClosestRowAfter(foo), std::string{"foo"} + '\0');
+
+  EXPECT_EQ("f\\x00", BytesUtil::ToStringBinary(BytesUtil::CreateClosestRowAfter("f")));
+}
diff --git a/hbase-native-client/utils/bytes-util.h b/hbase-native-client/utils/bytes-util.h
index 3566d62..6221bf0 100644
--- a/hbase-native-client/utils/bytes-util.h
+++ b/hbase-native-client/utils/bytes-util.h
@@ -42,7 +42,27 @@ class BytesUtil {
     */
   static std::string ToStringBinary(const std::string& b, size_t off, size_t len);
 
-  static std::string ToString(int64_t amt);
-  static long ToInt64(std::string str);
+  static std::string ToString(int64_t value);
+
+  static int64_t ToInt64(std::string str);
+
+  static bool IsEmptyStartRow(const std::string& row) { return row == ""; }
+
+  static bool IsEmptyStopRow(const std::string& row) { return row == ""; }
+
+  static int32_t CompareTo(const std::string& a, const std::string& b) {
+    if (a < b) {
+      return -1;
+    }
+    if (a == b) {
+      return 0;
+    }
+    return 1;
+  }
+
+  /**
+   * Create the closest row after the specified row
+   */
+  static std::string CreateClosestRowAfter(std::string row) { return row.append(1, '\0'); }
 };
 } /* namespace hbase */
diff --git a/hbase-native-client/core/query.h b/hbase-native-client/utils/optional.h
similarity index 70%
copy from hbase-native-client/core/query.h
copy to hbase-native-client/utils/optional.h
index b706303..a05eab5 100644
--- a/hbase-native-client/core/query.h
+++ b/hbase-native-client/utils/optional.h
@@ -19,23 +19,14 @@
 
 #pragma once
 
-#include "core/filter.h"
+#include <experimental/optional>
 
 namespace hbase {
 
 /**
- * Base class for read RPC calls (Get / Scan).
+ * An optional value that may or may not be present.
  */
-class Query {
- public:
-  virtual ~Query() {}
+template <class T>
+using optional = std::experimental::optional<T>;
 
-  void SetFilter(std::unique_ptr<Filter> filter) { filter_ = std::move(filter); }
-
-  const std::unique_ptr<Filter>& filter() const { return filter_; }
-
- protected:
-  std::unique_ptr<Filter> filter_ = nullptr;
-};
-
-}  // namespace hbase
+} /* namespace hbase */