You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:44:58 UTC
[hbase] 10/133: HBASE-14855 Connect to regionserver
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 6173b0b09716f37e04e702cdb9d899ff47cd71fd
Author: Elliott Clark <ec...@apache.org>
AuthorDate: Mon Apr 4 13:52:05 2016 -0700
HBASE-14855 Connect to regionserver
Summary:
* Client dispatcher to put request and response together with call id. Though right now this is a pretty empty version of that.
* pipeline. The Serialization pipeline.
* client-serialize-handler.h This does the work of sending the preamble, the connection header, and length prepending data. This will obviously need to be split up into h and cc files
Test Plan: Use simple-client to connect to a standalone server
Differential Revision: https://reviews.facebook.net/D56385
---
hbase-native-client/core/BUCK | 30 +++++-
hbase-native-client/core/client-dispatcher.cc | 54 +++++++++++
.../core/{client.h => client-dispatcher.h} | 22 ++++-
.../core/client-serialize-handler.cc | 104 ++++++++++++++++++++
.../core/{client.h => client-serialize-handler.h} | 26 ++++-
hbase-native-client/core/client.cc | 16 ++--
hbase-native-client/core/client.h | 18 +++-
hbase-native-client/core/connection-factory.cc | 57 +++++++++++
.../core/{client.h => connection-factory.h} | 23 ++++-
.../core/{client.h => get-request.cc} | 9 +-
.../core/{client.h => get-request.h} | 17 +++-
.../core/{client.h => get-result.cc} | 9 +-
.../core/{client.h => get-result.h} | 14 ++-
hbase-native-client/core/location-cache-test.cc | 25 ++++-
hbase-native-client/core/location-cache.cc | 62 ++++++++++--
hbase-native-client/core/location-cache.h | 29 +++++-
hbase-native-client/core/native-client-test-env.cc | 6 +-
.../core/{client.cc => pipeline.cc} | 34 +++----
hbase-native-client/core/{client.h => pipeline.h} | 16 +++-
hbase-native-client/core/{client.h => request.h} | 15 ++-
hbase-native-client/core/{client.h => response.h} | 16 +++-
hbase-native-client/core/{client.h => service.h} | 10 +-
.../core/{client.cc => simple-client.cc} | 31 ++++--
.../core/simple-native-client-test.cc | 5 +-
.../core/{client.h => table-name.cc} | 9 +-
.../core/{client.h => table-name.h} | 14 ++-
hbase-native-client/if/BUCK | 24 ++++-
hbase-native-client/third-party/BUCK | 105 ++++++++++-----------
28 files changed, 615 insertions(+), 185 deletions(-)
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index d1e89d1..2b00d66 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -16,35 +16,52 @@
# limitations under the License.
cxx_library(name="core",
- headers=[
+ exported_headers=[
"admin.h",
+ "client-dispatcher.h",
+ "client-serialize-handler.h",
"client.h",
+ "connection-factory.h",
"connection.h",
"connection_attr.h",
"delete.h",
+ "get-request.h",
+ "get-result.h",
"get.h",
"hbase_macros.h",
+ "location-cache.h",
"mutation.h",
+ "pipeline.h",
"put.h",
+ "request.h",
+ "response.h",
"scanner.h",
- "location-cache.h",
+ "service.h",
+ "table-name.h",
],
srcs=[
"admin.cc",
+ "client-dispatcher.cc",
+ "client-serialize-handler.cc",
"client.cc",
+ "connection-factory.cc",
"connection.cc",
+ "delete.cc",
+ "get-request.cc",
+ "get-result.cc",
"get.cc",
+ "location-cache.cc",
"mutation.cc",
+ "pipeline.cc",
"put.cc",
- "delete.cc",
"scanner.cc",
- "location-cache.cc",
+ "table-name.cc",
],
deps=[
"//if:if",
- "//third-party:zookeeper_mt",
"//third-party:folly",
"//third-party:wangle",
+ "//third-party:zookeeper_mt",
],
visibility=[
'PUBLIC',
@@ -68,3 +85,6 @@ cxx_test(name="location-cache-test",
":core",
],
run_test_separately=True, )
+cxx_binary(name="simple-client",
+ srcs=["simple-client.cc", ],
+ deps=[":core", ], )
diff --git a/hbase-native-client/core/client-dispatcher.cc b/hbase-native-client/core/client-dispatcher.cc
new file mode 100644
index 0000000..d356759
--- /dev/null
+++ b/hbase-native-client/core/client-dispatcher.cc
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "core/client-dispatcher.h"
+
+using namespace folly;
+using namespace hbase;
+using namespace wangle;
+
+void ClientDispatcher::read(Context *ctx, Response in) {
+ auto call_id = in.call_id();
+ auto search = requests_.find(call_id);
+ CHECK(search != requests_.end());
+ auto p = std::move(search->second);
+ requests_.erase(call_id);
+
+ // TODO(eclark): check if the response
+ // is an exception. If it is then set that.
+ p.setValue(in);
+}
+
+Future<Response> ClientDispatcher::operator()(Request arg) {
+ auto call_id = ++current_call_id_;
+ arg.set_call_id(call_id);
+ auto &p = requests_[call_id];
+ auto f = p.getFuture();
+ p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) {
+ this->requests_.erase(call_id);
+ });
+ this->pipeline_->write(arg);
+
+ return f;
+}
+
+Future<Unit> ClientDispatcher::close() { return ClientDispatcherBase::close(); }
+
+Future<Unit> ClientDispatcher::close(Context *ctx) {
+ return ClientDispatcherBase::close(ctx);
+}
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client-dispatcher.h
similarity index 56%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/client-dispatcher.h
index 35a3bd8..4b9d35a 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client-dispatcher.h
@@ -19,8 +19,24 @@
#pragma once
-#include <folly/io/IOBuf.h>
+#include <wangle/service/ClientDispatcher.h>
-#include "if/Cell.pb.h"
+#include "core/pipeline.h"
+#include "core/request.h"
+#include "core/response.h"
-class Client {};
+namespace hbase {
+class ClientDispatcher
+ : public wangle::ClientDispatcherBase<SerializePipeline, Request,
+ Response> {
+public:
+ void read(Context *ctx, Response in) override;
+ folly::Future<Response> operator()(Request arg) override;
+ folly::Future<folly::Unit> close(Context *ctx) override;
+ folly::Future<folly::Unit> close() override;
+
+private:
+ std::unordered_map<int32_t, folly::Promise<Response>> requests_;
+ uint32_t current_call_id_ = 1;
+};
+} // namespace hbase
diff --git a/hbase-native-client/core/client-serialize-handler.cc b/hbase-native-client/core/client-serialize-handler.cc
new file mode 100644
index 0000000..cad1308
--- /dev/null
+++ b/hbase-native-client/core/client-serialize-handler.cc
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/client-serialize-handler.h"
+
+#include <string>
+
+using namespace hbase;
+using namespace folly;
+using namespace wangle;
+
+static const std::string PREAMBLE = "HBas";
+static const std::string INTERFACE = "ClientService";
+static const uint8_t RPC_VERSION = 0;
+static const uint8_t AUTH_TYPE = 80;
+
+// TODO(eclark): Make this actually do ANYTHING.
+void ClientSerializeHandler::read(Context *ctx, std::unique_ptr<IOBuf> msg) {
+ Response received;
+ ctx->fireRead(received);
+}
+
+Future<Unit> ClientSerializeHandler::write(Context *ctx, Request r) {
+ // Keep track of if we have sent the header.
+ if (need_send_header_) {
+ need_send_header_ = false;
+
+ // Should this be replacing the IOBuf rather than
+ // sending several different calls?
+ write_preamble(ctx);
+ write_header(ctx);
+ }
+
+ // TODO(eclark): Send out the actual request and not just a test string.
+ std::string out{"test"};
+ return ctx->fireWrite(prepend_length(IOBuf::copyBuffer(out)));
+}
+
+Future<Unit> ClientSerializeHandler::write_preamble(Context *ctx) {
+ auto magic = IOBuf::copyBuffer(PREAMBLE);
+ auto buf = IOBuf::create(2);
+ buf->append(2);
+ folly::io::RWPrivateCursor c(buf.get());
+
+ // Version
+ c.write(RPC_VERSION);
+ // Standard security aka Please don't lie to me.
+ c.write(AUTH_TYPE);
+ magic->appendChain(std::move(buf));
+ return ctx->fireWrite(std::move(magic));
+}
+
+Future<Unit> ClientSerializeHandler::write_header(Context *ctx) {
+ pb::ConnectionHeader h;
+
+ // TODO(eclark): Make this not a total lie.
+ h.mutable_user_info()->set_effective_user("elliott");
+ // The service name that we want to talk to.
+ //
+ // Right now we're completely ignoring the service interface.
+ // That may or may not be the correct thing to do.
+ // It worked for a while with the java client; until it
+ // didn't.
+ h.set_service_name(INTERFACE);
+ // TODO(eclark): Make this 1 copy.
+ auto msg = IOBuf::copyBuffer(h.SerializeAsString());
+ return ctx->fireWrite(prepend_length(std::move(msg)));
+}
+
+// Our own simple version of LengthFieldPrepender
+std::unique_ptr<IOBuf>
+ClientSerializeHandler::prepend_length(std::unique_ptr<IOBuf> msg) {
+ // Java ints are 4 bytes long, so create a buffer that large.
+ auto len_buf = IOBuf::create(4);
+ // Then make those bytes visible.
+ len_buf->append(4);
+
+ io::RWPrivateCursor c(len_buf.get());
+ // Get the size of the data to be pushed out the network.
+ auto size = msg->computeChainDataLength();
+
+ // Write the length to this IOBuf.
+ c.writeBE(static_cast<uint32_t>(size));
+
+ // Then attach the original to the back of len_buf
+ len_buf->appendChain(std::move(msg));
+ return len_buf;
+}
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client-serialize-handler.h
similarity index 50%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/client-serialize-handler.h
index 35a3bd8..961a03b 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client-serialize-handler.h
@@ -16,11 +16,29 @@
* limitations under the License.
*
*/
-
#pragma once
-#include <folly/io/IOBuf.h>
+#include <wangle/channel/Handler.h>
+
+#include "if/HBase.pb.h"
+#include "if/RPC.pb.h"
+#include "core/request.h"
+#include "core/response.h"
-#include "if/Cell.pb.h"
+namespace hbase {
+class ClientSerializeHandler
+ : public wangle::Handler<std::unique_ptr<folly::IOBuf>, Response, Request,
+ std::unique_ptr<folly::IOBuf>> {
+public:
+ void read(Context *ctx, std::unique_ptr<folly::IOBuf> msg) override;
+ folly::Future<folly::Unit> write(Context *ctx, Request r) override;
-class Client {};
+private:
+ folly::Future<folly::Unit> write_preamble(Context *ctx);
+ folly::Future<folly::Unit> write_header(Context *ctx);
+ // Our own simple version of LengthFieldPrepender
+ std::unique_ptr<folly::IOBuf>
+ prepend_length(std::unique_ptr<folly::IOBuf> msg);
+ bool need_send_header_ = true;
+};
+} // namespace hbase
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index a04daee..893894f 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -23,20 +23,18 @@
#include <folly/Random.h>
#include <glog/logging.h>
#include <gflags/gflags.h>
+#include <wangle/concurrent/GlobalExecutor.h>
+
+#include <string>
#include "if/ZooKeeper.pb.h"
using namespace folly;
+using namespace std;
using namespace hbase::pb;
-int main(int argc, char *argv[]) {
- MetaRegionServer mrs;
- google::ParseCommandLineFlags(&argc, &argv, true);
- google::InitGoogleLogging(argv[0]);
+namespace hbase {
- FB_LOG_EVERY_MS(INFO, 10000) << "Hello";
- for (long i = 0; i < 10000000; i++) {
- FB_LOG_EVERY_MS(INFO, 1) << Random::rand32();
- }
- return 0;
+Client::Client(string quorum_spec)
+ : location_cache(quorum_spec, wangle::getCPUExecutor()) {}
}
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 35a3bd8..818bc6b 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -20,7 +20,23 @@
#pragma once
#include <folly/io/IOBuf.h>
+#include <folly/futures/Future.h>
+#include <string>
+
+#include "core/get-request.h"
+#include "core/get-result.h"
+#include "core/location-cache.h"
#include "if/Cell.pb.h"
-class Client {};
+namespace hbase {
+class Client {
+public:
+ explicit Client(std::string quorum_spec);
+ folly::Future<GetResult> get(const GetRequest &get_request);
+
+private:
+ LocationCache location_cache;
+};
+
+} // namespace hbase
diff --git a/hbase-native-client/core/connection-factory.cc b/hbase-native-client/core/connection-factory.cc
new file mode 100644
index 0000000..785b239
--- /dev/null
+++ b/hbase-native-client/core/connection-factory.cc
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/connection-factory.h"
+
+#include <wangle/channel/AsyncSocketHandler.h>
+#include <wangle/channel/EventBaseHandler.h>
+#include <wangle/channel/OutputBufferingHandler.h>
+#include <wangle/service/ClientDispatcher.h>
+#include <wangle/service/ExpiringFilter.h>
+#include <folly/futures/Future.h>
+
+#include <string>
+
+#include "core/client-dispatcher.h"
+#include "core/pipeline.h"
+#include "core/request.h"
+#include "core/response.h"
+#include "core/service.h"
+
+using namespace folly;
+using namespace hbase;
+using namespace wangle;
+
+ConnectionFactory::ConnectionFactory() {
+ bootstrap_.group(std::make_shared<wangle::IOThreadPoolExecutor>(2));
+ bootstrap_.pipelineFactory(std::make_shared<RpcPipelineFactory>());
+}
+
+Future<ClientDispatcher> ConnectionFactory::make_connection(std::string host,
+ int port) {
+ // Connect to a given server
+ // Then when connected create a ClientDispatcher.
+ auto srv = bootstrap_.connect(SocketAddress(host, port, true))
+ .then([](SerializePipeline *pipeline) {
+ ClientDispatcher dispatcher;
+ dispatcher.setPipeline(pipeline);
+ return dispatcher;
+ });
+ return srv;
+}
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/connection-factory.h
similarity index 62%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/connection-factory.h
index 35a3bd8..6f450c2 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/connection-factory.h
@@ -16,11 +16,26 @@
* limitations under the License.
*
*/
-
#pragma once
-#include <folly/io/IOBuf.h>
+#include <wangle/bootstrap/ClientBootstrap.h>
+#include <wangle/service/Service.h>
+
+#include <string>
+
+#include "core/service.h"
+#include "core/pipeline.h"
+#include "core/client-dispatcher.h"
+#include "core/request.h"
+#include "core/response.h"
-#include "if/Cell.pb.h"
+namespace hbase {
+class ConnectionFactory {
+public:
+ ConnectionFactory();
+ folly::Future<ClientDispatcher> make_connection(std::string host, int port);
-class Client {};
+private:
+ wangle::ClientBootstrap<SerializePipeline> bootstrap_;
+};
+} // namespace hbase
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/get-request.cc
similarity index 90%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/get-request.cc
index 35a3bd8..e927ccc 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/get-request.cc
@@ -16,11 +16,4 @@
* limitations under the License.
*
*/
-
-#pragma once
-
-#include <folly/io/IOBuf.h>
-
-#include "if/Cell.pb.h"
-
-class Client {};
+#include "core/get-request.h"
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/get-request.h
similarity index 78%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/get-request.h
index 35a3bd8..c9113ad 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/get-request.h
@@ -16,11 +16,20 @@
* limitations under the License.
*
*/
-
#pragma once
-#include <folly/io/IOBuf.h>
+#include <string>
+
+#include "core/table-name.h"
+
+namespace hbase {
-#include "if/Cell.pb.h"
+class GetRequest {
+public:
+ GetRequest(TableName table_name, std::string key);
-class Client {};
+private:
+ TableName table_name_;
+ std::string key_;
+};
+} // namespace hbase
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/get-result.cc
similarity index 90%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/get-result.cc
index 35a3bd8..7eea483 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/get-result.cc
@@ -16,11 +16,4 @@
* limitations under the License.
*
*/
-
-#pragma once
-
-#include <folly/io/IOBuf.h>
-
-#include "if/Cell.pb.h"
-
-class Client {};
+#include "core/get-result.h"
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/get-result.h
similarity index 84%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/get-result.h
index 35a3bd8..e021316 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/get-result.h
@@ -16,11 +16,17 @@
* limitations under the License.
*
*/
-
#pragma once
-#include <folly/io/IOBuf.h>
+#include <string>
+
+namespace hbase {
-#include "if/Cell.pb.h"
+class GetResult {
+public:
+ explicit GetResult(std::string key);
-class Client {};
+private:
+ std::string key_;
+};
+} // namespace hbase
diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc
index 3106e36..70ca6f1 100644
--- a/hbase-native-client/core/location-cache-test.cc
+++ b/hbase-native-client/core/location-cache-test.cc
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
#include <gtest/gtest.h>
#include <folly/Memory.h>
#include <wangle/concurrent/GlobalExecutor.h>
@@ -8,7 +26,8 @@ using namespace hbase;
TEST(LocationCacheTest, TestGetMetaNodeContents) {
// TODO(elliott): need to make a test utility for this.
LocationCache cache{"localhost:2181", wangle::getCPUExecutor()};
- auto result = cache.LocateMeta();
- result.wait();
- ASSERT_FALSE(result.hasException());
+ auto f = cache.LocateMeta();
+ auto result = f.get();
+ ASSERT_FALSE(f.hasException());
+ ASSERT_TRUE(result.has_port());
}
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index cf61e24..34e3236 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
#include "location-cache.h"
#include <folly/Logging.h>
@@ -7,11 +25,10 @@
using namespace std;
using namespace folly;
using namespace hbase::pb;
+using namespace hbase;
-namespace hbase {
-
-// TODO(elliott): make this configurable on client creation
-const static string META_LOCATION = "/hbase/meta-region-server";
+// TODO(eclark): make this configurable on client creation
+static const char META_LOCATION[] = "/hbase/meta-region-server";
LocationCache::LocationCache(string quorum_spec,
shared_ptr<folly::Executor> executor)
@@ -50,18 +67,45 @@ void LocationCache::RefreshMetaLocation() {
ServerName LocationCache::ReadMetaLocation() {
char contents[4096];
+ // This needs to be int rather than size_t as that's what ZK expects.
int len = sizeof(contents);
// TODO(elliott): handle disconnects/reconntion as needed.
int zk_result =
- zoo_get(this->zk_, META_LOCATION.c_str(), 0, contents, &len, nullptr);
-
- if (zk_result != ZOK) {
+ zoo_get(this->zk_, META_LOCATION, 0, contents, &len, nullptr);
+ if (zk_result != ZOK || len < 9) {
LOG(ERROR) << "Error getting meta location.";
throw runtime_error("Error getting meta location");
}
+ // There should be a magic number for recoverable zk
+ if (static_cast<uint8_t>(contents[0]) != 255) {
+ LOG(ERROR) << "Magic number not in ZK znode data expected 255 got ="
+ << unsigned(static_cast<uint8_t>(contents[0]));
+ throw runtime_error("Magic number not in znode data");
+ }
+ // pos will keep track of skipped bytes.
+ int pos = 1;
+ // How long is the id?
+ int id_len = 0;
+ for (int i = 0; i < 4; ++i) {
+ id_len = id_len << 8;
+ id_len = id_len | static_cast<uint8_t>(contents[pos]);
+ ++pos;
+ }
+ // Skip the id
+ pos += id_len;
+ // Then all protobuf's for HBase are prefixed with a magic string.
+ // PBUF, so we skip that.
+ // TODO(eclark): check to make sure that the magic string is correct
+ // though I am not sure that will get us much.
+ pos += 4;
MetaRegionServer mrs;
- mrs.ParseFromArray(contents, len);
+ // Try to decode the protobuf.
+ // If there's an error bail out.
+ if (mrs.ParseFromArray(contents + pos, len - pos) == false) {
+ LOG(ERROR) << "Error parsing Protobuf Message";
+ throw runtime_error("Error parsing protobuf");
+ }
+
return mrs.server();
}
-}
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index 8dc2760..efcfde5 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -1,13 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
#pragma once
-#include <memory>
-#include <mutex>
-
#include <zookeeper/zookeeper.h>
#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>
-
#include <folly/Executor.h>
+
+#include <memory>
+#include <mutex>
+#include <string>
+
#include "if/HBase.pb.h"
namespace hbase {
@@ -32,4 +51,4 @@ private:
zhandle_t *zk_;
};
-} // hbase
+} // namespace hbase
diff --git a/hbase-native-client/core/native-client-test-env.cc b/hbase-native-client/core/native-client-test-env.cc
index 07f30a6..0269a43 100644
--- a/hbase-native-client/core/native-client-test-env.cc
+++ b/hbase-native-client/core/native-client-test-env.cc
@@ -22,7 +22,7 @@
namespace {
class NativeClientTestEnv : public ::testing::Environment {
- public:
+public:
void SetUp() override {
// start local HBase cluster to be reused by all tests
auto result = system("bin/start_local_hbase_and_wait.sh");
@@ -36,9 +36,9 @@ class NativeClientTestEnv : public ::testing::Environment {
}
};
-} // anonymous
+} // anonymous
-int main(int argc, char** argv) {
+int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
::testing::AddGlobalTestEnvironment(new NativeClientTestEnv());
return RUN_ALL_TESTS();
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/pipeline.cc
similarity index 55%
copy from hbase-native-client/core/client.cc
copy to hbase-native-client/core/pipeline.cc
index a04daee..30d14ff 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/pipeline.cc
@@ -16,27 +16,27 @@
* limitations under the License.
*
*/
-
-#include "core/client.h"
+#include "core/pipeline.h"
#include <folly/Logging.h>
-#include <folly/Random.h>
-#include <glog/logging.h>
-#include <gflags/gflags.h>
+#include <wangle/channel/AsyncSocketHandler.h>
+#include <wangle/channel/EventBaseHandler.h>
+#include <wangle/channel/OutputBufferingHandler.h>
+#include <wangle/codec/LengthFieldBasedFrameDecoder.h>
-#include "if/ZooKeeper.pb.h"
+#include "core/client-serialize-handler.h"
using namespace folly;
-using namespace hbase::pb;
-
-int main(int argc, char *argv[]) {
- MetaRegionServer mrs;
- google::ParseCommandLineFlags(&argc, &argv, true);
- google::InitGoogleLogging(argv[0]);
+using namespace hbase;
+using namespace wangle;
- FB_LOG_EVERY_MS(INFO, 10000) << "Hello";
- for (long i = 0; i < 10000000; i++) {
- FB_LOG_EVERY_MS(INFO, 1) << Random::rand32();
- }
- return 0;
+SerializePipeline::Ptr
+RpcPipelineFactory::newPipeline(std::shared_ptr<AsyncTransportWrapper> sock) {
+ auto pipeline = SerializePipeline::create();
+ pipeline->addBack(AsyncSocketHandler(sock));
+ pipeline->addBack(EventBaseHandler());
+ pipeline->addBack(LengthFieldBasedFrameDecoder());
+ pipeline->addBack(ClientSerializeHandler());
+ pipeline->finalize();
+ return pipeline;
}
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/pipeline.h
similarity index 65%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/pipeline.h
index 35a3bd8..d199d08 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/pipeline.h
@@ -16,11 +16,19 @@
* limitations under the License.
*
*/
-
#pragma once
-#include <folly/io/IOBuf.h>
+#include <wangle/service/Service.h>
+#include <folly/io/IOBufQueue.h>
+#include "core/request.h"
+#include "core/response.h"
-#include "if/Cell.pb.h"
+namespace hbase {
+using SerializePipeline = wangle::Pipeline<folly::IOBufQueue &, Request>;
-class Client {};
+class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> {
+public:
+ SerializePipeline::Ptr
+ newPipeline(std::shared_ptr<folly::AsyncTransportWrapper> sock) override;
+};
+} // namespace hbase
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/request.h
similarity index 76%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/request.h
index 35a3bd8..39083ed 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/request.h
@@ -16,11 +16,18 @@
* limitations under the License.
*
*/
-
#pragma once
-#include <folly/io/IOBuf.h>
+#include <cstdint>
-#include "if/Cell.pb.h"
+namespace hbase {
+class Request {
+public:
+ Request() : call_id_(0) {}
+ uint32_t call_id() { return call_id_; }
+ void set_call_id(uint32_t call_id) { call_id_ = call_id; }
-class Client {};
+private:
+ uint32_t call_id_;
+};
+} // namespace hbase
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/response.h
similarity index 76%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/response.h
index 35a3bd8..34a284d 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/response.h
@@ -16,11 +16,19 @@
* limitations under the License.
*
*/
-
#pragma once
-#include <folly/io/IOBuf.h>
+#include <cstdint>
+
+namespace hbase {
-#include "if/Cell.pb.h"
+class Response {
+public:
+ Response() : call_id_(0) {}
+ uint32_t call_id() { return call_id_; }
+ void set_call_id(uint32_t call_id) { call_id_ = call_id; }
-class Client {};
+private:
+ uint32_t call_id_;
+};
+} // namespace hbase
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/service.h
similarity index 84%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/service.h
index 35a3bd8..880e65f 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/service.h
@@ -16,11 +16,11 @@
* limitations under the License.
*
*/
-
#pragma once
-#include <folly/io/IOBuf.h>
-
-#include "if/Cell.pb.h"
+#include "core/request.h"
+#include "core/response.h"
-class Client {};
+namespace hbase {
+using HBaseService = wangle::Service<Request, Response>;
+} // namespace hbase
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/simple-client.cc
similarity index 67%
copy from hbase-native-client/core/client.cc
copy to hbase-native-client/core/simple-client.cc
index a04daee..08e886a 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -17,26 +17,41 @@
*
*/
-#include "core/client.h"
-
#include <folly/Logging.h>
#include <folly/Random.h>
-#include <glog/logging.h>
#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <wangle/concurrent/GlobalExecutor.h>
+
+#include <iostream>
+#include "core/client.h"
+#include "core/connection-factory.h"
#include "if/ZooKeeper.pb.h"
using namespace folly;
+using namespace std;
+using namespace hbase;
using namespace hbase::pb;
int main(int argc, char *argv[]) {
- MetaRegionServer mrs;
google::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
- FB_LOG_EVERY_MS(INFO, 10000) << "Hello";
- for (long i = 0; i < 10000000; i++) {
- FB_LOG_EVERY_MS(INFO, 1) << Random::rand32();
- }
+ // Create a connection factory
+ ConnectionFactory cf;
+
+ LocationCache cache{"localhost:2181", wangle::getCPUExecutor()};
+
+ auto result = cache.LocateMeta().get();
+ cout << "ServerName = " << result.host_name() << ":" << result.port() << endl;
+
+ // Create a connection to the local host
+ auto conn = cf.make_connection(result.host_name(), result.port()).get();
+
+ // Send the request
+ Request r;
+ conn(r).get();
+
return 0;
}
diff --git a/hbase-native-client/core/simple-native-client-test.cc b/hbase-native-client/core/simple-native-client-test.cc
index ef564f7..ee39986 100644
--- a/hbase-native-client/core/simple-native-client-test.cc
+++ b/hbase-native-client/core/simple-native-client-test.cc
@@ -22,7 +22,4 @@
/**
* Sample test: expects the constant true, so it always passes.
*/
-TEST(SampleTest, sample) {
- EXPECT_TRUE(true);
-}
-
+TEST(SampleTest, sample) { EXPECT_TRUE(true); }
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/table-name.cc
similarity index 90%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/table-name.cc
index 35a3bd8..ffaaed0 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/table-name.cc
@@ -16,11 +16,4 @@
* limitations under the License.
*
*/
-
-#pragma once
-
-#include <folly/io/IOBuf.h>
-
-#include "if/Cell.pb.h"
-
-class Client {};
+#include "core/table-name.h"
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/table-name.h
similarity index 75%
copy from hbase-native-client/core/client.h
copy to hbase-native-client/core/table-name.h
index 35a3bd8..796115b 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/table-name.h
@@ -16,11 +16,17 @@
* limitations under the License.
*
*/
-
#pragma once
-#include <folly/io/IOBuf.h>
+#include <memory>
+#include <string>
-#include "if/Cell.pb.h"
+namespace hbase {
-class Client {};
+// Declares TableName, constructed either from a table name alone or from a namespace name plus a table name.
+class TableName {
+public:
+ explicit TableName(std::string tableName);
+ explicit TableName(std::string namespaceName, std::string tableName);
+};
+} // namespace hbase
diff --git a/hbase-native-client/if/BUCK b/hbase-native-client/if/BUCK
index 9b989b5..5ff617d 100644
--- a/hbase-native-client/if/BUCK
+++ b/hbase-native-client/if/BUCK
@@ -1,3 +1,21 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
PROTO_SRCS = glob(['*.proto'])
HEADER_FILENAMES = [ x.replace('.proto','.pb.h') for x in PROTO_SRCS]
CC_FILENAMES = [ x.replace('.proto', '.pb.cc') for x in PROTO_SRCS]
@@ -20,7 +38,7 @@ for cc_filename in CC_FILENAMES:
genrule(
name = cc_filename,
cmd = 'mkdir -p `dirname $OUT` '
- ' && cp $(location :generate-proto-sources)/{} $OUT '
+ ' && cp $(location :generate-proto-sources)/*.cc `dirname $OUT` '
' && cp $(location :generate-proto-sources)/*.h `dirname $OUT`'.format(cc_filename),
out = cc_filename,
)
@@ -29,9 +47,7 @@ cxx_library(
name = 'if',
exported_headers = [':' + x for x in HEADER_FILENAMES],
srcs = [':' + x for x in CC_FILENAMES],
- deps = [ '//third-party:protobuf']
- + [':' + x for x in CC_FILENAMES]
- + [ ':' + x for x in HEADER_FILENAMES ],
+ deps = [ '//third-party:protobuf'],
visibility = [ 'PUBLIC', ],
exported_deps = ['//third-party:protobuf']
)
diff --git a/hbase-native-client/third-party/BUCK b/hbase-native-client/third-party/BUCK
index 6548695..4327530 100644
--- a/hbase-native-client/third-party/BUCK
+++ b/hbase-native-client/third-party/BUCK
@@ -23,19 +23,30 @@ def add_system_libs(names=[],
exported_linker_flags=[]):
rules = []
for name in names:
+ rule_visibility = ['PUBLIC']
gen_rule_name = "gen_lib{}".format(name)
- genrule(name=gen_rule_name,
- out=gen_rule_name,
- bash="mkdir -p $OUT && cp {}/lib{}.a $OUT".format(lib_dir,
- name), )
+ genrule(
+ name=gen_rule_name,
+ out=gen_rule_name,
+ bash="mkdir -p $OUT && cp {}/lib{}.a $OUT".format(lib_dir, name), )
prebuilt_cxx_library(name=name,
lib_name=name,
lib_dir='$(location :{})'.format(gen_rule_name),
deps=deps,
- force_static = True,
+ force_static=True,
exported_deps=exported_deps,
- visibility=['PUBLIC'],
- exported_linker_flags=exported_linker_flags, )
+ visibility=rule_visibility, )
+ rules.append(":" + name)
+ return rules
+
+
+def add_dynamic_libs(names=[]):  # declares a header_only prebuilt_cxx_library per name whose exported linker flag is "-l" + name
+ rules = []
+ for name in names:
+ prebuilt_cxx_library(name=name,
+ header_only=True,
+ exported_linker_flags=["-l" + name],
+ visibility=["PUBLIC"], )
rules.append(":" + name)  # collect the rule reference in ":" + name form
return rules
@@ -54,58 +65,46 @@ local_libs = [
"glog",
"protobuf",
]
+dynamic_libs = ["stdc++", "pthread", "ssl", "crypto", "dl", "atomic", ]  # library names handed to add_dynamic_libs on the next line
+dynamic_rules = add_dynamic_libs(dynamic_libs)
+tp_dep_rules = add_system_libs(system_libs,) \
+ + add_system_libs(local_libs, lib_dir = "/usr/local/lib") \
+ + dynamic_rules  # rule references returned by add_dynamic_libs are appended last
-
-
-tp_dep_rules = add_system_libs(system_libs) \
- + add_system_libs(local_libs, lib_dir = "/usr/local/lib")
-
-zookeeper = add_system_libs(["zookeeper_mt"], lib_dir = "/usr/local/lib")
+zookeeper = add_system_libs(["zookeeper_mt"], lib_dir="/usr/local/lib")
folly = add_system_libs(['folly'],
lib_dir='/usr/local/lib',
- exported_deps=tp_dep_rules,
- exported_linker_flags=["-pthread",
- "-lstdc++", ])
+ exported_deps=tp_dep_rules, )
folly_bench = add_system_libs(['follybenchmark'],
lib_dir='/usr/local/lib',
- exported_deps=tp_dep_rules + folly,
- exported_linker_flags=["-pthread",
- "-lstdc++", ])
+ exported_deps=folly + tp_dep_rules, )
wangle = add_system_libs(['wangle'],
lib_dir='/usr/local/lib',
- exported_deps=tp_dep_rules + folly,
- exported_linker_flags=["-pthread",
- "-lstdc++", ])
-
+ exported_deps=folly + tp_dep_rules)
genrule(
-name = "gen_zk",
-out = "gen_zk",
-bash = "mkdir -p $OUT && wget http://www-us.apache.org/dist/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz && tar zxf zookeeper-3.4.8.tar.gz && rm -rf zookeeper-3.4.8.tar.gz && cd zookeeper-3.4.8 && cd src/c && ./configure --prefix=$OUT && make && make install && cd $OUT && rm -rf zookeeper-3.4.8*"
-)
-cxx_library(
- name = 'google-test',
- srcs = [
- 'googletest/googletest/src/gtest-all.cc',
- 'googletest/googlemock/src/gmock-all.cc',
- 'googletest/googlemock/src/gmock_main.cc',
- ],
- header_namespace = '',
- exported_headers = subdir_glob([
- ('googletest/googletest/include', '**/*.h'),
- ('googletest/googlemock/include', '**/*.h'),
- ]),
- headers = subdir_glob([
- ('googletest/googletest', 'src/*.h'),
- ('googletest/googletest', 'src/*.cc'),
- ('googletest/googlemock', 'src/*.h'),
- ('googletest/googlemock', 'src/*.cc'),
- ]),
- exported_linker_flags = [
- "-pthread",
- "-lstdc++",
- ],
- visibility = [
- 'PUBLIC',
- ],
-)
+ name="gen_zk",
+ out="gen_zk",
+ bash=
+ "mkdir -p $OUT && wget http://www-us.apache.org/dist/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz && tar zxf zookeeper-3.4.8.tar.gz && rm -rf zookeeper-3.4.8.tar.gz && cd zookeeper-3.4.8 && cd src/c && ./configure --prefix=$OUT && make && make install && cd $OUT && rm -rf zookeeper-3.4.8*")
+cxx_library(name='google-test',
+ srcs=[
+ 'googletest/googletest/src/gtest-all.cc',
+ 'googletest/googlemock/src/gmock-all.cc',
+ 'googletest/googlemock/src/gmock_main.cc',
+ ],
+ header_namespace='',
+ exported_headers=subdir_glob([
+ ('googletest/googletest/include', '**/*.h'),
+ ('googletest/googlemock/include', '**/*.h'),
+ ]),
+ headers=subdir_glob([
+ ('googletest/googletest', 'src/*.h'),
+ ('googletest/googletest', 'src/*.cc'),
+ ('googletest/googlemock', 'src/*.h'),
+ ('googletest/googlemock', 'src/*.cc'),
+ ]),
+ exported_deps=dynamic_rules,
+ visibility=[
+ 'PUBLIC',
+ ], )