You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:50 UTC

[hbase] 62/133: HBASE-17585 [C++] Use KVCodec in the RPC request/response

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit a018834018f3356409342d6f92664def38b3a132
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Mon Feb 13 18:50:36 2017 -0800

    HBASE-17585 [C++] Use KVCodec in the RPC request/response
---
 .../connection/client-dispatcher.cc                | 10 ++--
 hbase-native-client/connection/client-dispatcher.h |  9 ++--
 hbase-native-client/connection/client-handler.cc   | 36 ++++++++++----
 hbase-native-client/connection/client-handler.h    |  7 +--
 .../connection/connection-factory.cc               |  5 +-
 .../connection/connection-factory.h                |  3 +-
 .../connection/connection-pool-test.cc             | 10 ++--
 hbase-native-client/connection/connection-pool.cc  | 11 +++--
 hbase-native-client/connection/connection-pool.h   |  3 +-
 hbase-native-client/connection/pipeline.cc         |  5 +-
 hbase-native-client/connection/pipeline.h          |  4 +-
 hbase-native-client/connection/response.h          | 11 ++++-
 hbase-native-client/connection/rpc-client.cc       | 32 +++++++------
 hbase-native-client/connection/rpc-client.h        | 21 ++++----
 hbase-native-client/connection/rpc-connection.h    |  2 +-
 hbase-native-client/connection/service.h           |  2 +-
 hbase-native-client/core/client-test.cc            |  6 +--
 hbase-native-client/core/client.cc                 |  9 +++-
 hbase-native-client/core/client.h                  |  2 +
 hbase-native-client/core/location-cache-test.cc    | 10 ++--
 hbase-native-client/core/location-cache.cc         | 20 +-------
 hbase-native-client/core/meta-utils.cc             | 34 +++++++++++++
 hbase-native-client/core/meta-utils.h              |  7 +++
 hbase-native-client/core/response_converter.cc     | 56 ++++++++++++++++++++--
 hbase-native-client/core/response_converter.h      |  9 +++-
 hbase-native-client/core/result.cc                 |  2 +-
 hbase-native-client/core/result.h                  |  2 +-
 hbase-native-client/core/simple-client.cc          |  7 ++-
 hbase-native-client/core/table.cc                  |  4 +-
 .../serde/client-deserializer-test.cc              |  8 ++--
 .../serde/client-serializer-test.cc                |  8 ++--
 hbase-native-client/serde/rpc.cc                   | 14 +++++-
 hbase-native-client/serde/rpc.h                    | 13 ++++-
 33 files changed, 270 insertions(+), 112 deletions(-)

diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
index 1ace99c..626fc76 100644
--- a/hbase-native-client/connection/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -26,8 +26,8 @@ using namespace wangle;
 
 ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {}
 
