You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:44:58 UTC

[hbase] 10/133: HBASE-14855 Connect to regionserver

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 6173b0b09716f37e04e702cdb9d899ff47cd71fd
Author: Elliott Clark <ec...@apache.org>
AuthorDate: Mon Apr 4 13:52:05 2016 -0700

    HBASE-14855 Connect to regionserver
    
    Summary:
    * Client dispatcher to put request and response together with call id. Though right now this is a pretty empty version of that.
    * pipeline. The Serialization pipeline.
    * client-serialize-handler.h This does the work of sending the preamble, the connection header, and length prepending data. This will obviously need to be split up into h and cc files
    
    Test Plan: Use simple-client to connect to a standalone server
    
    Differential Revision: https://reviews.facebook.net/D56385
---
 hbase-native-client/core/BUCK                      |  30 +++++-
 hbase-native-client/core/client-dispatcher.cc      |  54 +++++++++++
 .../core/{client.h => client-dispatcher.h}         |  22 ++++-
 .../core/client-serialize-handler.cc               | 104 ++++++++++++++++++++
 .../core/{client.h => client-serialize-handler.h}  |  26 ++++-
 hbase-native-client/core/client.cc                 |  16 ++--
 hbase-native-client/core/client.h                  |  18 +++-
 hbase-native-client/core/connection-factory.cc     |  57 +++++++++++
 .../core/{client.h => connection-factory.h}        |  23 ++++-
 .../core/{client.h => get-request.cc}              |   9 +-
 .../core/{client.h => get-request.h}               |  17 +++-
 .../core/{client.h => get-result.cc}               |   9 +-
 .../core/{client.h => get-result.h}                |  14 ++-
 hbase-native-client/core/location-cache-test.cc    |  25 ++++-
 hbase-native-client/core/location-cache.cc         |  62 ++++++++++--
 hbase-native-client/core/location-cache.h          |  29 +++++-
 hbase-native-client/core/native-client-test-env.cc |   6 +-
 .../core/{client.cc => pipeline.cc}                |  34 +++----
 hbase-native-client/core/{client.h => pipeline.h}  |  16 +++-
 hbase-native-client/core/{client.h => request.h}   |  15 ++-
 hbase-native-client/core/{client.h => response.h}  |  16 +++-
 hbase-native-client/core/{client.h => service.h}   |  10 +-
 .../core/{client.cc => simple-client.cc}           |  31 ++++--
 .../core/simple-native-client-test.cc              |   5 +-
 .../core/{client.h => table-name.cc}               |   9 +-
 .../core/{client.h => table-name.h}                |  14 ++-
 hbase-native-client/if/BUCK                        |  24 ++++-
 hbase-native-client/third-party/BUCK               | 105 ++++++++++-----------
 28 files changed, 615 insertions(+), 185 deletions(-)

diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index d1e89d1..2b00d66 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -16,35 +16,52 @@
 # limitations under the License.
 
 cxx_library(name="core",
-            headers=[
+            exported_headers=[
                 "admin.h",
+                "client-dispatcher.h",
+                "client-serialize-handler.h",
                 "client.h",
+                "connection-factory.h",
                 "connection.h",
                 "connection_attr.h",
                 "delete.h",
+                "get-request.h",
+                "get-result.h",
                 "get.h",
                 "hbase_macros.h",
+                "location-cache.h",
                 "mutation.h",
+                "pipeline.h",
                 "put.h",
+                "request.h",
+                "response.h",
                 "scanner.h",
-                "location-cache.h",
+                "service.h",
+                "table-name.h",
             ],
             srcs=[
                 "admin.cc",
+                "client-dispatcher.cc",
+                "client-serialize-handler.cc",
                 "client.cc",
+                "connection-factory.cc",
                 "connection.cc",
+                "delete.cc",
+                "get-request.cc",
+                "get-result.cc",
                 "get.cc",
+                "location-cache.cc",
                 "mutation.cc",
+                "pipeline.cc",
                 "put.cc",
-                "delete.cc",
                 "scanner.cc",
-                "location-cache.cc",
+                "table-name.cc",
             ],
             deps=[
                 "//if:if",
-                "//third-party:zookeeper_mt",
                 "//third-party:folly",
                 "//third-party:wangle",
+                "//third-party:zookeeper_mt",
             ],
             visibility=[
                 'PUBLIC',
@@ -68,3 +85,6 @@ cxx_test(name="location-cache-test",
              ":core",
          ],
          run_test_separately=True, )
+cxx_binary(name="simple-client",
+           srcs=["simple-client.cc", ],
+           deps=[":core", ], )
diff --git a/hbase-native-client/core/client-dispatcher.cc b/hbase-native-client/core/client-dispatcher.cc
new file mode 100644
index 0000000..d356759
--- /dev/null
+++ b/hbase-native-client/core/client-dispatcher.cc
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "core/client-dispatcher.h"
+
+using namespace folly;
+using namespace hbase;
+using namespace wangle;
+
+void ClientDispatcher::read(Context *ctx, Response in) {
+  // Pull the promise registered for this call id out of the pending map.
+  auto search = requests_.find(in.call_id());
+  CHECK(search != requests_.end());
+  auto promise = std::move(search->second);
+  requests_.erase(search);
+
+  // TODO(eclark): check if the response
+  // is an exception. If it is then set that.
+  promise.setValue(in);
+}
+
+Future<Response> ClientDispatcher::operator()(Request arg) {
+  auto call_id = ++current_call_id_;
+  arg.set_call_id(call_id);
+  auto &p = requests_[call_id];
+  auto f = p.getFuture();
+  p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) {
+    this->requests_.erase(call_id);
+  });
+  this->pipeline_->write(arg);
+
+  return f;
+}
+
+Future<Unit> ClientDispatcher::close() { return ClientDispatcherBase::close(); }
+
+Future<Unit> ClientDispatcher::close(Context *ctx) {
+  return ClientDispatcherBase::close(ctx);
+}
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client-dispatcher.h
similarity index 56%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/client-dispatcher.h
index 35a3bd8..4b9d35a 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client-dispatcher.h
@@ -19,8 +19,24 @@
 
 #pragma once
 
