You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/06/06 22:16:35 UTC

[2/3] hbase git commit: HBASE-17907 [C++] End to end Scans from Client/Table

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/raw-async-table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index f71fbba..998e2f1 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -111,9 +111,10 @@ folly::Future<folly::Unit> RawAsyncTable::Put(const hbase::Put& put) {
 folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) {
   auto caller =
       CreateCallerBuilder<folly::Unit>(del.row(), connection_conf_->write_rpc_timeout())
-          ->action([=, &del](std::shared_ptr<hbase::HBaseRpcController> controller,
-                             std::shared_ptr<hbase::RegionLocation> loc,
-                             std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> {
+          ->action([=, &del](
+                       std::shared_ptr<hbase::HBaseRpcController> controller,
+                       std::shared_ptr<hbase::RegionLocation> loc,
+                       std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> {
             return Call<hbase::Delete, hbase::Request, hbase::Response, folly::Unit>(
                 rpc_client, controller, loc, del, &hbase::RequestConverter::DeleteToMutateRequest,
                 [](const Response& r) -> folly::Unit { return folly::unit; });
@@ -143,4 +144,24 @@ folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::B
 
   return caller->Call().then([caller](auto r) { return r; });
 }
+
+void RawAsyncTable::Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer) {
+  auto scanner = AsyncClientScanner::Create(
+      connection_, SetDefaultScanConfig(scan), table_name_, consumer, connection_conf_->pause(),
+      connection_conf_->max_retries(), connection_conf_->scan_timeout(),
+      connection_conf_->rpc_timeout(), connection_conf_->start_log_errors_count());
+  scanner->Start();
+}
+
+std::shared_ptr<hbase::Scan> RawAsyncTable::SetDefaultScanConfig(const hbase::Scan& scan) {
+  // always create a new scan object as we may reset the start row later.
+  auto new_scan = std::make_shared<hbase::Scan>(scan);
+  if (new_scan->Caching() <= 0) {
+    new_scan->SetCaching(default_scanner_caching_);
+  }
+  if (new_scan->MaxResultSize() <= 0) {
+    new_scan->SetMaxResultSize(default_scanner_max_result_size_);
+  }
+  return new_scan;
+}
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/raw-async-table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index c8e9f2f..8c40dae 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -24,7 +24,9 @@
 #include <memory>
 #include <string>
 #include <vector>
+
 #include "core/async-batch-rpc-retrying-caller.h"
+#include "core/async-client-scanner.h"
 #include "core/async-connection.h"
 #include "core/async-rpc-retrying-caller-factory.h"
 #include "core/async-rpc-retrying-caller.h"
@@ -34,6 +36,7 @@
 #include "core/increment.h"
 #include "core/put.h"
 #include "core/result.h"
+#include "core/scan.h"
 
 namespace hbase {
 
@@ -48,14 +51,22 @@ class RawAsyncTable {
       : connection_(connection),
         connection_conf_(connection->connection_conf()),
         table_name_(table_name),
-        rpc_client_(connection->rpc_client()) {}
+        rpc_client_(connection->rpc_client()) {
+    default_scanner_caching_ = connection_conf_->scanner_caching();
+    default_scanner_max_result_size_ = connection_conf_->scanner_max_result_size();
+  }
   virtual ~RawAsyncTable() = default;
 
   folly::Future<std::shared_ptr<Result>> Get(const hbase::Get& get);
 
-	folly::Future<folly::Unit> Delete(const hbase::Delete& del);
-	folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment);
+  folly::Future<folly::Unit> Delete(const hbase::Delete& del);
+
+  folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment);
+
   folly::Future<folly::Unit> Put(const hbase::Put& put);
+
+  void Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer);
+
   void Close() {}
 
   folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Get(
@@ -69,6 +80,8 @@ class RawAsyncTable {
   std::shared_ptr<ConnectionConfiguration> connection_conf_;
   std::shared_ptr<pb::TableName> table_name_;
   std::shared_ptr<RpcClient> rpc_client_;
+  int32_t default_scanner_caching_;
+  int64_t default_scanner_max_result_size_;
 
   /* Methods */
   template <typename REQ, typename PREQ, typename PRESP, typename RESP>
@@ -81,5 +94,7 @@ class RawAsyncTable {
   template <typename RESP>
   std::shared_ptr<SingleRequestCallerBuilder<RESP>> CreateCallerBuilder(
       std::string row, std::chrono::nanoseconds rpc_timeout);
+
+  std::shared_ptr<hbase::Scan> SetDefaultScanConfig(const hbase::Scan& scan);
 };
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/raw-scan-result-consumer.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-scan-result-consumer.h b/hbase-native-client/core/raw-scan-result-consumer.h
new file mode 100644
index 0000000..b7c3c48
--- /dev/null
+++ b/hbase-native-client/core/raw-scan-result-consumer.h
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/ExceptionWrapper.h>
+#include <folly/Logging.h>
+#include <chrono>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "core/result.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+
+namespace hbase {
+
+enum class ScanControllerState { kInitialized, kSuspended, kTerminated, kDestroyed };
+
+enum class ScanResumerState { kInitialized, kSuspended, kResumed };
+
+/**
+ * Used to resume a scan.
+ */
+class ScanResumer {
+ public:
+  virtual ~ScanResumer() = default;
+
+  /**
+   * Resume the scan. You are free to call it multiple time but only the first call will take
+   * effect.
+   */
+  virtual void Resume() = 0;
+};
+
+/**
+ * Used to suspend or stop a scan.
+ * <p>
+ * Notice that, you should only call the methods below inside onNext or onHeartbeat method. A
+ * IllegalStateException will be thrown if you call them at other places.
+ * <p>
+ * You can only call one of the methods below, i.e., call suspend or terminate(of course you are
+ * free to not call them both), and the methods are not reentrant. A IllegalStateException will be
+ * thrown if you have already called one of the methods.
+ */
+class ScanController {
+ public:
+  virtual ~ScanController() = default;
+
+  /**
+   * Suspend the scan.
+   * <p>
+   * This means we will stop fetching data in background, i.e., will not call onNext any more
+   * before you resume the scan.
+   * @return A resumer used to resume the scan later.
+   */
+  virtual std::shared_ptr<ScanResumer> Suspend() = 0;
+
+  /**
+   * Terminate the scan.
+   * <p>
+   * This is useful when you have got enough results and want to stop the scan in onNext method,
+   * or you want to stop the scan in onHeartbeat method because it has spent too many time.
+   */
+  virtual void Terminate() = 0;
+};
+
+/**
+ * Receives {@link Result} for an asynchronous scan.
+ * <p>
+ * Notice that, the {@link #onNext(Result[], ScanController)} method will be called in the thread
+ * which we send request to HBase service. So if you want the asynchronous scanner fetch data from
+ * HBase in background while you process the returned data, you need to move the processing work to
+ * another thread to make the {@code onNext} call return immediately. And please do NOT do any time
+ * consuming tasks in all methods below unless you know what you are doing.
+ */
+class RawScanResultConsumer {
+ public:
+  virtual ~RawScanResultConsumer() = default;
+
+  /**
+   * Indicate that we have receive some data.
+   * @param results the data fetched from HBase service.
+   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
+   *          instance is only valid within scope of onNext method. You can only call its method in
+   *          onNext, do NOT store it and call it later outside onNext.
+   */
+  virtual void OnNext(const std::vector<std::shared_ptr<Result>> &results,
+                      std::shared_ptr<ScanController> controller) {}
+
+  /**
+   * Indicate that there is an heartbeat message but we have not cumulated enough cells to call
+   * onNext.
+   * <p>
+   * This method give you a chance to terminate a slow scan operation.
+   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
+   *          instance is only valid within the scope of onHeartbeat method. You can only call its
+   *          method in onHeartbeat, do NOT store it and call it later outside onHeartbeat.
+   */
+  virtual void OnHeartbeat(std::shared_ptr<ScanController> controller) {}
+
+  /**
+   * Indicate that we hit an unrecoverable error and the scan operation is terminated.
+   * <p>
+   * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
+   */
+  virtual void OnError(const folly::exception_wrapper &error) {}
+
+  /**
+   * Indicate that the scan operation is completed normally.
+   */
+  virtual void OnComplete() {}
+};
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/region-location.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index 4087d94..d5d9d67 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -26,7 +26,7 @@
 
 namespace hbase {
 
-enum RegionLocateType { kBefore, kCurrent, kAfter };
+enum class RegionLocateType { kBefore, kCurrent, kAfter };
 
 /**
  * @brief class to hold where a region is located.

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/request-converter-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter-test.cc b/hbase-native-client/core/request-converter-test.cc
index c01f16c..6c07a19 100644
--- a/hbase-native-client/core/request-converter-test.cc
+++ b/hbase-native-client/core/request-converter-test.cc
@@ -83,7 +83,6 @@ TEST(RequestConverter, ToScan) {
   scan.SetReversed(true);
   scan.SetStartRow(start_row);
   scan.SetStopRow(stop_row);
-  scan.SetSmall(true);
   scan.SetCaching(3);
   scan.SetConsistency(hbase::pb::Consistency::TIMELINE);
   scan.SetCacheBlocks(true);
@@ -105,7 +104,7 @@ TEST(RequestConverter, ToScan) {
   EXPECT_TRUE(msg->scan().reversed());
   EXPECT_EQ(msg->scan().start_row(), start_row);
   EXPECT_EQ(msg->scan().stop_row(), stop_row);
-  EXPECT_TRUE(msg->scan().small());
+  EXPECT_FALSE(msg->scan().small());
   EXPECT_EQ(msg->scan().caching(), 3);
   EXPECT_EQ(msg->scan().consistency(), hbase::pb::Consistency::TIMELINE);
   EXPECT_TRUE(msg->scan().cache_blocks());

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/request-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index a1e63fe..6eb2f04 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -53,19 +53,11 @@ std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get,
   return pb_req;
 }
 
-std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
-                                                         const std::string &region_name) {
-  auto pb_req = Request::scan();
-
-  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
-
-  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
-
-  auto pb_scan = pb_msg->mutable_scan();
+std::unique_ptr<hbase::pb::Scan> RequestConverter::ToScan(const Scan &scan) {
+  auto pb_scan = std::make_unique<hbase::pb::Scan>();
   pb_scan->set_max_versions(scan.MaxVersions());
   pb_scan->set_cache_blocks(scan.CacheBlocks());
   pb_scan->set_reversed(scan.IsReversed());
-  pb_scan->set_small(scan.IsSmall());
   pb_scan->set_caching(scan.Caching());
   pb_scan->set_start_row(scan.StartRow());
   pb_scan->set_stop_row(scan.StopRow());
@@ -94,12 +86,78 @@ std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
     pb_scan->set_allocated_filter(Filter::ToProto(*(scan.filter())).release());
   }
 
-  // TODO We will change this later.
+  return std::move(pb_scan);
+}
+
+std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
+                                                         const std::string &region_name) {
+  auto pb_req = Request::scan();
+  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
+
+  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
+
+  pb_msg->set_allocated_scan(ToScan(scan).release());
+
+  SetCommonScanRequestFields(pb_msg, false);
+
+  return pb_req;
+}
+
+std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
+                                                         const std::string &region_name,
+                                                         int32_t num_rows, bool close_scanner) {
+  auto pb_req = Request::scan();
+  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
+
+  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
+
+  pb_msg->set_allocated_scan(ToScan(scan).release());
+
+  pb_msg->set_number_of_rows(num_rows);
+  pb_msg->set_close_scanner(close_scanner);
+
+  SetCommonScanRequestFields(pb_msg, false);
+
+  return pb_req;
+}
+
+std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                         bool close_scanner) {
+  auto pb_req = Request::scan();
+  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
+
+  pb_msg->set_number_of_rows(num_rows);
+  pb_msg->set_close_scanner(close_scanner);
+  pb_msg->set_scanner_id(scanner_id);
+
+  SetCommonScanRequestFields(pb_msg, false);
+
+  return pb_req;
+}
+
+std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                         bool close_scanner,
+                                                         int64_t next_call_seq_id, bool renew) {
+  auto pb_req = Request::scan();
+  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
+
+  pb_msg->set_number_of_rows(num_rows);
+  pb_msg->set_close_scanner(close_scanner);
+  pb_msg->set_scanner_id(scanner_id);
+  pb_msg->set_next_call_seq(next_call_seq_id);
+
+  SetCommonScanRequestFields(pb_msg, renew);
+  return pb_req;
+}
+
+void RequestConverter::SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest> pb_msg,
+                                                  bool renew) {
+  // TODO We will change these later when we implement partial results and heartbeats, etc
   pb_msg->set_client_handles_partials(false);
   pb_msg->set_client_handles_heartbeats(false);
   pb_msg->set_track_scan_metrics(false);
-
-  return pb_req;
+  pb_msg->set_renew(renew);
+  // TODO: set scan limit
 }
 
 std::unique_ptr<Request> RequestConverter::ToMultiRequest(
@@ -123,7 +181,6 @@ std::unique_ptr<Request> RequestConverter::ToMultiRequest(
     }
   }
 
-  VLOG(3) << "Multi Req:-" << pb_req->req_msg()->ShortDebugString();
   return pb_req;
 }
 
@@ -190,13 +247,13 @@ std::unique_ptr<MutationProto> RequestConverter::ToMutation(const MutationType t
 
 DeleteType RequestConverter::ToDeleteType(const CellType type) {
   switch (type) {
-    case DELETE:
+    case CellType::DELETE:
       return pb::MutationProto_DeleteType_DELETE_ONE_VERSION;
-    case DELETE_COLUMN:
+    case CellType::DELETE_COLUMN:
       return pb::MutationProto_DeleteType_DELETE_MULTIPLE_VERSIONS;
-    case DELETE_FAMILY:
+    case CellType::DELETE_FAMILY:
       return pb::MutationProto_DeleteType_DELETE_FAMILY;
-    case DELETE_FAMILY_VERSION:
+    case CellType::DELETE_FAMILY_VERSION:
       return pb::MutationProto_DeleteType_DELETE_FAMILY_VERSION;
     default:
       throw std::runtime_error("Unknown delete type: " + folly::to<std::string>(type));
@@ -216,12 +273,11 @@ std::unique_ptr<Request> RequestConverter::ToMutateRequest(const Put &put,
   pb_msg->set_allocated_mutation(
       ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release());
 
-  VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString();
   return pb_req;
 }
 
 std::unique_ptr<Request> RequestConverter::DeleteToMutateRequest(const Delete &del,
-                                                           const std::string &region_name) {
+                                                                 const std::string &region_name) {
   auto pb_req = Request::mutate();
   auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg());
   RequestConverter::SetRegion(region_name, pb_msg->mutable_region());

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/request-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h
index 6d57161..c807f45 100644
--- a/hbase-native-client/core/request-converter.h
+++ b/hbase-native-client/core/request-converter.h
@@ -66,9 +66,20 @@ class RequestConverter {
    */
   static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string &region_name);
 
+  static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string &region_name,
+                                                int32_t num_rows, bool close_scanner);
+
+  static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                bool close_scanner);
+
+  static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                bool close_scanner, int64_t next_call_seq_id,
+                                                bool renew);
+
   static std::unique_ptr<Request> ToMultiRequest(const ActionsByRegion &region_requests);
 
-  static std::unique_ptr<Request> DeleteToMutateRequest(const Delete &del,const std::string &region_name);
+  static std::unique_ptr<Request> DeleteToMutateRequest(const Delete &del,
+                                                        const std::string &region_name);
 
   static std::unique_ptr<Request> ToMutateRequest(const Put &put, const std::string &region_name);
 
@@ -91,8 +102,10 @@ class RequestConverter {
    */
   static void SetRegion(const std::string &region_name, pb::RegionSpecifier *region_specifier);
   static std::unique_ptr<hbase::pb::Get> ToGet(const Get &get);
+  static std::unique_ptr<hbase::pb::Scan> ToScan(const Scan &scan);
   static DeleteType ToDeleteType(const CellType type);
   static bool IsDelete(const CellType type);
+  static void SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest>, bool renew);
 };
 
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/response-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index 94b7875..d2d719b 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -54,7 +54,7 @@ std::shared_ptr<Result> ResponseConverter::FromMutateResponse(const Response& re
 }
 
 std::shared_ptr<Result> ResponseConverter::ToResult(
-    const hbase::pb::Result& result, const std::unique_ptr<CellScanner>& cell_scanner) {
+    const hbase::pb::Result& result, const std::shared_ptr<CellScanner> cell_scanner) {
   std::vector<std::shared_ptr<Cell>> vcells;
   for (auto cell : result.cell()) {
     std::shared_ptr<Cell> pcell =
@@ -82,34 +82,38 @@ std::shared_ptr<Result> ResponseConverter::ToResult(
 
 std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) {
   auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg());
-  VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString();
-  int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size()
-                                                   : scan_resp->results_size();
+  return FromScanResponse(scan_resp, resp.cell_scanner());
+}
+
+std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(
+    const std::shared_ptr<ScanResponse> scan_resp, std::shared_ptr<CellScanner> cell_scanner) {
+  VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString()
+          << " cell_scanner:" << (cell_scanner == nullptr);
+  int num_results =
+      cell_scanner != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size();
 
   std::vector<std::shared_ptr<Result>> results{static_cast<size_t>(num_results)};
   for (int i = 0; i < num_results; i++) {
-    if (resp.cell_scanner() != nullptr) {
+    if (cell_scanner != nullptr) {
       // Cells are out in cellblocks.  Group them up again as Results.  How many to read at a
       // time will be found in getCellsLength -- length here is how many Cells in the i'th Result
       int num_cells = scan_resp->cells_per_result(i);
 
       std::vector<std::shared_ptr<Cell>> vcells;
-      while (resp.cell_scanner()->Advance()) {
-        vcells.push_back(resp.cell_scanner()->Current());
-      }
-      // TODO: check associated cell count?
-
-      if (vcells.size() != num_cells) {
-        std::string msg = "Results sent from server=" + std::to_string(num_results) +
-                          ". But only got " + std::to_string(i) +
-                          " results completely at client. Resetting the scanner to scan again.";
-        LOG(ERROR) << msg;
-        throw std::runtime_error(msg);
+      for (int j = 0; j < num_cells; j++) {
+        if (!cell_scanner->Advance()) {
+          std::string msg = "Results sent from server=" + std::to_string(num_results) +
+                            ". But only got " + std::to_string(i) +
+                            " results completely at client. Resetting the scanner to scan again.";
+          LOG(ERROR) << msg;
+          throw std::runtime_error(msg);
+        }
+        vcells.push_back(cell_scanner->Current());
       }
       // TODO: handle partial results per Result by checking partial_flag_per_result
       results[i] = std::make_shared<Result>(vcells, false, scan_resp->stale(), false);
     } else {
-      results[i] = ToResult(scan_resp->results(i), resp.cell_scanner());
+      results[i] = ToResult(scan_resp->results(i), cell_scanner);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/response-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h
index 0fdde89..b518d1c 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -39,7 +39,7 @@ class ResponseConverter {
   ~ResponseConverter();
 
   static std::shared_ptr<Result> ToResult(const hbase::pb::Result& result,
-                                          const std::unique_ptr<CellScanner>& cell_scanner);
+                                          const std::shared_ptr<CellScanner> cell_scanner);
 
   /**
    * @brief Returns a Result object created by PB Message in passed Response object.
@@ -51,6 +51,9 @@ class ResponseConverter {
 
   static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp);
 
+  static std::vector<std::shared_ptr<Result>> FromScanResponse(
+      const std::shared_ptr<pb::ScanResponse> resp, std::shared_ptr<CellScanner> cell_scanner);
+
   static std::unique_ptr<hbase::MultiResponse> GetResults(std::shared_ptr<Request> req,
                                                           const Response& resp);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/result-scanner.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/result-scanner.h b/hbase-native-client/core/result-scanner.h
new file mode 100644
index 0000000..9460521
--- /dev/null
+++ b/hbase-native-client/core/result-scanner.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <functional>
+#include <iterator>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "core/cell.h"
+#include "core/result.h"
+
+namespace hbase {
+
+/**
+ * Interface for client-side scanning. Use Table to obtain instances.
+ */
+class ResultScanner {
+  // TODO: should we implement forward iterators?
+
+ public:
+  virtual ~ResultScanner() {}
+
+  virtual void Close() = 0;
+
+  virtual std::shared_ptr<Result> Next() = 0;
+};
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/result-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/result-test.cc b/hbase-native-client/core/result-test.cc
index 520f4b9..3299ffc 100644
--- a/hbase-native-client/core/result-test.cc
+++ b/hbase-native-client/core/result-test.cc
@@ -17,6 +17,7 @@
  *
  */
 
+#include <glog/logging.h>
 #include <gtest/gtest.h>
 #include <limits>
 #include <memory>
@@ -71,7 +72,7 @@ void PopulateCells(std::vector<std::shared_ptr<Cell> > &cells) {
       }
       default: {
         cells.push_back(std::make_shared<Cell>(
-            row, family, column, std::numeric_limits<long>::max(), value, CellType::PUT));
+            row, family, column, std::numeric_limits<int64_t>::max(), value, CellType::PUT));
       }
     }
   }
@@ -255,7 +256,7 @@ TEST(Result, FilledResult) {
             break;
           }
           default: {
-            EXPECT_EQ(std::numeric_limits<long>::max(), version_map.first);
+            EXPECT_EQ(std::numeric_limits<int64_t>::max(), version_map.first);
             EXPECT_EQ(value, version_map.second);
           }
         }
@@ -297,3 +298,24 @@ TEST(Result, FilledResult) {
     EXPECT_EQ("value-9", qual_val_map.second);
   }
 }
+
+TEST(Result, ResultEstimatedSize) {
+  CellType cell_type = CellType::PUT;
+  int64_t timestamp = std::numeric_limits<int64_t>::max();
+  std::vector<std::shared_ptr<Cell> > cells;
+  Result empty(cells, true, false, false);
+
+  EXPECT_EQ(empty.EstimatedSize(), sizeof(Result));
+
+  cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type));
+  Result result1(cells, true, false, false);
+  EXPECT_TRUE(result1.EstimatedSize() > empty.EstimatedSize());
+
+  cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type));
+  Result result2(cells, true, false, false);
+  EXPECT_TRUE(result2.EstimatedSize() > result1.EstimatedSize());
+
+  LOG(INFO) << empty.EstimatedSize();
+  LOG(INFO) << result1.EstimatedSize();
+  LOG(INFO) << result2.EstimatedSize();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/result.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/result.cc b/hbase-native-client/core/result.cc
index 9d9ddb3..44b4c86 100644
--- a/hbase-native-client/core/result.cc
+++ b/hbase-native-client/core/result.cc
@@ -25,13 +25,7 @@ Result::~Result() {}
 
 Result::Result(const std::vector<std::shared_ptr<Cell> > &cells, bool exists, bool stale,
                bool partial)