-void ClientDispatcher::read(Context *ctx, Response in) {
-  auto call_id = in.call_id();
+void ClientDispatcher::read(Context *ctx, std::unique_ptr<Response> in) {
+  auto call_id = in->call_id();
 
   auto search = requests_.find(call_id);
   CHECK(search != requests_.end());
@@ -37,13 +37,13 @@ void ClientDispatcher::read(Context *ctx, Response in) {
 
   // TODO(eclark): check if the response
   // is an exception. If it is then set that.
-  p.setValue(in);
+  p.setValue(std::move(in));
 }
 
-Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) {
+Future<std::unique_ptr<Response>> ClientDispatcher::operator()(std::unique_ptr<Request> arg) {
   auto call_id = current_call_id_++;
   arg->set_call_id(call_id);
-  requests_.insert(call_id, Promise<Response>{});
+  requests_.insert(call_id, Promise<std::unique_ptr<Response>>{});
   auto &p = requests_.find(call_id)->second;
   auto f = p.getFuture();
   p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) {
diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h
index 0489717..857042c 100644
--- a/hbase-native-client/connection/client-dispatcher.h
+++ b/hbase-native-client/connection/client-dispatcher.h
@@ -36,21 +36,22 @@ namespace hbase {
  * future.
  */
 class ClientDispatcher
-    : public wangle::ClientDispatcherBase<SerializePipeline, std::unique_ptr<Request>, Response> {
+    : public wangle::ClientDispatcherBase<SerializePipeline, std::unique_ptr<Request>,
+                                          std::unique_ptr<Response>> {
  public:
   /** Create a new ClientDispatcher */
   ClientDispatcher();
   /** Read a response off the pipeline. */
-  void read(Context *ctx, Response in) override;
+  void read(Context *ctx, std::unique_ptr<Response> in) override;
   /** Take a request as a call and send it down the pipeline. */
-  folly::Future<Response> operator()(std::unique_ptr<Request> arg) override;
+  folly::Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> arg) override;
   /** Close the dispatcher and the associated pipeline. */
   folly::Future<folly::Unit> close(Context *ctx) override;
   /** Close the dispatcher and the associated pipeline. */
   folly::Future<folly::Unit> close() override;
 
  private:
-  folly::AtomicHashMap<uint32_t, folly::Promise<Response>> requests_;
+  folly::AtomicHashMap<uint32_t, folly::Promise<std::unique_ptr<Response>>> requests_;
   // Start at some number way above what could
   // be there for un-initialized call id counters.
   //
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 5a6dce2..af84572 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -36,9 +36,9 @@ using hbase::pb::ResponseHeader;
 using hbase::pb::GetResponse;
 using google::protobuf::Message;
 
-ClientHandler::ClientHandler(std::string user_name)
+ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec)
     : user_name_(user_name),
-      serde_(),
+      serde_(codec),
       once_flag_(std::make_unique<std::once_flag>()),
       resp_msgs_(
           make_unique<folly::AtomicHashMap<uint32_t, std::shared_ptr<google::protobuf::Message>>>(
@@ -47,12 +47,12 @@ ClientHandler::ClientHandler(std::string user_name)
 void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
   if (LIKELY(buf != nullptr)) {
     buf->coalesce();
-    Response received;
+    auto received = std::make_unique<Response>();
     ResponseHeader header;
 
     int used_bytes = serde_.ParseDelimited(buf.get(), &header);
-    LOG(INFO) << "Read ResponseHeader size=" << used_bytes << " call_id=" << header.call_id()
-              << " has_exception=" << header.has_exception();
+    VLOG(1) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id()
+            << " has_exception=" << header.has_exception();
 
     // Get the response protobuf from the map
     auto search = resp_msgs_->find(header.call_id());
@@ -67,17 +67,34 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
     // set the call_id.
     // This will be used to by the dispatcher to match up
     // the promise with the response.
-    received.set_call_id(header.call_id());
+    received->set_call_id(header.call_id());
 
     // If there was an exception then there's no
     // data left on the wire.
     if (header.has_exception() == false) {
       buf->trimStart(used_bytes);
+
+      int cell_block_length = 0;
       used_bytes = serde_.ParseDelimited(buf.get(), resp_msg.get());
+      if (header.has_cell_block_meta() && header.cell_block_meta().has_length()) {
+        cell_block_length = header.cell_block_meta().length();
+      }
+
+      VLOG(3) << "Read RPCResponse, buf length:" << buf->length()
+              << ", header PB length:" << used_bytes << ", cell_block length:" << cell_block_length;
+
       // Make sure that bytes were parsed.
-      CHECK(used_bytes == buf->length());
-      received.set_resp_msg(resp_msg);
+      CHECK((used_bytes + cell_block_length) == buf->length());
+
+      if (cell_block_length > 0) {
+        auto cell_scanner = serde_.CreateCellScanner(std::move(buf), used_bytes, cell_block_length);
+        received->set_cell_scanner(std::move(cell_scanner));
+      }
+
+      received->set_resp_msg(resp_msg);
     }
+    // TODO: set exception in Response here
+
     ctx->fireRead(std::move(received));
   }
 }
@@ -94,6 +111,9 @@ Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
 
   // Now store the call id to response.
   resp_msgs_->insert(r->call_id(), r->resp_msg());
+
+  VLOG(1) << "Writing RPC Request with call_id:" << r->call_id();
+
   // Send the data down the pipeline.
   return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get()));
 }
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index d860cc1..afb8e62 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -27,6 +27,7 @@
 #include <string>
 #include <utility>
 
+#include "serde/codec.h"
 #include "serde/rpc.h"
 
 // Forward decs.
@@ -51,14 +52,14 @@ namespace hbase {
  * on first request.
  */
 class ClientHandler
-    : public wangle::Handler<std::unique_ptr<folly::IOBuf>, Response, std::unique_ptr<Request>,
-                             std::unique_ptr<folly::IOBuf>> {
+    : public wangle::Handler<std::unique_ptr<folly::IOBuf>, std::unique_ptr<Response>,
+                             std::unique_ptr<Request>, std::unique_ptr<folly::IOBuf>> {
  public:
   /**
    * Create the handler
    * @param user_name the user name of the user running this process.
    */
-  explicit ClientHandler(std::string user_name);
+  explicit ClientHandler(std::string user_name, std::shared_ptr<Codec> codec);
 
   /**
    * Get bytes from the wire.
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index ff83212..6aba351 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -26,8 +26,9 @@
 using namespace folly;
 using namespace hbase;
 
-ConnectionFactory::ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool)
-    : io_pool_(io_pool), pipeline_factory_(std::make_shared<RpcPipelineFactory>()) {}
+ConnectionFactory::ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool,
+                                     std::shared_ptr<Codec> codec)
+    : io_pool_(io_pool), pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {}
 
 std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::MakeBootstrap() {
   auto client = std::make_shared<wangle::ClientBootstrap<SerializePipeline>>();
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index da44c35..0d1e0d0 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -40,7 +40,8 @@ class ConnectionFactory {
    * Constructor.
    * There should only be one ConnectionFactory per client.
    */
-  explicit ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool);
+  ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool,
+                    std::shared_ptr<Codec> codec);
   /** Default Desctructor */
   virtual ~ConnectionFactory() = default;
 
diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc
index 0930095..623ce3c 100644
--- a/hbase-native-client/connection/connection-pool-test.cc
+++ b/hbase-native-client/connection/connection-pool-test.cc
@@ -36,7 +36,7 @@ using hbase::ConnectionId;
 
 class MockConnectionFactory : public ConnectionFactory {
  public:
-  MockConnectionFactory() : ConnectionFactory(nullptr) {}
+  MockConnectionFactory() : ConnectionFactory(nullptr, nullptr) {}
   MOCK_METHOD0(MakeBootstrap, std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>());
   MOCK_METHOD3(Connect, std::shared_ptr<HBaseService>(
                             std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>,
@@ -47,17 +47,17 @@ class MockBootstrap : public wangle::ClientBootstrap<SerializePipeline> {};
 
 class MockServiceBase : public HBaseService {
  public:
-  folly::Future<Response> operator()(std::unique_ptr<Request> req) override {
+  folly::Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> req) override {
     return do_operation(req.get());
   }
-  virtual folly::Future<Response> do_operation(Request *req) {
-    return folly::makeFuture<Response>(Response{});
+  virtual folly::Future<std::unique_ptr<Response>> do_operation(Request *req) {
+    return folly::makeFuture<std::unique_ptr<Response>>(std::make_unique<Response>());
   }
 };
 
 class MockService : public MockServiceBase {
  public:
-  MOCK_METHOD1(do_operation, folly::Future<Response>(Request *));
+  MOCK_METHOD1(do_operation, folly::Future<std::unique_ptr<Response>>(Request *));
 };
 
 TEST(TestConnectionPool, TestOnlyCreateOnce) {
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index 15dd64e..6635a6d 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -33,8 +33,9 @@ using hbase::HBaseService;
 using folly::SharedMutexWritePriority;
 using folly::SocketAddress;
 
-ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor)
-    : cf_(std::make_shared<ConnectionFactory>(io_executor)),
+ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+                               std::shared_ptr<Codec> codec)
+    : cf_(std::make_shared<ConnectionFactory>(io_executor, codec)),
       clients_(),
       connections_(),
       map_mutex_() {}
@@ -88,12 +89,12 @@ std::shared_ptr<RpcConnection> ConnectionPool::GetNewConnection(
     auto clientBootstrap = cf_->MakeBootstrap();
     auto dispatcher = cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port());
 
-    auto conneciton = std::make_shared<RpcConnection>(remote_id, dispatcher);
+    auto connection = std::make_shared<RpcConnection>(remote_id, dispatcher);
 
-    connections_.insert(std::make_pair(remote_id, conneciton));
+    connections_.insert(std::make_pair(remote_id, connection));
     clients_.insert(std::make_pair(remote_id, clientBootstrap));
 
-    return conneciton;
+    return connection;
   }
 }
 
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
index 1f2a182..23e5e9a 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -46,7 +46,8 @@ namespace hbase {
 class ConnectionPool {
  public:
   /** Create connection pool wit default connection factory */
-  explicit ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor);
+  ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+                 std::shared_ptr<Codec> codec);
 
   /**
    * Desctructor.
diff --git a/hbase-native-client/connection/pipeline.cc b/hbase-native-client/connection/pipeline.cc
index 14ad73c..00dc05c 100644
--- a/hbase-native-client/connection/pipeline.cc
+++ b/hbase-native-client/connection/pipeline.cc
@@ -30,7 +30,8 @@ using namespace folly;
 using namespace hbase;
 using namespace wangle;
 
-RpcPipelineFactory::RpcPipelineFactory() : user_util_() {}
+RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr<Codec> codec)
+    : user_util_(), codec_(codec) {}
 
 SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
     std::shared_ptr<AsyncTransportWrapper> sock) {
@@ -38,7 +39,7 @@ SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
   pipeline->addBack(AsyncSocketHandler{sock});
   pipeline->addBack(EventBaseHandler{});
   pipeline->addBack(LengthFieldBasedFrameDecoder{});
-  pipeline->addBack(ClientHandler{user_util_.user_name()});
+  pipeline->addBack(ClientHandler{user_util_.user_name(), codec_});
   pipeline->finalize();
   return pipeline;
 }
diff --git a/hbase-native-client/connection/pipeline.h b/hbase-native-client/connection/pipeline.h
index 343219d..ea40cfd 100644
--- a/hbase-native-client/connection/pipeline.h
+++ b/hbase-native-client/connection/pipeline.h
@@ -25,6 +25,7 @@
 
 #include "connection/request.h"
 #include "connection/response.h"
+#include "serde/codec.h"
 #include "utils/user-util.h"
 
 namespace hbase {
@@ -40,7 +41,7 @@ class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> {
   /**
    * Constructor. This will create user util.
    */
-  RpcPipelineFactory();
+  explicit RpcPipelineFactory(std::shared_ptr<Codec> codec);
 
   /**
    * Create a new pipeline.
@@ -55,5 +56,6 @@ class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> {
 
  private:
   UserUtil user_util_;
+  std::shared_ptr<Codec> codec_;
 };
 }  // namespace hbase
diff --git a/hbase-native-client/connection/response.h b/hbase-native-client/connection/response.h
index 560387c..1d60fed 100644
--- a/hbase-native-client/connection/response.h
+++ b/hbase-native-client/connection/response.h
@@ -22,6 +22,8 @@
 #include <memory>
 #include <utility>
 
+#include "serde/cell-scanner.h"
+
 // Forward
 namespace google {
 namespace protobuf {
@@ -42,7 +44,7 @@ class Response {
    * Constructor.
    * Initinalizes the call id to 0. 0 should never be a valid call id.
    */
-  Response() : call_id_(0), resp_msg_(nullptr) {}
+  Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr) {}
 
   /** Get the call_id */
   uint32_t call_id() { return call_id_; }
@@ -62,8 +64,15 @@ class Response {
     resp_msg_ = std::move(response);
   }
 
+  void set_cell_scanner(std::unique_ptr<CellScanner> cell_scanner) {
+    cell_scanner_ = std::move(cell_scanner);
+  }
+
+  const std::unique_ptr<CellScanner>& cell_scanner() const { return cell_scanner_; }
+
  private:
   uint32_t call_id_;
   std::shared_ptr<google::protobuf::Message> resp_msg_;
+  std::unique_ptr<CellScanner> cell_scanner_;
 };
 }  // namespace hbase
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index 7621193..cfbda3a 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -39,38 +39,40 @@ class RpcChannelImplementation : public AbstractRpcChannel {
 };
 }  // namespace hbase
 
-RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor)
+RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+                     std::shared_ptr<Codec> codec)
     : io_executor_(io_executor) {
-  cp_ = std::make_shared<ConnectionPool>(io_executor_);
+  cp_ = std::make_shared<ConnectionPool>(io_executor_, codec);
 }
 
 void RpcClient::Close() { io_executor_->stop(); }
 
-std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port,
+std::unique_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port,
                                               std::unique_ptr<Request> req,
                                               std::shared_ptr<User> ticket) {
-  return std::make_shared<Response>(AsyncCall(host, port, std::move(req), ticket).get());
+  return AsyncCall(host, port, std::move(req), ticket).get();
 }
 
-std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port,
+std::unique_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port,
                                               std::unique_ptr<Request> req,
                                               std::shared_ptr<User> ticket,
                                               const std::string& service_name) {
-  return std::make_shared<Response>(
-      AsyncCall(host, port, std::move(req), ticket, service_name).get());
+  return AsyncCall(host, port, std::move(req), ticket, service_name).get();
 }
 
-folly::Future<Response> RpcClient::AsyncCall(const std::string& host, uint16_t port,
-                                             std::unique_ptr<Request> req,
-                                             std::shared_ptr<User> ticket) {
+folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host,
+                                                              uint16_t port,
+                                                              std::unique_ptr<Request> req,
+                                                              std::shared_ptr<User> ticket) {
   auto remote_id = std::make_shared<ConnectionId>(host, port, ticket);
   return GetConnection(remote_id)->SendRequest(std::move(req));
 }
 