-#include <folly/io/IOBuf.h>
+#include <wangle/service/ClientDispatcher.h>
 
-#include "if/Cell.pb.h"
+#include "core/pipeline.h"
+#include "core/request.h"
+#include "core/response.h"
 
-class Client {};
+namespace hbase {
+class ClientDispatcher  // Matches each Response to its Request by call id.
+    : public wangle::ClientDispatcherBase<SerializePipeline, Request,
+                                          Response> {
+public:
+  void read(Context *ctx, Response in) override;
+  folly::Future<Response> operator()(Request arg) override;
+  folly::Future<folly::Unit> close(Context *ctx) override;
+  folly::Future<folly::Unit> close() override;
+
+private:
+  std::unordered_map<uint32_t, folly::Promise<Response>> requests_;
+  uint32_t current_call_id_ = 1;  // Incremented before each request is sent.
+};
+}  // namespace hbase
diff --git a/hbase-native-client/core/client-serialize-handler.cc b/hbase-native-client/core/client-serialize-handler.cc
new file mode 100644
index 0000000..cad1308
--- /dev/null
+++ b/hbase-native-client/core/client-serialize-handler.cc
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/client-serialize-handler.h"
+
+#include <string>
+
+using namespace hbase;
+using namespace folly;
+using namespace wangle;
+
+static const std::string PREAMBLE = "HBas";
+static const std::string INTERFACE = "ClientService";
+static const uint8_t RPC_VERSION = 0;
+static const uint8_t AUTH_TYPE = 80;
+
+// TODO(eclark): Make this actually do ANYTHING.
+void ClientSerializeHandler::read(Context *ctx, std::unique_ptr<IOBuf> msg) {
+  Response received;
+  ctx->fireRead(received);
+}
+
+Future<Unit> ClientSerializeHandler::write(Context *ctx, Request r) {
+  // Keep track of if we have sent the header.
+  if (need_send_header_) {
+    need_send_header_ = false;
+
+    // Should this be replacing the IOBuf rather than
+    // sending several different calls?
+    write_preamble(ctx);
+    write_header(ctx);
+  }
+
+  // Send out the actual request and not just a test string.
+  std::string out{"test"};
+  return ctx->fireWrite(prepend_length(IOBuf::copyBuffer(out)));
+}
+
+Future<Unit> ClientSerializeHandler::write_preamble(Context *ctx) {
+  auto magic = IOBuf::copyBuffer(PREAMBLE);
+  auto buf = IOBuf::create(2);
+  buf->append(2);
+  folly::io::RWPrivateCursor c(buf.get());
+
+  // Version
+  c.write(RPC_VERSION);
+  // Standard security aka Please don't lie to me.
+  c.write(AUTH_TYPE);
+  magic->appendChain(std::move(buf));
+  return ctx->fireWrite(std::move(magic));
+}
+
+Future<Unit> ClientSerializeHandler::write_header(Context *ctx) {
+  pb::ConnectionHeader h;
+
+  // TODO(eclark): Make this not a total lie.
+  h.mutable_user_info()->set_effective_user("elliott");
+  // The service name that we want to talk to.
+  //
+  // Right now we're completely ignoring the service interface.
+  // That may or may not be the correct thing to do.
+  // It worked for a while with the java client; until it
+  // didn't.
+  h.set_service_name(INTERFACE);
+  // TODO(eclark): Make this 1 copy.
+  auto msg = IOBuf::copyBuffer(h.SerializeAsString());
+  return ctx->fireWrite(prepend_length(std::move(msg)));
+}
+
+// Our own simple LengthFieldPrepender: prefixes msg with its 4 byte BE length.
+std::unique_ptr<IOBuf>
+ClientSerializeHandler::prepend_length(std::unique_ptr<IOBuf> msg) {
+  // Java ints are 4 bytes long. So create a buffer that large.
+  auto len_buf = IOBuf::create(4);
+  // Then make those bytes visible.
+  len_buf->append(4);
+
+  io::RWPrivateCursor c(len_buf.get());
+  // Get the size of the data to be pushed out the network.
+  auto size = msg->computeChainDataLength();
+
+  // Write the length to this IOBuf.
+  c.writeBE(static_cast<uint32_t>(size));
+
+  // Then attach the original to the back of len_buf.
+  len_buf->appendChain(std::move(msg));
+  return len_buf;
+}
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client-serialize-handler.h
similarity index 50%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/client-serialize-handler.h
index 35a3bd8..961a03b 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client-serialize-handler.h
@@ -16,11 +16,29 @@
  * limitations under the License.
  *
  */