-    : exists_(exists), stale_(stale), partial_(partial) {
-  for (const auto &cell : cells) {
-    cells_.push_back(cell);
-    // We create the map when cells are added. unlike java where map is created
-    // when result.getMap() is called
-    result_map_[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value();
-  }
+    : exists_(exists), stale_(stale), partial_(partial), cells_(cells) {
   row_ = (cells_.size() == 0 ? "" : cells_[0]->Row());
 }
 
@@ -43,10 +37,10 @@ Result::Result(const Result &result) {
   if (!result.cells_.empty()) {
     for (const auto &cell : result.cells_) {
       cells_.push_back(cell);
-      result_map_[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value();
     }
   }
 }
+
 const std::vector<std::shared_ptr<Cell> > &Result::Cells() const { return cells_; }
 
 std::vector<std::shared_ptr<Cell> > Result::ColumnCells(const std::string &family,
@@ -74,13 +68,12 @@ const std::shared_ptr<Cell> Result::ColumnLatestCell(const std::string &family,
   return nullptr;
 }
 
-std::shared_ptr<std::string> Result::Value(const std::string &family,
-                                           const std::string &qualifier) const {
+optional<std::string> Result::Value(const std::string &family, const std::string &qualifier) const {
   std::shared_ptr<Cell> latest_cell(ColumnLatestCell(family, qualifier));
   if (latest_cell.get()) {
-    return std::make_shared<std::string>(latest_cell->Value());
+    return optional<std::string>(latest_cell->Value());
   }
-  return nullptr;
+  return optional<std::string>();
 }
 
 bool Result::IsEmpty() const { return cells_.empty(); }
@@ -89,24 +82,33 @@ const std::string &Result::Row() const { return row_; }
 
 int Result::Size() const { return cells_.size(); }
 
-const ResultMap &Result::Map() const { return result_map_; }
+ResultMap Result::Map() const {
+  ResultMap result_map;
+  for (const auto &cell : cells_) {
+    result_map[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value();
+  }
+  return result_map;
+}
 
-const std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const {
+std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const {
   std::map<std::string, std::string> family_map;
   if (!IsEmpty()) {
-    for (auto itr = result_map_.begin(); itr != result_map_.end(); ++itr) {
-      if (family == itr->first) {
-        for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) {
-          for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) {
-            // We break after inserting the first value. Result.java takes only
-            // the first value
-            family_map[qitr->first] = vitr->second;
-            break;
-          }
-        }
+    auto result_map = Map();
+    auto itr = result_map.find(family);
+    if (itr == result_map.end()) {
+      return family_map;
+    }
+
+    for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) {
+      for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) {
+        // We break after inserting the first value. Result.java takes only
+        // the first value
+        family_map[qitr->first] = vitr->second;
+        break;
       }
     }
   }
+
   return family_map;
 }
 
@@ -131,4 +133,14 @@ std::string Result::DebugString() const {
   return ret;
 }
 
+size_t Result::EstimatedSize() const {
+  size_t s = sizeof(Result);
+  s += row_.capacity();
+  for (const auto c : cells_) {
+    s += sizeof(std::shared_ptr<Cell>);
+    s + c->EstimatedSize();
+  }
+  return s;
+}
+
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/result.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/result.h b/hbase-native-client/core/result.h
index 627d161..f18071b 100644
--- a/hbase-native-client/core/result.h
+++ b/hbase-native-client/core/result.h
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include "core/cell.h"
+#include "utils/optional.h"
 
 namespace hbase {
 
@@ -79,7 +80,7 @@ class Result {
    * @param family - column family
    * @param qualifier - column qualifier
    */
-  std::shared_ptr<std::string> Value(const std::string &family, const std::string &qualifier) const;
+  optional<std::string> Value(const std::string &family, const std::string &qualifier) const;
 
   /**
    * @brief Returns if the underlying Cell vector is empty or not
@@ -104,23 +105,32 @@ class Result {
    * All other map returning methods make use of this map internally
    * The Map is created when the Result instance is created
    */
-  const ResultMap &Map() const;
+  ResultMap Map() const;
 
   /**
    * @brief Map of qualifiers to values.
    * Returns a Map of the form: Map<qualifier,value>
    * @param family - column family to get
    */
-  const std::map<std::string, std::string> FamilyMap(const std::string &family) const;
+  std::map<std::string, std::string> FamilyMap(const std::string &family) const;
 
   std::string DebugString() const;
 
+  bool Exists() const { return exists_; }
+
+  bool Stale() const { return stale_; }
+
+  bool Partial() const { return partial_; }
+
+  /** Returns estimated size of the Result object including deep heap space usage
+   * of its Cells and data. Notice that this is a very rough estimate. */
+  size_t EstimatedSize() const;
+
  private:
   bool exists_ = false;
   bool stale_ = false;
   bool partial_ = false;
   std::string row_ = "";
   std::vector<std::shared_ptr<Cell> > cells_;
-  ResultMap result_map_;
 };
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan-result-cache-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scan-result-cache-test.cc b/hbase-native-client/core/scan-result-cache-test.cc
new file mode 100644
index 0000000..0bf83ce
--- /dev/null
+++ b/hbase-native-client/core/scan-result-cache-test.cc
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include "core/cell.h"
+#include "core/result.h"
+#include "core/scan-result-cache.h"
+
+using hbase::ScanResultCache;
+using hbase::Result;
+using hbase::Cell;
+using hbase::CellType;
+
+using ResultVector = std::vector<std::shared_ptr<Result>>;
+
+std::shared_ptr<Cell> CreateCell(const int32_t &key, const std::string &family,
+                                 const std::string &column) {
+  auto row = folly::to<std::string>(key);
+  return std::make_shared<Cell>(row, family, column, std::numeric_limits<int64_t>::max(), row,
+                                CellType::PUT);
+}
+
+std::shared_ptr<Result> CreateResult(std::shared_ptr<Cell> cell, bool partial) {
+  return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{cell}, false, false, partial);
+}
+
+TEST(ScanResultCacheTest, NoPartial) {
+  ScanResultCache cache;
+  ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, false));
+  ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, true));
+  int32_t count = 10;
+  ResultVector results{};
+  for (int32_t i = 0; i < count; i++) {
+    results.push_back(CreateResult(CreateCell(i, "cf", "cq1"), false));
+  }
+  ASSERT_EQ(results, cache.AddAndGet(results, false));
+}
+
+TEST(ScanResultCacheTest, Combine1) {
+  ScanResultCache cache;
+  auto prev_result = CreateResult(CreateCell(0, "cf", "cq1"), true);
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true);
+  auto results = cache.AddAndGet(ResultVector{prev_result, result1}, false);
+  ASSERT_EQ(1L, results.size());
+  ASSERT_EQ(prev_result, results[0]);
+
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{}, true).size());
+
+  results = cache.AddAndGet(ResultVector{}, false);
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(3, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3")));
+}
+
+TEST(ScanResultCacheTest, Combine2) {
+  ScanResultCache cache;
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true);
+
+  auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true);
+  auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq2"), false);
+
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size());
+
+  auto results = cache.AddAndGet(ResultVector{next_result1}, false);
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(3, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3")));
+
+  results = cache.AddAndGet(ResultVector{next_to_next_result1}, false);
+  ASSERT_EQ(2, results.size());
+  ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(1, results[0]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(3, folly::to<int32_t>(results[1]->Row()));
+  ASSERT_EQ(1, results[1]->Cells().size());
+  ASSERT_EQ(3, folly::to<int32_t>(*results[1]->Value("cf", "cq2")));
+}
+
+TEST(ScanResultCacheTest, Combine3) {
+  ScanResultCache cache;
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), false);
+  auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq1"), true);
+
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size());
+
+  auto results = cache.AddAndGet(ResultVector{next_result1, next_to_next_result1}, false);
+
+  ASSERT_EQ(2, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(2, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+  ASSERT_EQ(2, folly::to<int32_t>(results[1]->Row()));
+  ASSERT_EQ(1, results[1]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*results[1]->Value("cf", "cq1")));
+
+  results = cache.AddAndGet(ResultVector{}, false);
+
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(3, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(1, results[0]->Cells().size());
+  ASSERT_EQ(3, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+}
+
+TEST(ScanResultCacheTest, Combine4) {
+  ScanResultCache cache;
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), false);
+  auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true);
+  auto next_result2 = CreateResult(CreateCell(2, "cf", "cq2"), false);
+
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size());
+
+  auto results = cache.AddAndGet(ResultVector{result2, next_result1}, false);
+
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(2, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+
+  results = cache.AddAndGet(ResultVector{next_result2}, false);
+
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(2, results[0]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+}
+
+TEST(ScanResultCacheTest, SizeOf) {
+  std::string e{""};
+  std::string f{"f"};
+  std::string foo{"foo"};
+
+  LOG(INFO) << sizeof(e) << " " << e.capacity();
+  LOG(INFO) << sizeof(f) << " " << f.capacity();
+  LOG(INFO) << sizeof(foo) << " " << foo.capacity();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan-result-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scan-result-cache.cc b/hbase-native-client/core/scan-result-cache.cc
new file mode 100644
index 0000000..62a51e0
--- /dev/null
+++ b/hbase-native-client/core/scan-result-cache.cc
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/scan-result-cache.h"
+#include <algorithm>
+#include <iterator>
+#include <limits>
+#include <stdexcept>
+
+namespace hbase {
+/**
+ * Add the given results to cache and get valid results back.
+ * @param results the results of a scan next. Must not be null.
+ * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response.
+ * @return valid results, never null.
+ */
+std::vector<std::shared_ptr<Result>> ScanResultCache::AddAndGet(
+    const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat) {
+  // If no results were returned it indicates that either we have the all the partial results
+  // necessary to construct the complete result or the server had to send a heartbeat message
+  // to the client to keep the client-server connection alive
+  if (results.empty()) {
+    // If this response was an empty heartbeat message, then we have not exhausted the region
+    // and thus there may be more partials server side that still need to be added to the partial
+    // list before we form the complete Result
+    if (!partial_results_.empty() && !is_hearthbeat) {
+      return UpdateNumberOfCompleteResultsAndReturn(
+          std::vector<std::shared_ptr<Result>>{Combine()});
+    }
+    return std::vector<std::shared_ptr<Result>>{};
+  }
+  // In every RPC response there should be at most a single partial result. Furthermore, if
+  // there is a partial result, it is guaranteed to be in the last position of the array.
+  auto last = results[results.size() - 1];
+  if (last->Partial()) {
+    if (partial_results_.empty()) {
+      partial_results_.push_back(last);
+      std::vector<std::shared_ptr<Result>> new_results;
+      std::copy_n(results.begin(), results.size() - 1, std::back_inserter(new_results));
+      return UpdateNumberOfCompleteResultsAndReturn(new_results);
+    }
+    // We have only one result and it is partial
+    if (results.size() == 1) {
+      // check if there is a row change
+      if (partial_results_.at(0)->Row() == last->Row()) {
+        partial_results_.push_back(last);
+        return std::vector<std::shared_ptr<Result>>{};
+      }
+      auto complete_result = Combine();
+      partial_results_.push_back(last);
+      return UpdateNumberOfCompleteResultsAndReturn(complete_result);
+    }
+    // We have some complete results
+    auto results_to_return = PrependCombined(results, results.size() - 1);
+    partial_results_.push_back(last);
+    return UpdateNumberOfCompleteResultsAndReturn(results_to_return);
+  }
+  if (!partial_results_.empty()) {
+    return UpdateNumberOfCompleteResultsAndReturn(PrependCombined(results, results.size()));
+  }
+  return UpdateNumberOfCompleteResultsAndReturn(results);
+}
+
+void ScanResultCache::Clear() { partial_results_.clear(); }
+
+std::shared_ptr<Result> ScanResultCache::CreateCompleteResult(
+    const std::vector<std::shared_ptr<Result>> &partial_results) {
+  if (partial_results.empty()) {
+    return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{}, false, false, false);
+  }
+  std::vector<std::shared_ptr<Cell>> cells{};
+  bool stale = false;
+  std::string prev_row = "";
+  std::string current_row = "";
+  size_t i = 0;
+  for (const auto &r : partial_results) {
+    current_row = r->Row();
+    if (i != 0 && prev_row != current_row) {
+      throw new std::runtime_error(
+          "Cannot form complete result. Rows of partial results do not match.");
+    }
+    // Ensure that all Results except the last one are marked as partials. The last result
+    // may not be marked as a partial because Results are only marked as partials when
+    // the scan on the server side must be stopped due to reaching the maxResultSize.
+    // Visualizing it makes it easier to understand:
+    // maxResultSize: 2 cells
+    // (-x-) represents cell number x in a row
+    // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
+    // How row1 will be returned by the server as partial Results:
+    // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
+    // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
+    // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
+    if (i != partial_results.size() - 1 && !r->Partial()) {
+      throw new std::runtime_error("Cannot form complete result. Result is missing partial flag.");
+    }
+    prev_row = current_row;
+    stale = stale || r->Stale();
+    for (const auto &c : r->Cells()) {
+      cells.push_back(c);
+    }
+    i++;
+  }
+
+  return std::make_shared<Result>(cells, false, stale, false);
+}
+
+std::shared_ptr<Result> ScanResultCache::Combine() {
+  auto result = CreateCompleteResult(partial_results_);
+  partial_results_.clear();
+  return result;
+}
+
+std::vector<std::shared_ptr<Result>> ScanResultCache::PrependCombined(
+    const std::vector<std::shared_ptr<Result>> &results, int length) {
+  if (length == 0) {
+    return std::vector<std::shared_ptr<Result>>{Combine()};
+  }
+  // the last part of a partial result may not be marked as partial so here we need to check if
+  // there is a row change.
+  size_t start;
+  if (partial_results_[0]->Row() == results[0]->Row()) {
+    partial_results_.push_back(results[0]);
+    start = 1;
+    length--;
+  } else {
+    start = 0;
+  }
+  std::vector<std::shared_ptr<Result>> prepend_results{};
+  prepend_results.push_back(Combine());
+  std::copy_n(results.begin() + start, length, std::back_inserter(prepend_results));
+  return prepend_results;
+}
+
+std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn(
+    const std::shared_ptr<Result> &result) {
+  return UpdateNumberOfCompleteResultsAndReturn(std::vector<std::shared_ptr<Result>>{result});
+}
+
+std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn(
+    const std::vector<std::shared_ptr<Result>> &results) {
+  num_complete_rows_ += results.size();
+  return results;
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan-result-cache.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scan-result-cache.h b/hbase-native-client/core/scan-result-cache.h
new file mode 100644
index 0000000..5d3d0ab
--- /dev/null
+++ b/hbase-native-client/core/scan-result-cache.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Logging.h>
+#include <algorithm>
+#include <chrono>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "core/result.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+
+namespace hbase {
+
+class ScanResultCache {
+  // In Java, there are 3 different implementations for this. We are not doing partial results,
+  // or scan batching in native code for now, so this version is simpler and
+  // only deals with giving back complete rows as Result. It is more or less implementation
+  // of CompleteScanResultCache.java
+
+ public:
+  /**
+   * Add the given results to cache and get valid results back.
+   * @param results the results of a scan next. Must not be null.
+   * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response.
+   * @return valid results, never null.
+   */
+  std::vector<std::shared_ptr<Result>> AddAndGet(
+      const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat);
+
+  void Clear();
+
+  int64_t num_complete_rows() const { return num_complete_rows_; }
+
+ private:
+  /**
+     * Forms a single result from the partial results in the partialResults list. This method is
+     * useful for reconstructing partial results on the client side.
+     * @param partial_results list of partial results
+     * @return The complete result that is formed by combining all of the partial results together
+     */
+  static std::shared_ptr<Result> CreateCompleteResult(
+      const std::vector<std::shared_ptr<Result>> &partial_results);
+
+  std::shared_ptr<Result> Combine();
+
+  std::vector<std::shared_ptr<Result>> PrependCombined(
+      const std::vector<std::shared_ptr<Result>> &results, int length);
+
+  std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn(
+      const std::shared_ptr<Result> &result);
+
+  std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn(
+      const std::vector<std::shared_ptr<Result>> &results);
+
+ private:
+  std::vector<std::shared_ptr<Result>> partial_results_;
+  int64_t num_complete_rows_ = 0;
+};
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scan-test.cc b/hbase-native-client/core/scan-test.cc
index 73fb6df..0ee054c 100644
--- a/hbase-native-client/core/scan-test.cc
+++ b/hbase-native-client/core/scan-test.cc
@@ -17,13 +17,13 @@
  *
  */
 
-#include "core/scan.h"
-
+#include <gtest/gtest.h>
 #include <limits>
 
-#include <gtest/gtest.h>
+#include "core/scan.h"
 
-using namespace hbase;
+using hbase::Get;
+using hbase::Scan;
 
 void CheckFamilies(Scan &scan) {
   EXPECT_EQ(false, scan.HasFamilies());
@@ -74,7 +74,7 @@ void CheckFamilies(Scan &scan) {
   EXPECT_EQ(it, scan.FamilyMap().end());
 }
 
-void CheckFamiliesAfterCopy(Scan &scan) {
+void CheckFamiliesAfterCopy(const Scan &scan) {
   EXPECT_EQ(true, scan.HasFamilies());
   EXPECT_EQ(3, scan.FamilyMap().size());
   int i = 1;
@@ -116,11 +116,6 @@ void ScanMethods(Scan &scan) {
   scan.SetStopRow(stop_row);
   EXPECT_EQ(stop_row, scan.StopRow());
 
-  scan.SetSmall(true);
-  EXPECT_EQ(true, scan.IsSmall());
-  scan.SetSmall(false);
-  EXPECT_EQ(false, scan.IsSmall());
-
   scan.SetCaching(3);
   EXPECT_EQ(3, scan.Caching());
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scan.cc b/hbase-native-client/core/scan.cc
index 5f315ec..6dcc51b 100644
--- a/hbase-native-client/core/scan.cc
+++ b/hbase-native-client/core/scan.cc
@@ -38,7 +38,7 @@ Scan::Scan(const std::string &start_row, const std::string &stop_row)
   CheckRow(stop_row_);
 }
 
-Scan::Scan(const Scan &scan) {
+Scan::Scan(const Scan &scan) : Query(scan) {
   start_row_ = scan.start_row_;
   stop_row_ = scan.stop_row_;
   max_versions_ = scan.max_versions_;
@@ -47,7 +47,6 @@ Scan::Scan(const Scan &scan) {
   cache_blocks_ = scan.cache_blocks_;
   load_column_families_on_demand_ = scan.load_column_families_on_demand_;
   reversed_ = scan.reversed_;
-  small_ = scan.small_;
   allow_partial_results_ = scan.allow_partial_results_;
   consistency_ = scan.consistency_;
   tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp()));
@@ -55,6 +54,7 @@ Scan::Scan(const Scan &scan) {
 }
 
 Scan &Scan::operator=(const Scan &scan) {
+  Query::operator=(scan);
   start_row_ = scan.start_row_;
   stop_row_ = scan.stop_row_;
   max_versions_ = scan.max_versions_;
@@ -63,7 +63,6 @@ Scan &Scan::operator=(const Scan &scan) {
   cache_blocks_ = scan.cache_blocks_;
   load_column_families_on_demand_ = scan.load_column_families_on_demand_;
   reversed_ = scan.reversed_;
-  small_ = scan.small_;
   allow_partial_results_ = scan.allow_partial_results_;
   consistency_ = scan.consistency_;
   tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp()));
@@ -109,24 +108,14 @@ void Scan::SetReversed(bool reversed) { reversed_ = reversed; }
 
 bool Scan::IsReversed() const { return reversed_; }
 
-void Scan::SetStartRow(const std::string &start_row) {
-  CheckRow(start_row);
-  start_row_ = start_row;
-}
+void Scan::SetStartRow(const std::string &start_row) { start_row_ = start_row; }
 
 const std::string &Scan::StartRow() const { return start_row_; }
 
-void Scan::SetStopRow(const std::string &stop_row) {
-  CheckRow(stop_row);
-  stop_row_ = stop_row;
-}
+void Scan::SetStopRow(const std::string &stop_row) { stop_row_ = stop_row; }
 
 const std::string &Scan::StopRow() const { return stop_row_; }
 
-void Scan::SetSmall(bool small) { small_ = small; }
-
-bool Scan::IsSmall() const { return small_; }
-
 void Scan::SetCaching(int caching) { caching_ = caching; }
 
 int Scan::Caching() const { return caching_; }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scan.h b/hbase-native-client/core/scan.h
index fb302b7..1085c4b 100644
--- a/hbase-native-client/core/scan.h
+++ b/hbase-native-client/core/scan.h
@@ -119,16 +119,6 @@ class Scan : public Query {
   const std::string &StopRow() const;
 
   /**
-   * @brief Set whether this scan is a small scan.
-   */
-  void SetSmall(bool small);
-
-  /**
-   * @brief Returns if the scan is a small scan.
-   */
-  bool IsSmall() const;
-
-  /**
    * @brief Set the number of rows for caching that will be passed to scanners.
    * Higher caching values will enable faster scanners but will use more memory.
    * @param caching - the number of rows for caching.
@@ -258,12 +248,11 @@ class Scan : public Query {
   std::string start_row_ = "";
   std::string stop_row_ = "";
   uint32_t max_versions_ = 1;
-  int caching_ = -1;
+  int32_t caching_ = -1;
   int64_t max_result_size_ = -1;
   bool cache_blocks_ = true;
   bool load_column_families_on_demand_ = false;
   bool reversed_ = false;
-  bool small_ = false;
   bool allow_partial_results_ = false;
   hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG;
   std::unique_ptr<TimeRange> tr_ = std::make_unique<TimeRange>();

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scanner-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scanner-test.cc b/hbase-native-client/core/scanner-test.cc
new file mode 100644
index 0000000..1ecbd02
--- /dev/null
+++ b/hbase-native-client/core/scanner-test.cc
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <thread>
+#include <vector>
+
+#include "core/async-client-scanner.h"
+#include "core/async-table-result-scanner.h"
+#include "core/cell.h"
+#include "core/client.h"
+#include "core/configuration.h"
+#include "core/filter.h"
+#include "core/get.h"
+#include "core/hbase-configuration-loader.h"
+#include "core/put.h"
+#include "core/result.h"
+#include "core/row.h"
+#include "core/table.h"
+#include "if/Comparator.pb.h"
+#include "if/Filter.pb.h"
+#include "serde/table-name.h"
+#include "test-util/test-util.h"
+#include "utils/time-util.h"
+
+using hbase::Cell;
+using hbase::ComparatorFactory;
+using hbase::Comparator;
+using hbase::Configuration;
+using hbase::Get;
+using hbase::Put;
+using hbase::Result;
+using hbase::Scan;
+using hbase::Table;
+using hbase::TestUtil;
+using hbase::TimeUtil;
+using hbase::AsyncClientScanner;
+using hbase::AsyncTableResultScanner;
+using hbase::FilterFactory;
+using hbase::pb::CompareType;
+
+class ScannerTest : public ::testing::Test {
+ public:
+  static std::unique_ptr<hbase::TestUtil> test_util;
+  static const uint32_t num_rows;
+
+  static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
+    test_util = std::make_unique<hbase::TestUtil>();
+    test_util->StartMiniCluster(2);
+  }
+};
+std::unique_ptr<hbase::TestUtil> ScannerTest::test_util = nullptr;
+const uint32_t ScannerTest::num_rows = 1000;
+
+std::string Family(uint32_t i) { return "f" + folly::to<std::string>(i); }
+
+std::string Row(uint32_t i, int width) {
+  std::ostringstream s;
+  s.fill('0');
+  s.width(width);
+  s << i;
+  return "row" + s.str();
+}
+
+std::string Row(uint32_t i) { return Row(i, 3); }
+
+std::unique_ptr<Put> MakePut(const std::string &row, uint32_t num_families) {
+  auto put = std::make_unique<Put>(row);
+
+  for (uint32_t i = 0; i < num_families; i++) {
+    put->AddColumn(Family(i), "q1", row);
+    put->AddColumn(Family(i), "q2", row + "-" + row);
+  }
+
+  return std::move(put);
+}
+
+void CheckResult(const Result &r, std::string expected_row, uint32_t num_families) {
+  VLOG(1) << r.DebugString();
+  auto row = r.Row();
+  ASSERT_EQ(row, expected_row);
+  ASSERT_EQ(r.Cells().size(), num_families * 2);
+  for (uint32_t i = 0; i < num_families; i++) {
+    ASSERT_EQ(*r.Value(Family(i), "q1"), row);
+    ASSERT_EQ(*r.Value(Family(i), "q2"), row + "-" + row);
+  }
+}
+
+void CreateTable(std::string table_name, uint32_t num_families, uint32_t num_rows,
+                 int32_t num_regions) {
+  LOG(INFO) << "Creating the table " << table_name
+            << " with num_regions:" << folly::to<std::string>(num_regions);
+  std::vector<std::string> families;
+  for (uint32_t i = 0; i < num_families; i++) {
+    families.push_back(Family(i));
+  }
+  if (num_regions <= 1) {
+    ScannerTest::test_util->CreateTable(table_name, families);
+  } else {
+    std::vector<std::string> keys;
+    for (int32_t i = 0; i < num_regions - 1; i++) {
+      keys.push_back(Row(i * (num_rows / (num_regions - 1))));
+      LOG(INFO) << "Split key:" << keys[keys.size() - 1];
+    }
+    ScannerTest::test_util->CreateTable(table_name, families, keys);
+  }
+}
+
+std::unique_ptr<hbase::Client> CreateTableAndWriteData(std::string table_name,
+                                                       uint32_t num_families, uint32_t num_rows,
+                                                       int32_t num_regions) {
+  CreateTable(table_name, num_families, num_rows, num_regions);
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf());
+  auto table = client->Table(tn);
+
+  LOG(INFO) << "Writing data to the table, num_rows:" << num_rows;
+  // Perform Puts
+  for (uint32_t i = 0; i < num_rows; i++) {
+    table->Put(*MakePut(Row(i), num_families));
+  }
+  return std::move(client);
+}
+
+void TestScan(const Scan &scan, uint32_t num_families, int32_t start, int32_t num_rows,
+              Table *table) {
+  LOG(INFO) << "Starting scan for the test with start:" << scan.StartRow()
+            << ", stop:" << scan.StopRow() << " expected_num_rows:" << num_rows;
+  auto scanner = table->Scan(scan);
+
+  uint32_t i = start;
+  auto r = scanner->Next();
+  while (r != nullptr) {
+    CheckResult(*r, Row(i++), num_families);
+    r = scanner->Next();
+  }
+  ASSERT_EQ(i - start, num_rows);
+}
+
+void TestScan(const Scan &scan, int32_t start, int32_t num_rows, Table *table) {
+  TestScan(scan, 1, start, num_rows, table);
+}
+
+void TestScan(uint32_t num_families, int32_t start, int32_t stop, int32_t num_rows, Table *table) {
+  Scan scan{};
+  if (start >= 0) {
+    scan.SetStartRow(Row(start));
+  } else {
+    start = 0;  // neded for below logic
+  }
+  if (stop >= 0) {
+    scan.SetStopRow(Row(stop));
+  }
+
+  TestScan(scan, num_families, start, num_rows, table);
+}
+
+void TestScan(int32_t start, int32_t stop, int32_t num_rows, Table *table) {
+  TestScan(1, start, stop, num_rows, table);
+}
+
+void TestScan(uint32_t num_families, std::string start, std::string stop, int32_t num_rows,
+              Table *table) {
+  Scan scan{};
+
+  scan.SetStartRow(start);
+  scan.SetStopRow(stop);
+
+  LOG(INFO) << "Starting scan for the test with start:" << start << ", stop:" << stop
+            << " expected_num_rows:" << num_rows;
+  auto scanner = table->Scan(scan);
+
+  uint32_t i = 0;
+  auto r = scanner->Next();
+  while (r != nullptr) {
+    VLOG(1) << r->DebugString();
+    i++;
+    ASSERT_EQ(r->Map().size(), num_families);
+    r = scanner->Next();
+  }
+  ASSERT_EQ(i, num_rows);
+}
+
+void TestScan(std::string start, std::string stop, int32_t num_rows, Table *table) {
+  TestScan(1, start, stop, num_rows, table);
+}
+
+void TestScanCombinations(Table *table, uint32_t num_families) {
+  // full table
+  TestScan(num_families, -1, -1, 1000, table);
+  TestScan(num_families, -1, 999, 999, table);
+  TestScan(num_families, 0, -1, 1000, table);
+  TestScan(num_families, 0, 999, 999, table);
+  TestScan(num_families, 10, 990, 980, table);
+  TestScan(num_families, 1, 998, 997, table);
+
+  TestScan(num_families, 123, 345, 222, table);
+  TestScan(num_families, 234, 456, 222, table);
+  TestScan(num_families, 345, 567, 222, table);
+  TestScan(num_families, 456, 678, 222, table);
+
+  // single results
+  TestScan(num_families, 111, 111, 1, table);  // split keys are like 111, 222, 333, etc
+  TestScan(num_families, 111, 112, 1, table);
+  TestScan(num_families, 332, 332, 1, table);
+  TestScan(num_families, 332, 333, 1, table);
+  TestScan(num_families, 333, 333, 1, table);
+  TestScan(num_families, 333, 334, 1, table);
+  TestScan(num_families, 42, 42, 1, table);
+  TestScan(num_families, 921, 921, 1, table);
+  TestScan(num_families, 0, 0, 1, table);
+  TestScan(num_families, 0, 1, 1, table);
+  TestScan(num_families, 999, 999, 1, table);
+
+  // few results
+  TestScan(num_families, 0, 0, 1, table);
+  TestScan(num_families, 0, 2, 2, table);
+  TestScan(num_families, 0, 5, 5, table);
+  TestScan(num_families, 10, 15, 5, table);
+  TestScan(num_families, 105, 115, 10, table);
+  TestScan(num_families, 111, 221, 110, table);
+  TestScan(num_families, 111, 222, 111, table);  // crossing region boundary 111-222
+  TestScan(num_families, 111, 223, 112, table);
+  TestScan(num_families, 111, 224, 113, table);
+  TestScan(num_families, 990, 999, 9, table);
+  TestScan(num_families, 900, 998, 98, table);
+
+  // empty results
+  TestScan(num_families, "a", "a", 0, table);
+  TestScan(num_families, "a", "r", 0, table);
+  TestScan(num_families, "", "r", 0, table);
+  TestScan(num_families, "s", "", 0, table);
+  TestScan(num_families, "s", "z", 0, table);
+  TestScan(num_families, Row(110) + "a", Row(111), 0, table);
+  TestScan(num_families, Row(111) + "a", Row(112), 0, table);
+  TestScan(num_families, Row(123) + "a", Row(124), 0, table);
+
+  // custom
+  TestScan(num_families, Row(111, 3), Row(1111, 4), 1, table);
+  TestScan(num_families, Row(0, 3), Row(0, 4), 1, table);
+  TestScan(num_families, Row(999, 3), Row(9999, 4), 1, table);
+  TestScan(num_families, Row(111, 3), Row(1111, 4), 1, table);
+  TestScan(num_families, Row(0, 3), Row(9999, 4), 1000, table);
+  TestScan(num_families, "a", "z", 1000, table);
+}
+
+// some of these tests are from TestAsyncTableScan* and some from TestFromClientSide* and
+// TestScannersFromClientSide*
+
+TEST_F(ScannerTest, SingleRegionScan) {
+  auto client = CreateTableAndWriteData("t_single_region_scan", 1, num_rows, 1);
+  auto table = client->Table(folly::to<hbase::pb::TableName>("t_single_region_scan"));
+
+  TestScanCombinations(table.get(), 1);
+}
+
+TEST_F(ScannerTest, MultiRegionScan) {
+  auto client = CreateTableAndWriteData("t_multi_region_scan", 1, num_rows, 10);
+  auto table = client->Table(folly::to<hbase::pb::TableName>("t_multi_region_scan"));
+
+  TestScanCombinations(table.get(), 1);
+}
+
+TEST_F(ScannerTest, ScanWithPauses) {
+  auto max_result_size =
+      ScannerTest::test_util->conf()->GetInt("hbase.client.scanner.max.result.size", 2097152);
+  ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", 100);
+  auto client = CreateTableAndWriteData("t_multi_region_scan", 1, num_rows, 5);
+  auto table = client->Table(folly::to<hbase::pb::TableName>("t_multi_region_scan"));
+
+  VLOG(1) << "Starting scan for the test";
+  Scan scan{};
+  scan.SetCaching(100);
+  auto scanner = table->Scan(scan);
+
+  uint32_t i = 0;
+  auto r = scanner->Next();
+  while (r != nullptr) {
+    CheckResult(*r, Row(i++), 1);
+    r = scanner->Next();
+    std::this_thread::sleep_for(TimeUtil::MillisToNanos(10));
+  }
+
+  auto s = static_cast<AsyncTableResultScanner *>(scanner.get());
+  ASSERT_GT(s->num_prefetch_stopped(), 0);
+
+  ASSERT_EQ(i, num_rows);
+  ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", max_result_size);
+}
+
+TEST_F(ScannerTest, ScanWithFilters) {
+  auto client = CreateTableAndWriteData("t_scan_with_filters", 1, num_rows, 1);
+  auto table = client->Table(folly::to<hbase::pb::TableName>("t_scan_with_filters"));
+
+  Scan scan{};
+  scan.SetFilter(FilterFactory::ValueFilter(CompareType::GREATER_OR_EQUAL,
+                                            *ComparatorFactory::BinaryComparator(Row(800))));
+
+  TestScan(scan, 800, 200, table.get());
+}
+
+TEST_F(ScannerTest, ScanMultiFamily) {
+  auto client = CreateTableAndWriteData("t_scan_multi_family", 3, num_rows, 1);
+  auto table = client->Table(folly::to<hbase::pb::TableName>("t_scan_multi_family"));
+
+  TestScanCombinations(table.get(), 3);
+}
+
+TEST_F(ScannerTest, ScanNullQualifier) {
+  std::string table_name{"t_scan_null_qualifier"};
+  std::string row{"row"};
+  CreateTable(table_name, 1, 1, 1);
+
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf());
+  auto table = client->Table(tn);
+
+  // Perform Puts
+  Put put{row};
+  put.AddColumn(Family(0), "q1", row);
+  put.AddColumn(Family(0), "", row);
+  table->Put(put);
+
+  Scan scan1{};
+  scan1.AddColumn(Family(0), "");
+  auto scanner1 = table->Scan(scan1);
+  auto r1 = scanner1->Next();
+  ASSERT_EQ(r1->Cells().size(), 1);
+  ASSERT_EQ(scanner1->Next(), nullptr);
+
+  Scan scan2{};
+  scan2.AddFamily(Family(0));
+  auto scanner2 = table->Scan(scan2);
+  auto r2 = scanner2->Next();
+  ASSERT_EQ(r2->Cells().size(), 2);
+  ASSERT_EQ(scanner2->Next(), nullptr);
+}
+
+TEST_F(ScannerTest, ScanNoResults) {
+  std::string table_name{"t_scan_no_results"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 3);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  Scan scan{};
+  scan.AddColumn(Family(0), "non_existing_qualifier");
+
+  TestScan(scan, 0, 0, table.get());
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 3a7d62b..f79d848 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -30,6 +30,7 @@
 #include "core/client.h"
 #include "core/get.h"
 #include "core/put.h"
+#include "core/scan.h"
 #include "core/table.h"
 #include "serde/server-name.h"
 #include "serde/table-name.h"
@@ -38,6 +39,7 @@
 using hbase::Client;
 using hbase::Configuration;
 using hbase::Get;
+using hbase::Scan;
 using hbase::Put;
 using hbase::Table;
 using hbase::pb::TableName;
@@ -48,6 +50,10 @@ DEFINE_string(table, "test_table", "What table to do the reads or writes");
 DEFINE_string(row, "row_", "row prefix");
 DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
 DEFINE_uint64(num_rows, 10000, "How many rows to write and read");
+DEFINE_bool(puts, true, "Whether to perform puts");
+DEFINE_bool(gets, true, "Whether to perform gets");
+DEFINE_bool(multigets, true, "Whether to perform multi-gets");
+DEFINE_bool(scans, true, "Whether to perform scans");
 DEFINE_bool(display_results, false, "Whether to display the Results from Gets");
 DEFINE_int32(threads, 6, "How many cpu threads");
 
@@ -86,41 +92,72 @@ int main(int argc, char *argv[]) {
   auto start_ns = TimeUtil::GetNowNanos();
 
   // Do the Put requests
-  for (uint64_t i = 0; i < num_puts; i++) {
-    table->Put(*MakePut(Row(FLAGS_row, i)));
-  }
+  if (FLAGS_puts) {
+    LOG(INFO) << "Sending put requests";
+    for (uint64_t i = 0; i < num_puts; i++) {
+      table->Put(*MakePut(Row(FLAGS_row, i)));
+    }
 
-  LOG(INFO) << "Successfully sent  " << num_puts << " Put requests in "
-            << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+    LOG(INFO) << "Successfully sent  " << num_puts << " Put requests in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+  }
 
   // Do the Get requests
-  start_ns = TimeUtil::GetNowNanos();
-  for (uint64_t i = 0; i < num_puts; i++) {
-    auto result = table->Get(Get{Row(FLAGS_row, i)});
-    if (FLAGS_display_results) {
-      LOG(INFO) << result->DebugString();
+  if (FLAGS_gets) {
+    LOG(INFO) << "Sending get requests";
+    start_ns = TimeUtil::GetNowNanos();
+    for (uint64_t i = 0; i < num_puts; i++) {
+      auto result = table->Get(Get{Row(FLAGS_row, i)});
+      if (FLAGS_display_results) {
+        LOG(INFO) << result->DebugString();
+      }
     }
-  }
 
-  LOG(INFO) << "Successfully sent  " << num_puts << " Get requests in "
-            << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+    LOG(INFO) << "Successfully sent  " << num_puts << " Get requests in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+  }
 
   // Do the Multi-Gets
-  std::vector<hbase::Get> gets;
-  for (uint64_t i = 0; i < num_puts; ++i) {
-    hbase::Get get(Row(FLAGS_row, i));
-    gets.push_back(get);
-  }
+  if (FLAGS_multigets) {
+    std::vector<hbase::Get> gets;
+    for (uint64_t i = 0; i < num_puts; ++i) {
+      hbase::Get get(Row(FLAGS_row, i));
+      gets.push_back(get);
+    }
+
+    LOG(INFO) << "Sending multi-get requests";
+    start_ns = TimeUtil::GetNowNanos();
+    auto results = table->Get(gets);
 
-  start_ns = TimeUtil::GetNowNanos();
-  auto results = table->Get(gets);
+    if (FLAGS_display_results) {
+      for (const auto &result : results) LOG(INFO) << result->DebugString();
+    }
 
-  if (FLAGS_display_results) {
-    for (const auto &result : results) LOG(INFO) << result->DebugString();
+    LOG(INFO) << "Successfully sent  " << gets.size() << " Multi-Get requests in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
   }
 
-  LOG(INFO) << "Successfully sent  " << gets.size() << " Multi-Get requests in "
-            << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+  // Do the Scan
+  if (FLAGS_scans) {
+    LOG(INFO) << "Starting scanner";
+    start_ns = TimeUtil::GetNowNanos();
+    Scan scan{};
+    auto scanner = table->Scan(scan);
+
+    uint64_t i = 0;
+    auto r = scanner->Next();
+    while (r != nullptr) {
+      if (FLAGS_display_results) {
+        LOG(INFO) << r->DebugString();
+      }
+      r = scanner->Next();
+      i++;
+    }
+
+    LOG(INFO) << "Successfully iterated over  " << i << " Scan results in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+    scanner->Close();
+  }
 
   table->Close();
   client->Close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index aa51989..9cdd8b0 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "core/async-connection.h"
+#include "core/async-table-result-scanner.h"
 #include "core/request-converter.h"
 #include "core/response-converter.h"
 #include "if/Client.pb.h"
@@ -52,6 +53,21 @@ std::shared_ptr<hbase::Result> Table::Get(const hbase::Get &get) {
   return context.get(operation_timeout());
 }
 
+std::shared_ptr<ResultScanner> Table::Scan(const hbase::Scan &scan) {
+  auto max_cache_size = ResultSize2CacheSize(
+      scan.MaxResultSize() > 0 ? scan.MaxResultSize()
+                               : async_connection_->connection_conf()->scanner_max_result_size());
+  auto scanner = std::make_shared<AsyncTableResultScanner>(max_cache_size);
+  async_table_->Scan(scan, scanner);
+  return scanner;
+}
+
+int64_t Table::ResultSize2CacheSize(int64_t max_results_size) const {
+  // * 2 if possible
+  return max_results_size > (std::numeric_limits<int64_t>::max() / 2) ? max_results_size
+                                                                      : max_results_size * 2;
+}
+
 void Table::Put(const hbase::Put &put) {
   auto future = async_table_->Put(put);
   future.get(operation_timeout());

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index 81ddc8e..1f6d9b7 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -32,6 +32,7 @@
 #include "core/location-cache.h"
 #include "core/put.h"
 #include "core/raw-async-table.h"
+#include "core/result-scanner.h"
 #include "core/result.h"
 #include "serde/table-name.h"
 
@@ -74,6 +75,8 @@ class Table {
   std::shared_ptr<hbase::Result> Increment(const hbase::Increment &increment);
   // TODO: Batch Puts
 
+  std::shared_ptr<ResultScanner> Scan(const hbase::Scan &scan);
+
   /**
    * @brief - Close the client connection.
    */
@@ -92,5 +95,7 @@ class Table {
 
  private:
   std::chrono::milliseconds operation_timeout() const;
+
+  int64_t ResultSize2CacheSize(int64_t max_results_size) const;
 };
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/exceptions/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/exceptions/BUCK b/hbase-native-client/exceptions/BUCK
index eef4437..07ffeaf 100644
--- a/hbase-native-client/exceptions/BUCK
+++ b/hbase-native-client/exceptions/BUCK
@@ -17,8 +17,12 @@
 
 cxx_library(
     name="exceptions",
-    exported_headers=["exception.h",],
+    exported_headers=[
+        "exception.h",
+    ],
     srcs=[],
-    deps=["//third-party:folly",],
+    deps=[
+        "//third-party:folly",
+    ],
     compiler_flags=['-Weffc++'],
-    visibility=['//core/...','//connection//...'],)
\ No newline at end of file
+    visibility=['//core/...', '//connection//...'],)