-folly::Future<Response> RpcClient::AsyncCall(const std::string& host, uint16_t port,
-                                             std::unique_ptr<Request> req,
-                                             std::shared_ptr<User> ticket,
-                                             const std::string& service_name) {
+folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host,
+                                                              uint16_t port,
+                                                              std::unique_ptr<Request> req,
+                                                              std::shared_ptr<User> ticket,
+                                                              const std::string& service_name) {
   auto remote_id = std::make_shared<ConnectionId>(host, port, ticket, service_name);
   return GetConnection(remote_id)->SendRequest(std::move(req));
 }
@@ -99,5 +101,5 @@ void RpcClient::CallMethod(const MethodDescriptor* method, RpcController* contro
   std::unique_ptr<Request> req = std::make_unique<Request>(shared_req, shared_resp, method->name());
 
   AsyncCall(host, port, std::move(req), ticket, method->service()->name())
-      .then([done, this](Response resp) { done->Run(); });
+      .then([done, this](std::unique_ptr<Response> resp) { done->Run(); });
 }
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index aeb9b56..f4645a0 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -51,27 +51,28 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
   friend class RpcChannelImplementation;
 
  public:
-  RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor);
+  RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+            std::shared_ptr<Codec> codec);
 
   virtual ~RpcClient() { Close(); }
 
-  virtual std::shared_ptr<Response> SyncCall(const std::string &host, uint16_t port,
+  virtual std::unique_ptr<Response> SyncCall(const std::string &host, uint16_t port,
                                              std::unique_ptr<Request> req,
                                              std::shared_ptr<User> ticket);
 
-  virtual std::shared_ptr<Response> SyncCall(const std::string &host, uint16_t port,
+  virtual std::unique_ptr<Response> SyncCall(const std::string &host, uint16_t port,
                                              std::unique_ptr<Request> req,
                                              std::shared_ptr<User> ticket,
                                              const std::string &service_name);
 
-  virtual folly::Future<Response> AsyncCall(const std::string &host, uint16_t port,
-                                            std::unique_ptr<Request> req,
-                                            std::shared_ptr<User> ticket);
+  virtual folly::Future<std::unique_ptr<Response>> AsyncCall(const std::string &host, uint16_t port,
+                                                             std::unique_ptr<Request> req,
+                                                             std::shared_ptr<User> ticket);
 
-  virtual folly::Future<Response> AsyncCall(const std::string &host, uint16_t port,
-                                            std::unique_ptr<Request> req,
-                                            std::shared_ptr<User> ticket,
-                                            const std::string &service_name);
+  virtual folly::Future<std::unique_ptr<Response>> AsyncCall(const std::string &host, uint16_t port,
+                                                             std::unique_ptr<Request> req,
+                                                             std::shared_ptr<User> ticket,
+                                                             const std::string &service_name);
 
   virtual void Close();
 
diff --git a/hbase-native-client/connection/rpc-connection.h b/hbase-native-client/connection/rpc-connection.h
index e2500b2..c37b1e0 100644
--- a/hbase-native-client/connection/rpc-connection.h
+++ b/hbase-native-client/connection/rpc-connection.h
@@ -41,7 +41,7 @@ class RpcConnection {
 
   virtual std::shared_ptr<HBaseService> get_service() const { return hbase_service_; }
 
-  virtual folly::Future<Response> SendRequest(std::unique_ptr<Request> req) {
+  virtual folly::Future<std::unique_ptr<Response>> SendRequest(std::unique_ptr<Request> req) {
     return (*hbase_service_)(std::move(req));
   }
 
diff --git a/hbase-native-client/connection/service.h b/hbase-native-client/connection/service.h
index 0d2e258..64d4f07 100644
--- a/hbase-native-client/connection/service.h
+++ b/hbase-native-client/connection/service.h
@@ -26,5 +26,5 @@
 #include "connection/response.h"
 
 namespace hbase {
-using HBaseService = wangle::Service<std::unique_ptr<Request>, Response>;
+using HBaseService = wangle::Service<std::unique_ptr<Request>, std::unique_ptr<Response>>;
 }  // namespace hbase
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index 28eec6f..0a45fff 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -116,9 +116,9 @@ TEST(Client, Get) {
 
   // Using TestUtil to populate test data
   hbase::TestUtil *test_util = new hbase::TestUtil();
-  test_util->RunShellCmd("create 't', 'd'");
-  test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'");
-  test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'");
+  test_util->RunShellCmd(
+      "create 't', 'd'; put 't', 'test2', 'd:2', 'value2'; put 't', 'test2', 'd:extra', 'value for "
+      "extra'");
 
   // Create TableName and Row to be fetched from HBase
   auto tn = folly::to<hbase::pb::TableName>("t");
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index c1efd8b..685524f 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -46,7 +46,14 @@ void Client::init(const hbase::Configuration &conf) {
       std::make_shared<wangle::CPUThreadPoolExecutor>(4);  // TODO: read num threads from conf
   io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN));
 
-  rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_);
+  std::shared_ptr<Codec> codec = nullptr;
+  if (conf.Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) ==
+      std::string(KeyValueCodec::kJavaClassName)) {
+    codec = std::make_shared<hbase::KeyValueCodec>();
+  } else {
+    LOG(WARNING) << "Not using RPC Cell Codec";
+  }
+  rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, codec);
   location_cache_ =
       std::make_shared<hbase::LocationCache>(conf_, cpu_executor_, rpc_client_->connection_pool());
 }
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 730981d..0e436ba 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -30,6 +30,7 @@
 #include "connection/rpc-client.h"
 #include "core/configuration.h"
 #include "core/hbase_configuration_loader.h"