-
 #pragma once
 
-#include <folly/io/IOBuf.h>
+#include <wangle/channel/Handler.h>
+
+#include "if/HBase.pb.h"
+#include "if/RPC.pb.h"
+#include "core/request.h"
+#include "core/response.h"
 
-#include "if/Cell.pb.h"
+namespace hbase {
+class ClientSerializeHandler
+    : public wangle::Handler<std::unique_ptr<folly::IOBuf>, Response, Request,
+                             std::unique_ptr<folly::IOBuf>> {
+public:
+  void read(Context *ctx, std::unique_ptr<folly::IOBuf> msg) override;
+  folly::Future<folly::Unit> write(Context *ctx, Request r) override;
 
-class Client {};
+private:
+  folly::Future<folly::Unit> write_preamble(Context *ctx);
+  folly::Future<folly::Unit> write_header(Context *ctx);
+  // Our own simple version of LengthFieldPrepender
+  std::unique_ptr<folly::IOBuf>
+  prepend_length(std::unique_ptr<folly::IOBuf> msg);
+  bool need_send_header_ = true;
+};
+}  // namespace hbase
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index a04daee..893894f 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -23,20 +23,18 @@
 #include <folly/Random.h>
 #include <glog/logging.h>
 #include <gflags/gflags.h>
+#include <wangle/concurrent/GlobalExecutor.h>
+
+#include <string>
 
 #include "if/ZooKeeper.pb.h"
 
 using namespace folly;
+using namespace std;
 using namespace hbase::pb;
 