+#include "core/keyvalue-codec.h"
 #include "core/location-cache.h"
 #include "core/table.h"
 #include "if/Cell.pb.h"
@@ -70,6 +71,7 @@ class Client {
   void init(const hbase::Configuration &conf);
   const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum";
   const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181";
+  const std::string kRpcCodec = "hbase.client.rpc.codec";
   std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
   std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
   std::shared_ptr<hbase::LocationCache> location_cache_;
diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc
index 1ad6c65..9a778c6 100644
--- a/hbase-native-client/core/location-cache-test.cc
+++ b/hbase-native-client/core/location-cache-test.cc
@@ -23,6 +23,7 @@
 
 #include <chrono>
 
+#include "core/keyvalue-codec.h"
 #include "if/HBase.pb.h"
 #include "serde/table-name.h"
 #include "test-util/test-util.h"
@@ -33,7 +34,8 @@ TEST(LocationCacheTest, TestGetMetaNodeContents) {
   TestUtil test_util{};
   auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
   auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
-  auto cp = std::make_shared<ConnectionPool>(io);
+  auto codec = std::make_shared<KeyValueCodec>();
+  auto cp = std::make_shared<ConnectionPool>(io, codec);
   LocationCache cache{test_util.conf(), cpu, cp};
   auto f = cache.LocateMeta();
   auto result = f.get();
@@ -49,7 +51,8 @@ TEST(LocationCacheTest, TestGetRegionLocation) {
   TestUtil test_util{};
   auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
   auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
-  auto cp = std::make_shared<ConnectionPool>(io);
+  auto codec = std::make_shared<KeyValueCodec>();
+  auto cp = std::make_shared<ConnectionPool>(io, codec);
   LocationCache cache{test_util.conf(), cpu, cp};
 
   // If there is no table this should throw an exception
@@ -68,7 +71,8 @@ TEST(LocationCacheTest, TestCaching) {
   TestUtil test_util{};
   auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
   auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
-  auto cp = std::make_shared<ConnectionPool>(io);
+  auto codec = std::make_shared<KeyValueCodec>();
+  auto cp = std::make_shared<ConnectionPool>(io, codec);
   LocationCache cache{test_util.conf(), cpu, cp};
 
   auto tn_1 = folly::to<hbase::pb::TableName>("t1");
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index dab5deb..da9f64a 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -128,10 +128,10 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(const Tabl
       .then([tn, row, this](std::shared_ptr<RpcConnection> rpc_connection) {
         return (*rpc_connection->get_service())(std::move(meta_util_.MetaRequest(tn, row)));
       })
-      .then([this](Response resp) {
+      .then([this](std::unique_ptr<Response> resp) {
         // take the protobuf response and make it into
         // a region location.
-        return this->CreateLocation(std::move(resp));
+        return meta_util_.CreateLocation(std::move(*resp));
       })
       .then([tn, this](std::shared_ptr<RegionLocation> rl) {
         // Make sure that the correct location was found.
@@ -166,22 +166,6 @@ Future<shared_ptr<RegionLocation>> LocationCache::LocateRegion(const hbase::pb::
   }
 }
 
-std::shared_ptr<RegionLocation> LocationCache::CreateLocation(const Response &resp) {
-  auto resp_msg = static_pointer_cast<ScanResponse>(resp.resp_msg());
-  auto &results = resp_msg->results().Get(0);
-  auto &cells = results.cell();
-
-  // TODO(eclark): There should probably be some better error
-  // handling around this.
-  auto cell_zero = cells.Get(0).value();
-  auto cell_one = cells.Get(1).value();
-  auto row = cells.Get(0).row();
-
-  auto region_info = folly::to<RegionInfo>(cell_zero);
-  auto server_name = folly::to<ServerName>(cell_one);
-  return std::make_shared<RegionLocation>(row, std::move(region_info), server_name, nullptr);
-}
-
 // must hold shared lock on locations_lock_
 shared_ptr<RegionLocation> LocationCache::GetCachedLocation(const hbase::pb::TableName &tn,
                                                             const std::string &row) {
diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc
index f92300c..bd26607 100644
--- a/hbase-native-client/core/meta-utils.cc
+++ b/hbase-native-client/core/meta-utils.cc
@@ -24,18 +24,26 @@
 
 #include "connection/request.h"
 #include "connection/response.h"
+#include "core/response_converter.h"
 #include "if/Client.pb.h"
+#include "serde/region-info.h"
+#include "serde/server-name.h"
 #include "serde/table-name.h"
 
 using hbase::pb::TableName;
 using hbase::MetaUtil;
 using hbase::Request;
 using hbase::Response;
+using hbase::RegionLocation;
+using hbase::pb::RegionInfo;
 using hbase::pb::ScanRequest;
 using hbase::pb::ServerName;
 using hbase::pb::RegionSpecifier_RegionSpecifierType;
 
 static const std::string META_REGION = "1588230740";
+static const std::string CATALOG_FAMILY = "info";
+static const std::string REGION_INFO_COLUMN = "regioninfo";
+static const std::string SERVER_COLUMN = "server";
 
 std::string MetaUtil::RegionLookupRowkey(const TableName &tn, const std::string &row) const {
   return folly::to<std::string>(tn, ",", row, ",", "999999999999999999");
@@ -79,3 +87,29 @@ std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn, const std::st
   scan->set_start_row(RegionLookupRowkey(tn, row));
   return request;
 }
+
+std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp) {
+  std::vector<std::unique_ptr<Result>> results = ResponseConverter::FromScanResponse(resp);
+  if (results.size() != 1) {
+    throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" +
+                             std::to_string(results.size()));
+  }
+
+  auto result = *results[0];
+  // VLOG(1) << "Creating RegionLocation from received Response " << *result; TODO
+
+  std::shared_ptr<std::string> region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN);
+  std::shared_ptr<std::string> server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN);
+
+  if (region_info_str == nullptr) {
+    throw std::runtime_error("regioninfo column null for location");
+  }
+  if (server_str == nullptr) {
+    throw std::runtime_error("server column null for location");
+  }
+
+  auto row = result.Row();
+  auto region_info = folly::to<RegionInfo>(*region_info_str);
+  auto server_name = folly::to<ServerName>(*server_str);
+  return std::make_shared<RegionLocation>(row, std::move(region_info), server_name, nullptr);
+}
diff --git a/hbase-native-client/core/meta-utils.h b/hbase-native-client/core/meta-utils.h
index 075215e..d67f32d 100644
--- a/hbase-native-client/core/meta-utils.h
+++ b/hbase-native-client/core/meta-utils.h
@@ -22,6 +22,8 @@
 #include <string>
 
 #include "connection/request.h"
+#include "connection/response.h"
+#include "core/region-location.h"
 #include "if/HBase.pb.h"
 #include "serde/table-name.h"
 
@@ -43,5 +45,10 @@ class MetaUtil {
    * location.
    */
   std::unique_ptr<Request> MetaRequest(const hbase::pb::TableName tn, const std::string &row) const;
+
+  /**
+   * Return a RegionLocation from the parsed Response
+   */
+  std::shared_ptr<RegionLocation> CreateLocation(const Response &resp);
 };
 }  // namespace hbase
diff --git a/hbase-native-client/core/response_converter.cc b/hbase-native-client/core/response_converter.cc
index 3fe2ba9..19a3554 100644
--- a/hbase-native-client/core/response_converter.cc
+++ b/hbase-native-client/core/response_converter.cc
@@ -22,9 +22,9 @@
 #include <vector>
 
 #include "core/cell.h"
-#include "if/Client.pb.h"
 
 using hbase::pb::GetResponse;
+using hbase::pb::ScanResponse;
 
 namespace hbase {
 
@@ -32,19 +32,65 @@ ResponseConverter::ResponseConverter() {}
 
 ResponseConverter::~ResponseConverter() {}
 
-std::unique_ptr<hbase::Result> ResponseConverter::FromGetResponse(const Response& resp) {
+std::unique_ptr<Result> ResponseConverter::FromGetResponse(const Response& resp) {
   auto get_resp = std::static_pointer_cast<GetResponse>(resp.resp_msg());
 
+  return ToResult(get_resp->result(), resp.cell_scanner());
+}
+
+std::unique_ptr<Result> ResponseConverter::ToResult(
+    const hbase::pb::Result& result, const std::unique_ptr<CellScanner>& cell_scanner) {
   std::vector<std::shared_ptr<Cell>> vcells;
-  for (auto cell : get_resp->result().cell()) {
+  for (auto cell : result.cell()) {
     std::shared_ptr<Cell> pcell =
         std::make_shared<Cell>(cell.row(), cell.family(), cell.qualifier(), cell.timestamp(),
                                cell.value(), static_cast<hbase::CellType>(cell.cell_type()));
     vcells.push_back(pcell);
   }
 
-  return std::make_unique<hbase::Result>(vcells, get_resp->result().exists(),
-                                         get_resp->result().stale(), get_resp->result().partial());
+  // iterate over the cells coming from rpc codec
+  if (cell_scanner != nullptr) {
+    while (cell_scanner->Advance()) {
+      vcells.push_back(cell_scanner->Current());
+    }
+    // TODO: check associated cell count?
+  }
+  return std::make_unique<Result>(vcells, result.exists(), result.stale(), result.partial());
 }
 
+std::vector<std::unique_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) {
+  auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg());
+  VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString();
+  int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size()
+                                                   : scan_resp->results_size();
+
+  std::vector<std::unique_ptr<Result>> results{static_cast<size_t>(num_results)};
+  for (int i = 0; i < num_results; i++) {
+    if (resp.cell_scanner() != nullptr) {
+      // Cells are out in cellblocks.  Group them up again as Results.  How many to read at a
+      // time will be found in getCellsLength -- length here is how many Cells in the i'th Result
+      int num_cells = scan_resp->cells_per_result(i);
+
+      std::vector<std::shared_ptr<Cell>> vcells;
+      while (resp.cell_scanner()->Advance()) {
+        vcells.push_back(resp.cell_scanner()->Current());
+      }
+      // TODO: check associated cell count?
+
+      if (vcells.size() != num_cells) {
+        std::string msg = "Results sent from server=" + std::to_string(num_results) +
+                          ". But only got " + std::to_string(i) +
+                          " results completely at client. Resetting the scanner to scan again.";
+        LOG(ERROR) << msg;
+        throw std::runtime_error(msg);
+      }
+      // TODO: handle partial results per Result by checking partial_flag_per_result
+      results[i] = std::make_unique<Result>(vcells, false, scan_resp->stale(), false);
+    } else {
+      results[i] = ToResult(scan_resp->results(i), resp.cell_scanner());
+    }
+  }
+
+  return results;
+}
 } /* namespace hbase */
diff --git a/hbase-native-client/core/response_converter.h b/hbase-native-client/core/response_converter.h
index 86fb632..859644b 100644
--- a/hbase-native-client/core/response_converter.h
+++ b/hbase-native-client/core/response_converter.h
@@ -22,6 +22,8 @@
 #include <memory>
 #include "connection/response.h"
 #include "core/result.h"
+#include "if/Client.pb.h"
+#include "serde/cell-scanner.h"
 
 namespace hbase {
 
@@ -33,11 +35,16 @@ class ResponseConverter {
  public:
   ~ResponseConverter();
 
+  static std::unique_ptr<Result> ToResult(const hbase::pb::Result& result,
+                                          const std::unique_ptr<CellScanner>& cell_scanner);
+
   /**
    * @brief Returns a Result object created by PB Message in passed Response object.
    * @param resp - Response object having the PB message.
    */
-  static std::unique_ptr<hbase::Result> FromGetResponse(const Response &resp);
+  static std::unique_ptr<hbase::Result> FromGetResponse(const Response& resp);
+
+  static std::vector<std::unique_ptr<Result>> FromScanResponse(const Response& resp);
 
  private:
   // Constructor not required. We have all static methods to extract response from PB messages.
diff --git a/hbase-native-client/core/result.cc b/hbase-native-client/core/result.cc
index d73a5b2..4db3fca 100644
--- a/hbase-native-client/core/result.cc
+++ b/hbase-native-client/core/result.cc
@@ -87,7 +87,7 @@ bool Result::IsEmpty() const { return cells_.empty(); }
 
 const std::string &Result::Row() const { return row_; }
 
-const int Result::Size() const { return cells_.size(); }
+int Result::Size() const { return cells_.size(); }
 
 const ResultMap &Result::Map() const { return result_map_; }
 
diff --git a/hbase-native-client/core/result.h b/hbase-native-client/core/result.h
index cd41cf0..8ff4311 100644
--- a/hbase-native-client/core/result.h
+++ b/hbase-native-client/core/result.h
@@ -100,7 +100,7 @@ class Result {
   /**
    * @brief Returns the size of the underlying Cell vector
    */
-  const int Size() const;
+  int Size() const;
 
   /**
    * @brief Map of families to all versions of its qualifiers and values.
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 3cd0a93..4b1144c 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -31,6 +31,7 @@
 
 #include "connection/connection-pool.h"
 #include "core/client.h"
+#include "core/keyvalue-codec.h"
 #include "if/Client.pb.h"
 #include "if/ZooKeeper.pb.h"
 #include "serde/server-name.h"
@@ -43,6 +44,7 @@ using hbase::Configuration;
 using hbase::Response;
 using hbase::Request;
 using hbase::HBaseService;
+using hbase::KeyValueCodec;
 using hbase::LocationCache;
 using hbase::ConnectionPool;
 using hbase::ConnectionFactory;
@@ -88,7 +90,8 @@ int main(int argc, char *argv[]) {
   // Set up thread pools.
   auto cpu_pool = std::make_shared<wangle::CPUThreadPoolExecutor>(FLAGS_threads);
   auto io_pool = std::make_shared<wangle::IOThreadPoolExecutor>(5);
-  auto cp = std::make_shared<ConnectionPool>(io_pool);
+  auto codec = std::make_shared<KeyValueCodec>();
+  auto cp = std::make_shared<ConnectionPool>(io_pool, codec);
 
   // Configuration
   auto conf = std::make_shared<Configuration>();
@@ -105,7 +108,7 @@ int main(int argc, char *argv[]) {
 
   auto num_puts = FLAGS_columns;
 
-  auto results = std::vector<Future<Response>>{};
+  auto results = std::vector<Future<std::unique_ptr<Response>>>{};
   auto col = uint64_t{0};
   for (; col < num_puts; col++) {
     results.push_back(
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index 58125f9..4e30d4b 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -56,12 +56,12 @@ std::unique_ptr<hbase::Result> Table::Get(const hbase::Get &get) {
   auto req = hbase::RequestConverter::ToGetRequest(get, loc->region_name());
   auto user = User::defaultUser();  // TODO: make User::current() similar to UserUtil
 
-  Future<Response> f =
+  Future<std::unique_ptr<Response>> f =
       rpc_client_->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
                              std::move(req), user, "ClientService");
   auto resp = f.get();
 
-  return hbase::ResponseConverter::FromGetResponse(resp);
+  return hbase::ResponseConverter::FromGetResponse(*resp);
 }
 
 void Table::Close() {
diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc
index a30f904..054684d 100644
--- a/hbase-native-client/serde/client-deserializer-test.cc
+++ b/hbase-native-client/serde/client-deserializer-test.cc
@@ -30,12 +30,12 @@ using hbase::pb::RegionSpecifier;
 using hbase::pb::RegionSpecifier_RegionSpecifierType;
 
 TEST(TestRpcSerde, TestReturnFalseOnNullPtr) {
-  RpcSerde deser;
+  RpcSerde deser{nullptr};
   ASSERT_LT(deser.ParseDelimited(nullptr, nullptr), 0);
 }
 
 TEST(TestRpcSerde, TestReturnFalseOnBadInput) {
-  RpcSerde deser;
+  RpcSerde deser{nullptr};
   auto buf = IOBuf::copyBuffer("test");
   GetRequest gr;
 
@@ -44,8 +44,8 @@ TEST(TestRpcSerde, TestReturnFalseOnBadInput) {
 
 TEST(TestRpcSerde, TestGoodGetRequestFullRoundTrip) {
   GetRequest in;
-  RpcSerde ser;
-  RpcSerde deser;
+  RpcSerde ser{nullptr};
+  RpcSerde deser{nullptr};
 
   // fill up the GetRequest.
   in.mutable_region()->set_value("test_region_id");
diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc
index 2bd17fb..33c48f3 100644
--- a/hbase-native-client/serde/client-serializer-test.cc
+++ b/hbase-native-client/serde/client-serializer-test.cc
@@ -32,7 +32,7 @@ using namespace folly;
 using namespace folly::io;
 
 TEST(RpcSerdeTest, PreambleIncludesHBas) {
-  RpcSerde ser;
+  RpcSerde ser{nullptr};
   auto buf = ser.Preamble();
   const char *p = reinterpret_cast<const char *>(buf->data());
   // Take the first for chars and make sure they are the
@@ -43,14 +43,14 @@ TEST(RpcSerdeTest, PreambleIncludesHBas) {
 }
 
 TEST(RpcSerdeTest, PreambleIncludesVersion) {
-  RpcSerde ser;
+  RpcSerde ser{nullptr};
   auto buf = ser.Preamble();
   EXPECT_EQ(0, static_cast<const uint8_t *>(buf->data())[4]);
   EXPECT_EQ(80, static_cast<const uint8_t *>(buf->data())[5]);
 }
 
 TEST(RpcSerdeTest, TestHeaderLengthPrefixed) {
-  RpcSerde ser;
+  RpcSerde ser{nullptr};
   auto header = ser.Header("elliott");
 
   // The header should be prefixed by 4 bytes of length.
@@ -65,7 +65,7 @@ TEST(RpcSerdeTest, TestHeaderLengthPrefixed) {
 }
 
 TEST(RpcSerdeTest, TestHeaderDecode) {
-  RpcSerde ser;
+  RpcSerde ser{nullptr};
   auto buf = ser.Header("elliott");
   auto header_buf = buf->next();
   ConnectionHeader h;
diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc.cc
index 3bdb489..d5bca62 100644
--- a/hbase-native-client/serde/rpc.cc
+++ b/hbase-native-client/serde/rpc.cc
@@ -83,7 +83,7 @@ int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) {
   return coded_stream.CurrentPosition();
 }
 
-RpcSerde::RpcSerde() : auth_type_(DEFAULT_AUTH_TYPE) {}
+RpcSerde::RpcSerde(std::shared_ptr<Codec> codec) : auth_type_(DEFAULT_AUTH_TYPE), codec_(codec) {}
 
 unique_ptr<IOBuf> RpcSerde::Preamble() {
   auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2);
@@ -110,6 +110,10 @@ unique_ptr<IOBuf> RpcSerde::Header(const string &user) {
   // didn't.
   // TODO: send the service name and user from the RpcClient
   h.set_service_name(INTERFACE);
+
+  if (codec_ != nullptr) {
+    h.set_cell_block_codec_class(codec_->java_class_name());
+  }
   return PrependLength(SerializeMessage(h));
 }
 
@@ -128,6 +132,14 @@ unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, const string &method
   return PrependLength(std::move(ser_header));
 }
 
+std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf,
+                                                         uint32_t offset, uint32_t length) {
+  if (codec_ == nullptr) {
+    return nullptr;
+  }
+  return codec_->CreateDecoder(std::move(buf), offset, length);
+}
+
 unique_ptr<IOBuf> RpcSerde::PrependLength(unique_ptr<IOBuf> msg) {
   // Java ints are 4 long. So create a buffer that large
   auto len_buf = IOBuf::create(4);
diff --git a/hbase-native-client/serde/rpc.h b/hbase-native-client/serde/rpc.h
index 7d060c7..c59f903 100644
--- a/hbase-native-client/serde/rpc.h
+++ b/hbase-native-client/serde/rpc.h
@@ -21,6 +21,9 @@
 #include <memory>
 #include <string>
 
+#include "serde/cell-scanner.h"
+#include "serde/codec.h"
+
 // Forward
 namespace folly {
 class IOBuf;
@@ -44,7 +47,7 @@ class RpcSerde {
   /**
    * Constructor assumes the default auth type.
    */
-  RpcSerde();
+  RpcSerde(std::shared_ptr<Codec> codec);
 
   /**
    * Destructor. This is provided just for testing purposes.
@@ -76,6 +79,13 @@ class RpcSerde {
   std::unique_ptr<folly::IOBuf> Header(const std::string &user);
 
   /**
+   * Take ownership of the passed buffer, and create a CellScanner using the
+   * Codec class to parse Cells out of the wire.
+   */
+  std::unique_ptr<CellScanner> CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, uint32_t offset,
+                                                 uint32_t length);
+
+  /**
    * Serialize a request message into a protobuf.
    * Request consists of:
    *
@@ -109,5 +119,6 @@ class RpcSerde {
  private:
   /* data */
   uint8_t auth_type_;
+  std::shared_ptr<Codec> codec_;
 };
 }  // namespace hbase