-int main(int argc, char *argv[]) {
-  MetaRegionServer mrs;
-  google::ParseCommandLineFlags(&argc, &argv, true);
-  google::InitGoogleLogging(argv[0]);
+namespace hbase {
 
-  FB_LOG_EVERY_MS(INFO, 10000) << "Hello";
-  for (long i = 0; i < 10000000; i++) {
-    FB_LOG_EVERY_MS(INFO, 1) << Random::rand32();
-  }
-  return 0;
+Client::Client(string quorum_spec)
+    : location_cache(quorum_spec, wangle::getCPUExecutor()) {}
 }
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 35a3bd8..818bc6b 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -20,7 +20,23 @@
 #pragma once
 
 #include <folly/io/IOBuf.h>
+#include <folly/futures/Future.h>
 
+#include <string>
+
+#include "core/get-request.h"
+#include "core/get-result.h"
+#include "core/location-cache.h"
 #include "if/Cell.pb.h"
 
-class Client {};
+namespace hbase {
+class Client {
+public:
+  explicit Client(std::string quorum_spec);
+  folly::Future<GetResult> get(const GetRequest &get_request);
+
+private:
+  LocationCache location_cache;
+};
+
+}  // namespace hbase
diff --git a/hbase-native-client/core/connection-factory.cc b/hbase-native-client/core/connection-factory.cc
new file mode 100644
index 0000000..785b239
--- /dev/null
+++ b/hbase-native-client/core/connection-factory.cc
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/connection-factory.h"
+
+#include <wangle/channel/AsyncSocketHandler.h>
+#include <wangle/channel/EventBaseHandler.h>
+#include <wangle/channel/OutputBufferingHandler.h>
+#include <wangle/service/ClientDispatcher.h>
+#include <wangle/service/ExpiringFilter.h>
+#include <folly/futures/Future.h>
+
+#include <string>
+
+#include "core/client-dispatcher.h"
+#include "core/pipeline.h"
+#include "core/request.h"
+#include "core/response.h"
+#include "core/service.h"
+
+using namespace folly;
+using namespace hbase;
+using namespace wangle;
+
+ConnectionFactory::ConnectionFactory() {
+  bootstrap_.group(std::make_shared<wangle::IOThreadPoolExecutor>(2));
+  bootstrap_.pipelineFactory(std::make_shared<RpcPipelineFactory>());
+}
+
+Future<ClientDispatcher> ConnectionFactory::make_connection(std::string host,
+                                                            int port) {
+  // Connect to the given server, then once connected create a
+  // ClientDispatcher bound to the new pipeline.
+  auto srv = bootstrap_.connect(SocketAddress(host, port, true))
+                 .then([](SerializePipeline *pipeline) {
+                   ClientDispatcher dispatcher;
+                   dispatcher.setPipeline(pipeline);
+                   return dispatcher;
+                 });
+  return srv;
+}
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/connection-factory.h
similarity index 62%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/connection-factory.h
index 35a3bd8..6f450c2 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/connection-factory.h
@@ -16,11 +16,26 @@
  * limitations under the License.
  *
  */
-
 #pragma once
 
-#include <folly/io/IOBuf.h>
+#include <wangle/bootstrap/ClientBootstrap.h>
+#include <wangle/service/Service.h>
+
+#include <string>
+
+#include "core/service.h"
+#include "core/pipeline.h"
+#include "core/client-dispatcher.h"
+#include "core/request.h"
+#include "core/response.h"
 
-#include "if/Cell.pb.h"
+namespace hbase {
+class ConnectionFactory {
+public:
+  ConnectionFactory();
+  folly::Future<ClientDispatcher> make_connection(std::string host, int port);
 
-class Client {};
+private:
+  wangle::ClientBootstrap<SerializePipeline> bootstrap_;
+};
+}  // namespace hbase
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/get-request.cc
similarity index 90%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/get-request.cc
index 35a3bd8..e927ccc 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/get-request.cc
@@ -16,11 +16,4 @@
  * limitations under the License.
  *
  */
-
-#pragma once
-
-#include <folly/io/IOBuf.h>
-
-#include "if/Cell.pb.h"
-
-class Client {};
+#include "core/get-request.h"
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/get-request.h
similarity index 78%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/get-request.h
index 35a3bd8..c9113ad 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/get-request.h
@@ -16,11 +16,20 @@
  * limitations under the License.
  *
  */
-
 #pragma once
 
-#include <folly/io/IOBuf.h>
+#include <string>
+
+#include "core/table-name.h"
+
+namespace hbase {
 
-#include "if/Cell.pb.h"
+class GetRequest {
+public:
+  GetRequest(TableName table_name, std::string key);
 
-class Client {};
+private:
+  TableName table_name_;
+  std::string key_;
+};
+}  // namespace hbase
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/get-result.cc
similarity index 90%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/get-result.cc
index 35a3bd8..7eea483 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/get-result.cc
@@ -16,11 +16,4 @@
  * limitations under the License.
  *
  */
-
-#pragma once
-
-#include <folly/io/IOBuf.h>
-
-#include "if/Cell.pb.h"
-
-class Client {};
+#include "core/get-result.h"
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/get-result.h
similarity index 84%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/get-result.h
index 35a3bd8..e021316 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/get-result.h
@@ -16,11 +16,17 @@
  * limitations under the License.
  *
  */
-
 #pragma once
 
-#include <folly/io/IOBuf.h>
+#include <string>
+
+namespace hbase {
 
-#include "if/Cell.pb.h"
+class GetResult {
+public:
+  explicit GetResult(std::string key);
 
-class Client {};
+private:
+  std::string key_;
+};
+}  // namespace hbase
diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc
index 3106e36..70ca6f1 100644
--- a/hbase-native-client/core/location-cache-test.cc
+++ b/hbase-native-client/core/location-cache-test.cc
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
 #include <gtest/gtest.h>
 #include <folly/Memory.h>
 #include <wangle/concurrent/GlobalExecutor.h>
@@ -8,7 +26,8 @@ using namespace hbase;
 TEST(LocationCacheTest, TestGetMetaNodeContents) {
   // TODO(elliott): need to make a test utility for this.
   LocationCache cache{"localhost:2181", wangle::getCPUExecutor()};
-  auto result = cache.LocateMeta();
-  result.wait();
-  ASSERT_FALSE(result.hasException());
+  auto f = cache.LocateMeta();
+  auto result = f.get();
+  ASSERT_FALSE(f.hasException());
+  ASSERT_TRUE(result.has_port());
 }
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index cf61e24..34e3236 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
 #include "location-cache.h"
 
 #include <folly/Logging.h>
@@ -7,11 +25,10 @@
 using namespace std;
 using namespace folly;
 using namespace hbase::pb;
+using namespace hbase;
 
-namespace hbase {
-
-// TODO(elliott): make this configurable on client creation
-const static string META_LOCATION = "/hbase/meta-region-server";
+// TODO(eclark): make this configurable on client creation
+static const char META_LOCATION[] = "/hbase/meta-region-server";
 
 LocationCache::LocationCache(string quorum_spec,
                              shared_ptr<folly::Executor> executor)
@@ -50,18 +67,45 @@ void LocationCache::RefreshMetaLocation() {
 
 ServerName LocationCache::ReadMetaLocation() {
   char contents[4096];
+  // This needs to be int rather than size_t as that's what ZK expects.
   int len = sizeof(contents);
   // TODO(elliott): handle disconnects/reconntion as needed.
   int zk_result =
-      zoo_get(this->zk_, META_LOCATION.c_str(), 0, contents, &len, nullptr);
-
-  if (zk_result != ZOK) {
+      zoo_get(this->zk_, META_LOCATION, 0, contents, &len, nullptr);
+  if (zk_result != ZOK || len < 9) {
     LOG(ERROR) << "Error getting meta location.";
     throw runtime_error("Error getting meta location");
   }
+  // There should be a magic number for recoverable zk
+  if (static_cast<uint8_t>(contents[0]) != 255) {
+    LOG(ERROR) << "Magic number not in ZK znode data expected 255 got ="
+               << unsigned(static_cast<uint8_t>(contents[0]));
+    throw runtime_error("Magic number not in znode data");
+  }
+  // pos will keep track of skipped bytes.
+  int pos = 1;
+  // How long is the id?
+  int id_len = 0;
+  for (int i = 0; i < 4; ++i) {
+    id_len = id_len << 8;
+    id_len = id_len | static_cast<uint8_t>(contents[pos]);
+    ++pos;
+  }
+  // Skip the id
+  pos += id_len;
+  // Then all protobuf's for HBase are prefixed with a magic string.
+  // PBUF, so we skip that.
+  // TODO(eclark): check to make sure that the magic string is correct
+  // though I am not sure that will get us much.
+  pos += 4;
 
   MetaRegionServer mrs;
-  mrs.ParseFromArray(contents, len);
+  // Try to decode the protobuf.
+  // If there's an error bail out.
+  if (mrs.ParseFromArray(contents + pos, len - pos) == false) {
+    LOG(ERROR) << "Error parsing Protobuf Message";
+    throw runtime_error("Error parsing protobuf");
+  }
+
   return mrs.server();
 }
-}
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index 8dc2760..efcfde5 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -1,13 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
 #pragma once
 
-#include <memory>
-#include <mutex>
-
 #include <zookeeper/zookeeper.h>
 #include <folly/futures/Future.h>
 #include <folly/futures/SharedPromise.h>
-
 #include <folly/Executor.h>
+
+#include <memory>
+#include <mutex>
+#include <string>
+
 #include "if/HBase.pb.h"
 
 namespace hbase {
@@ -32,4 +51,4 @@ private:
 
   zhandle_t *zk_;
 };
-} // hbase
+}  // namespace hbase
diff --git a/hbase-native-client/core/native-client-test-env.cc b/hbase-native-client/core/native-client-test-env.cc
index 07f30a6..0269a43 100644
--- a/hbase-native-client/core/native-client-test-env.cc
+++ b/hbase-native-client/core/native-client-test-env.cc
@@ -22,7 +22,7 @@
 namespace {
 
 class NativeClientTestEnv : public ::testing::Environment {
- public:
+public:
   void SetUp() override {
     // start local HBase cluster to be reused by all tests
     auto result = system("bin/start_local_hbase_and_wait.sh");
@@ -36,9 +36,9 @@ class NativeClientTestEnv : public ::testing::Environment {
   }
 };
 
-}  // anonymous
+} // anonymous
 
-int main(int argc, char** argv) {
+int main(int argc, char **argv) {
   testing::InitGoogleTest(&argc, argv);
   ::testing::AddGlobalTestEnvironment(new NativeClientTestEnv());
   return RUN_ALL_TESTS();
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/pipeline.cc
similarity index 55%
copy from hbase-native-client/core/client.cc
copy to hbase-native-client/core/pipeline.cc
index a04daee..30d14ff 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/pipeline.cc
@@ -16,27 +16,27 @@
  * limitations under the License.
  *
  */
-
-#include "core/client.h"
+#include "core/pipeline.h"
 
 #include <folly/Logging.h>
-#include <folly/Random.h>
-#include <glog/logging.h>
-#include <gflags/gflags.h>
+#include <wangle/channel/AsyncSocketHandler.h>
+#include <wangle/channel/EventBaseHandler.h>
+#include <wangle/channel/OutputBufferingHandler.h>
+#include <wangle/codec/LengthFieldBasedFrameDecoder.h>
 
-#include "if/ZooKeeper.pb.h"
+#include "core/client-serialize-handler.h"
 
 using namespace folly;
-using namespace hbase::pb;
-
-int main(int argc, char *argv[]) {
-  MetaRegionServer mrs;
-  google::ParseCommandLineFlags(&argc, &argv, true);
-  google::InitGoogleLogging(argv[0]);
+using namespace hbase;
+using namespace wangle;
 
-  FB_LOG_EVERY_MS(INFO, 10000) << "Hello";
-  for (long i = 0; i < 10000000; i++) {
-    FB_LOG_EVERY_MS(INFO, 1) << Random::rand32();
-  }
-  return 0;
+SerializePipeline::Ptr
+RpcPipelineFactory::newPipeline(std::shared_ptr<AsyncTransportWrapper> sock) {
+  auto pipeline = SerializePipeline::create();
+  pipeline->addBack(AsyncSocketHandler(sock));
+  pipeline->addBack(EventBaseHandler());
+  pipeline->addBack(LengthFieldBasedFrameDecoder());
+  pipeline->addBack(ClientSerializeHandler());
+  pipeline->finalize();
+  return pipeline;
 }
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/pipeline.h
similarity index 65%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/pipeline.h
index 35a3bd8..d199d08 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/pipeline.h
@@ -16,11 +16,19 @@
  * limitations under the License.
  *
  */
-
 #pragma once
 
-#include <folly/io/IOBuf.h>
+#include <wangle/service/Service.h>
+#include <folly/io/IOBufQueue.h>
+#include "core/request.h"
+#include "core/response.h"
 
-#include "if/Cell.pb.h"
+namespace hbase {
+using SerializePipeline = wangle::Pipeline<folly::IOBufQueue &, Request>;
 
-class Client {};
+class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> {
+public:
+  SerializePipeline::Ptr
+  newPipeline(std::shared_ptr<folly::AsyncTransportWrapper> sock) override;
+};
+}  // namespace hbase
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/request.h
similarity index 76%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/request.h
index 35a3bd8..39083ed 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/request.h
@@ -16,11 +16,18 @@
  * limitations under the License.
  *
  */
-
 #pragma once
 
-#include <folly/io/IOBuf.h>
+#include <cstdint>
 
-#include "if/Cell.pb.h"
+namespace hbase {
+class Request {
+public:
+  Request() : call_id_(0) {}
+  uint32_t call_id() { return call_id_; }
+  void set_call_id(uint32_t call_id) { call_id_ = call_id; }
 
-class Client {};
+private:
+  uint32_t call_id_;
+};
+}  // namespace hbase
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/response.h
similarity index 76%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/response.h
index 35a3bd8..34a284d 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/response.h
@@ -16,11 +16,19 @@
  * limitations under the License.
  *
  */
-
 #pragma once
 
-#include <folly/io/IOBuf.h>
+#include <cstdint>
+
+namespace hbase {
 
-#include "if/Cell.pb.h"
+class Response {
+public:
+  Response() : call_id_(0) {}
+  uint32_t call_id() { return call_id_; }
+  void set_call_id(uint32_t call_id) { call_id_ = call_id; }
 
-class Client {};
+private:
+  uint32_t call_id_;
+};
+}  // namespace hbase
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/service.h
similarity index 84%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/service.h
index 35a3bd8..880e65f 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/service.h
@@ -16,11 +16,11 @@
  * limitations under the License.
  *
  */
-
 #pragma once
 
-#include <folly/io/IOBuf.h>
-
-#include "if/Cell.pb.h"
+#include "core/request.h"
+#include "core/response.h"
 
-class Client {};
+namespace hbase {  // NOTE(review): this header includes no wangle header; the includer must declare wangle::Service first
+using HBaseService = wangle::Service<Request, Response>;  // service type taking a Request and yielding a Response
+}  // namespace hbase
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/simple-client.cc
similarity index 67%
copy from hbase-native-client/core/client.cc
copy to hbase-native-client/core/simple-client.cc
index a04daee..08e886a 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -17,26 +17,41 @@
  *
  */
 
-#include "core/client.h"
-
 #include <folly/Logging.h>
 #include <folly/Random.h>
-#include <glog/logging.h>
 #include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <wangle/concurrent/GlobalExecutor.h>
+
+#include <iostream>
 
+#include "core/client.h"
+#include "core/connection-factory.h"
 #include "if/ZooKeeper.pb.h"
 
 using namespace folly;
+using namespace std;
+using namespace hbase;
 using namespace hbase::pb;
 
 int main(int argc, char *argv[]) {
-  MetaRegionServer mrs;
   google::ParseCommandLineFlags(&argc, &argv, true);
   google::InitGoogleLogging(argv[0]);
 
-  FB_LOG_EVERY_MS(INFO, 10000) << "Hello";
-  for (long i = 0; i < 10000000; i++) {
-    FB_LOG_EVERY_MS(INFO, 1) << Random::rand32();
-  }
+  // Create a connection factory
+  ConnectionFactory cf;
+
+  LocationCache cache{"localhost:2181", wangle::getCPUExecutor()};
+
+  auto result = cache.LocateMeta().get();
+  cout << "ServerName = " << result.host_name() << ":" << result.port() << endl;
+
+  // Connect to the host and port that LocateMeta() returned
+  auto conn = cf.make_connection(result.host_name(), result.port()).get();
+
+  // Send a default-constructed request and wait on its result via get()
+  Request r;
+  conn(r).get();
+
   return 0;
 }
diff --git a/hbase-native-client/core/simple-native-client-test.cc b/hbase-native-client/core/simple-native-client-test.cc
index ef564f7..ee39986 100644
--- a/hbase-native-client/core/simple-native-client-test.cc
+++ b/hbase-native-client/core/simple-native-client-test.cc
@@ -22,7 +22,4 @@
 /**
  * Sample test.
  */
-TEST(SampleTest, sample) {
-  EXPECT_TRUE(true);
-}
-
+TEST(SampleTest, sample) { EXPECT_TRUE(true); }
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/table-name.cc
similarity index 90%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/table-name.cc
index 35a3bd8..ffaaed0 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/table-name.cc
@@ -16,11 +16,4 @@
  * limitations under the License.
  *
  */
-
-#pragma once
-
-#include <folly/io/IOBuf.h>
-
-#include "if/Cell.pb.h"
-
-class Client {};
+#include "core/table-name.h"
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/table-name.h
similarity index 75%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/table-name.h
index 35a3bd8..796115b 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/table-name.h
@@ -16,11 +16,17 @@
  * limitations under the License.
  *
  */
-
 #pragma once
 
-#include <folly/io/IOBuf.h>
+#include <memory>
+#include <string>
 
-#include "if/Cell.pb.h"
+namespace hbase {
 
-class Client {};
+// Names a table; constructible from a table name alone or a namespace plus table name.
+class TableName {
+public:
+  explicit TableName(std::string tableName);
+  explicit TableName(std::string namespaceName, std::string tableName);
+};
+}  // namespace hbase
diff --git a/hbase-native-client/if/BUCK b/hbase-native-client/if/BUCK
index 9b989b5..5ff617d 100644
--- a/hbase-native-client/if/BUCK
+++ b/hbase-native-client/if/BUCK
@@ -1,3 +1,21 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
 PROTO_SRCS = glob(['*.proto'])
 HEADER_FILENAMES = [ x.replace('.proto','.pb.h') for x in PROTO_SRCS]
 CC_FILENAMES = [ x.replace('.proto', '.pb.cc') for x in PROTO_SRCS]
@@ -20,7 +38,7 @@ for cc_filename in CC_FILENAMES:
   genrule(
     name = cc_filename,
     cmd = 'mkdir -p `dirname $OUT` '
-          ' && cp $(location :generate-proto-sources)/{} $OUT '
+          ' && cp $(location :generate-proto-sources)/*.cc `dirname $OUT` '
           ' && cp $(location :generate-proto-sources)/*.h `dirname $OUT`'.format(cc_filename),
     out = cc_filename,
   )
@@ -29,9 +47,7 @@ cxx_library(
   name = 'if',
   exported_headers =  [':' + x for x in HEADER_FILENAMES],
   srcs = [':' + x for x in CC_FILENAMES],
-  deps = [ '//third-party:protobuf'] 
-        + [':' + x for x in CC_FILENAMES] 
-        + [ ':' + x for x in HEADER_FILENAMES ],
+  deps = [ '//third-party:protobuf'],
   visibility = [ 'PUBLIC', ],
   exported_deps = ['//third-party:protobuf']
 )
diff --git a/hbase-native-client/third-party/BUCK b/hbase-native-client/third-party/BUCK
index 6548695..4327530 100644
--- a/hbase-native-client/third-party/BUCK
+++ b/hbase-native-client/third-party/BUCK
@@ -23,19 +23,30 @@ def add_system_libs(names=[],
                     exported_linker_flags=[]):
     rules = []
     for name in names:
+        rule_visibility = ['PUBLIC']
         gen_rule_name = "gen_lib{}".format(name)
-        genrule(name=gen_rule_name,
-                out=gen_rule_name,
-                bash="mkdir -p $OUT && cp {}/lib{}.a $OUT".format(lib_dir,
-                                                                   name), )
+        genrule(
+            name=gen_rule_name,
+            out=gen_rule_name,
+            bash="mkdir -p $OUT && cp {}/lib{}.a $OUT".format(lib_dir, name), )
         prebuilt_cxx_library(name=name,
                              lib_name=name,
                              lib_dir='$(location :{})'.format(gen_rule_name),
                              deps=deps,
-			     force_static = True,
+                             force_static=True,
                              exported_deps=exported_deps,
-                             visibility=['PUBLIC'],
-                             exported_linker_flags=exported_linker_flags, )
+                             visibility=rule_visibility, )
+        rules.append(":" + name)
+    return rules
+
+
+def add_dynamic_libs(names=[]):  # declares one header-only rule per name; returns the ":<name>" targets
+    rules = []
+    for name in names:
+        prebuilt_cxx_library(name=name,
+                             header_only=True,
+                             exported_linker_flags=["-l" + name],  # rule contributes only the -l<name> link flag
+                             visibility=["PUBLIC"], )
         rules.append(":" + name)
     return rules
 
@@ -54,58 +65,46 @@ local_libs = [
     "glog",
     "protobuf",
 ]
+dynamic_libs = ["stdc++", "pthread", "ssl", "crypto", "dl", "atomic", ]
+dynamic_rules = add_dynamic_libs(dynamic_libs)
+tp_dep_rules =  add_system_libs(system_libs,) \
+  + add_system_libs(local_libs, lib_dir = "/usr/local/lib") \
+  + dynamic_rules
 
-
-
-tp_dep_rules = add_system_libs(system_libs) \
-  + add_system_libs(local_libs, lib_dir = "/usr/local/lib")
-
-zookeeper = add_system_libs(["zookeeper_mt"], lib_dir =  "/usr/local/lib")
+zookeeper = add_system_libs(["zookeeper_mt"], lib_dir="/usr/local/lib")
 folly = add_system_libs(['folly'],
                         lib_dir='/usr/local/lib',
-                        exported_deps=tp_dep_rules,
-                        exported_linker_flags=["-pthread",
-                                               "-lstdc++", ])
+                        exported_deps=tp_dep_rules, )
 folly_bench = add_system_libs(['follybenchmark'],
                               lib_dir='/usr/local/lib',
-                              exported_deps=tp_dep_rules + folly,
-                              exported_linker_flags=["-pthread",
-                                                     "-lstdc++", ])
+                              exported_deps=folly + tp_dep_rules, )
 wangle = add_system_libs(['wangle'],
                          lib_dir='/usr/local/lib',
-                         exported_deps=tp_dep_rules + folly,
-                         exported_linker_flags=["-pthread",
-                                                "-lstdc++", ])
-
+                         exported_deps=folly + tp_dep_rules)
 
 genrule(
-name = "gen_zk",
-out = "gen_zk",
-bash = "mkdir -p $OUT  && wget http://www-us.apache.org/dist/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz &&   tar zxf zookeeper-3.4.8.tar.gz &&   rm -rf zookeeper-3.4.8.tar.gz &&   cd zookeeper-3.4.8 &&   cd src/c && ./configure --prefix=$OUT &&   make &&   make install && cd $OUT && rm -rf zookeeper-3.4.8*"
-)
-cxx_library(
-    name = 'google-test',
-    srcs = [
-    'googletest/googletest/src/gtest-all.cc',
-    'googletest/googlemock/src/gmock-all.cc',
-    'googletest/googlemock/src/gmock_main.cc',
-    ],
-    header_namespace = '',
-    exported_headers = subdir_glob([
-    ('googletest/googletest/include', '**/*.h'),
-    ('googletest/googlemock/include', '**/*.h'),
-    ]),
-    headers = subdir_glob([
-    ('googletest/googletest', 'src/*.h'),
-    ('googletest/googletest', 'src/*.cc'),
-    ('googletest/googlemock', 'src/*.h'),
-    ('googletest/googlemock', 'src/*.cc'),
-    ]),
-    exported_linker_flags = [
-    "-pthread",
-    "-lstdc++",
-    ],
-    visibility = [
-    'PUBLIC',
-    ],
-)
+    name="gen_zk",
+    out="gen_zk",
+    bash=
+    "mkdir -p $OUT  && wget http://www-us.apache.org/dist/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz &&   tar zxf zookeeper-3.4.8.tar.gz &&   rm -rf zookeeper-3.4.8.tar.gz &&   cd zookeeper-3.4.8 &&   cd src/c && ./configure --prefix=$OUT &&   make &&   make install && cd $OUT && rm -rf zookeeper-3.4.8*")
+cxx_library(name='google-test',
+            srcs=[
+                'googletest/googletest/src/gtest-all.cc',
+                'googletest/googlemock/src/gmock-all.cc',
+                'googletest/googlemock/src/gmock_main.cc',
+            ],
+            header_namespace='',
+            exported_headers=subdir_glob([
+                ('googletest/googletest/include', '**/*.h'),
+                ('googletest/googlemock/include', '**/*.h'),
+            ]),
+            headers=subdir_glob([
+                ('googletest/googletest', 'src/*.h'),
+                ('googletest/googletest', 'src/*.cc'),
+                ('googletest/googlemock', 'src/*.h'),
+                ('googletest/googlemock', 'src/*.cc'),
+            ]),
+            exported_deps=dynamic_rules,
+            visibility=[
+                'PUBLIC',
+            ], )