You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/09/15 21:20:41 UTC
[24/25] hbase git commit: HBASE-18725 [C++] Install header files as
well as library
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/request.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/request.cc b/hbase-native-client/connection/request.cc
deleted file mode 100644
index 8983726..0000000
--- a/hbase-native-client/connection/request.cc
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "connection/request.h"
-
-#include "if/Client.pb.h"
-
-namespace hbase {
-
-Request::Request(std::shared_ptr<google::protobuf::Message> req,
- std::shared_ptr<google::protobuf::Message> resp, std::string method)
- : req_msg_(req), resp_msg_(resp), method_(method), call_id_(0) {}
-
-std::unique_ptr<Request> Request::get() {
- return std::make_unique<Request>(std::make_shared<hbase::pb::GetRequest>(),
- std::make_shared<hbase::pb::GetResponse>(), "Get");
-}
-std::unique_ptr<Request> Request::mutate() {
- return std::make_unique<Request>(std::make_shared<hbase::pb::MutateRequest>(),
- std::make_shared<hbase::pb::MutateResponse>(), "Mutate");
-}
-std::unique_ptr<Request> Request::scan() {
- return std::make_unique<Request>(std::make_shared<hbase::pb::ScanRequest>(),
- std::make_shared<hbase::pb::ScanResponse>(), "Scan");
-}
-std::unique_ptr<Request> Request::multi() {
- return std::make_unique<Request>(std::make_shared<hbase::pb::MultiRequest>(),
- std::make_shared<hbase::pb::MultiResponse>(), "Multi");
-}
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/request.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h
deleted file mode 100644
index 4b652c0..0000000
--- a/hbase-native-client/connection/request.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/Conv.h>
-#include <google/protobuf/message.h>
-
-#include <cstdint>
-#include <memory>
-#include <string>
-
-namespace hbase {
-
-/**
- * Main request class.
- * This holds the request object and the un-filled in approriatley typed
- * response object.
- */
-class Request {
- public:
- /** Create a request object for a get */
- static std::unique_ptr<Request> get();
- /** Create a request object for a mutate */
- static std::unique_ptr<Request> mutate();
- /** Create a request object for a scan */
- static std::unique_ptr<Request> scan();
- /** Create a request object for a multi */
- static std::unique_ptr<Request> multi();
-
- /**
- * This should be private. Do not use this.
- *
- *
- * Constructor that's public for make_unique. This sets all the messages and
- * method name.
- */
- Request(std::shared_ptr<google::protobuf::Message> req,
- std::shared_ptr<google::protobuf::Message> resp, std::string method);
-
- /** Get the call id. */
- uint32_t call_id() { return call_id_; }
- /** Set the call id. This should only be set once. */
- void set_call_id(uint32_t call_id) { call_id_ = call_id; }
- /** Get the backing request protobuf message. */
- std::shared_ptr<google::protobuf::Message> req_msg() { return req_msg_; }
- /** Get the backing response protobuf message. */
- std::shared_ptr<google::protobuf::Message> resp_msg() { return resp_msg_; }
- /** Get the method name. This is used to the the receiving rpc server what
- * method type to decode. */
- std::string method() { return method_; }
-
- std::string DebugString() {
- return "call_id:" + folly::to<std::string>(call_id_) + ", req_msg:" +
- req_msg_->ShortDebugString() + ", method:" + method_;
- }
-
- private:
- uint32_t call_id_;
- std::shared_ptr<google::protobuf::Message> req_msg_ = nullptr;
- std::shared_ptr<google::protobuf::Message> resp_msg_ = nullptr;
- std::string method_ = "Get";
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/response.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/response.h b/hbase-native-client/connection/response.h
deleted file mode 100644
index 38fdda0..0000000
--- a/hbase-native-client/connection/response.h
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/Conv.h>
-#include <folly/ExceptionWrapper.h>
-
-#include <cstdint>
-#include <memory>
-#include <string>
-#include <utility>
-
-#include "serde/cell-scanner.h"
-
-// Forward
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
-
-namespace hbase {
-
-/**
- * @brief Class representing a rpc response
- *
- * This is the class sent to a service.
- */
-class Response {
- public:
- /**
- * Constructor.
- * Initinalizes the call id to 0. 0 should never be a valid call id.
- */
- Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr), exception_(nullptr) {}
-
- /** Get the call_id */
- uint32_t call_id() { return call_id_; }
-
- /** Set the call_id */
- void set_call_id(uint32_t call_id) { call_id_ = call_id; }
-
- /**
- * Get the response message.
- * The caller is reponsible for knowing the type. In practice the call id is
- * used to figure out the type.
- */
- std::shared_ptr<google::protobuf::Message> resp_msg() const { return resp_msg_; }
-
- /** Set the response message. */
- void set_resp_msg(std::shared_ptr<google::protobuf::Message> response) {
- resp_msg_ = std::move(response);
- }
-
- void set_cell_scanner(std::shared_ptr<CellScanner> cell_scanner) { cell_scanner_ = cell_scanner; }
-
- const std::shared_ptr<CellScanner> cell_scanner() const { return cell_scanner_; }
-
- folly::exception_wrapper exception() { return exception_; }
-
- void set_exception(folly::exception_wrapper value) { exception_ = value; }
-
- std::string DebugString() const {
- std::string s{"call_id:"};
- s += folly::to<std::string>(call_id_);
- s += ", resp_msg:";
- s += resp_msg_->ShortDebugString();
- return s;
- }
-
- private:
- uint32_t call_id_;
- std::shared_ptr<google::protobuf::Message> resp_msg_;
- std::shared_ptr<CellScanner> cell_scanner_;
- folly::exception_wrapper exception_;
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
deleted file mode 100644
index 51c9c63..0000000
--- a/hbase-native-client/connection/rpc-client.cc
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "connection/rpc-client.h"
-
-#include <folly/Format.h>
-#include <folly/Logging.h>
-#include <folly/futures/Future.h>
-#include <unistd.h>
-#include <wangle/concurrent/IOThreadPoolExecutor.h>
-#include "exceptions/exception.h"
-
-using hbase::security::User;
-using std::chrono::nanoseconds;
-
-namespace hbase {
-
-RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
- std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
- std::shared_ptr<Codec> codec, std::shared_ptr<Configuration> conf,
- nanoseconds connect_timeout)
- : io_executor_(io_executor), conf_(conf) {
- cp_ = std::make_shared<ConnectionPool>(io_executor_, cpu_executor, codec, conf, connect_timeout);
-}
-
-void RpcClient::Close() { io_executor_->stop(); }
-
-std::unique_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port,
- std::unique_ptr<Request> req,
- std::shared_ptr<User> ticket) {
- return AsyncCall(host, port, std::move(req), ticket).get();
-}
-
-std::unique_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port,
- std::unique_ptr<Request> req,
- std::shared_ptr<User> ticket,
- const std::string& service_name) {
- return AsyncCall(host, port, std::move(req), ticket, service_name).get();
-}
-
-folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host,
- uint16_t port,
- std::unique_ptr<Request> req,
- std::shared_ptr<User> ticket) {
- auto remote_id = std::make_shared<ConnectionId>(host, port, ticket);
- return SendRequest(remote_id, std::move(req));
-}
-
-folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host,
- uint16_t port,
- std::unique_ptr<Request> req,
- std::shared_ptr<User> ticket,
- const std::string& service_name) {
- auto remote_id = std::make_shared<ConnectionId>(host, port, ticket, service_name);
- return SendRequest(remote_id, std::move(req));
-}
-
-/**
- * There are two cases for ConnectionException:
- * 1. The first time connection
- * establishment, i.e. GetConnection(remote_id), AsyncSocketException being a cause.
- * 2. Writing request down the pipeline, i.e. RpcConnection::SendRequest, AsyncSocketException being
- * a cause as well.
- */
-folly::Future<std::unique_ptr<Response>> RpcClient::SendRequest(
- std::shared_ptr<ConnectionId> remote_id, std::unique_ptr<Request> req) {
- try {
- return GetConnection(remote_id)
- ->SendRequest(std::move(req))
- .onError([&, this](const folly::exception_wrapper& ew) {
- VLOG(3) << folly::sformat("RpcClient Exception: {}", ew.what());
- ew.with_exception([&, this](const hbase::ConnectionException& re) {
- /* bad connection, remove it from pool. */
- cp_->Close(remote_id);
- });
- return GetFutureWithException(ew);
- });
- } catch (const ConnectionException& e) {
- CHECK(e.cause().get_exception() != nullptr);
- VLOG(3) << folly::sformat("RpcClient Exception: {}", e.cause().what());
- /* bad connection, remove it from pool. */
- cp_->Close(remote_id);
- return GetFutureWithException(e);
- }
-}
-
-template <typename EXCEPTION>
-folly::Future<std::unique_ptr<Response>> RpcClient::GetFutureWithException(const EXCEPTION& e) {
- return GetFutureWithException(folly::exception_wrapper{e});
-}
-
-folly::Future<std::unique_ptr<Response>> RpcClient::GetFutureWithException(
- const folly::exception_wrapper& ew) {
- folly::Promise<std::unique_ptr<Response>> promise;
- auto future = promise.getFuture();
- promise.setException(ew);
- return future;
-}
-
-std::shared_ptr<RpcConnection> RpcClient::GetConnection(std::shared_ptr<ConnectionId> remote_id) {
- return cp_->GetConnection(remote_id);
-}
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
deleted file mode 100644
index 93801d8..0000000
--- a/hbase-native-client/connection/rpc-client.h
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <google/protobuf/service.h>
-
-#include <folly/ExceptionWrapper.h>
-#include <chrono>
-#include <memory>
-#include <string>
-#include <utility>
-
-#include "connection/connection-id.h"
-#include "connection/connection-pool.h"
-#include "connection/request.h"
-#include "connection/response.h"
-#include "security/user.h"
-
-namespace hbase {
-
-class RpcClient {
- public:
- RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
- std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
- std::shared_ptr<Codec> codec, std::shared_ptr<Configuration> conf,
- std::chrono::nanoseconds connect_timeout = std::chrono::nanoseconds(0));
-
- virtual ~RpcClient() { Close(); }
-
- virtual std::unique_ptr<Response> SyncCall(const std::string &host, uint16_t port,
- std::unique_ptr<Request> req,
- std::shared_ptr<security::User> ticket);
-
- virtual std::unique_ptr<Response> SyncCall(const std::string &host, uint16_t port,
- std::unique_ptr<Request> req,
- std::shared_ptr<security::User> ticket,
- const std::string &service_name);
-
- virtual folly::Future<std::unique_ptr<Response>> AsyncCall(
- const std::string &host, uint16_t port, std::unique_ptr<Request> req,
- std::shared_ptr<security::User> ticket);
-
- virtual folly::Future<std::unique_ptr<Response>> AsyncCall(const std::string &host, uint16_t port,
- std::unique_ptr<Request> req,
- std::shared_ptr<security::User> ticket,
- const std::string &service_name);
-
- virtual void Close();
-
- std::shared_ptr<ConnectionPool> connection_pool() const { return cp_; }
-
- private:
- std::shared_ptr<RpcConnection> GetConnection(std::shared_ptr<ConnectionId> remote_id);
- folly::Future<std::unique_ptr<Response>> SendRequest(std::shared_ptr<ConnectionId> remote_id,
- std::unique_ptr<Request> req);
- template <typename EXCEPTION>
- folly::Future<std::unique_ptr<Response>> GetFutureWithException(const EXCEPTION &e);
-
- folly::Future<std::unique_ptr<Response>> GetFutureWithException(
- const folly::exception_wrapper &ew);
-
- private:
- std::shared_ptr<ConnectionPool> cp_;
- std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
- std::shared_ptr<Configuration> conf_;
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-connection.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-connection.h b/hbase-native-client/connection/rpc-connection.h
deleted file mode 100644
index 9063280..0000000
--- a/hbase-native-client/connection/rpc-connection.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <memory>
-#include <mutex>
-#include <utility>
-
-#include "connection/connection-factory.h"
-#include "connection/connection-id.h"
-#include "connection/request.h"
-#include "connection/response.h"
-#include "connection/service.h"
-
-namespace hbase {
-
-class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
- public:
- RpcConnection(std::shared_ptr<ConnectionId> connection_id, std::shared_ptr<ConnectionFactory> cf)
- : connection_id_(connection_id), cf_(cf), hbase_service_(nullptr) {}
-
- virtual ~RpcConnection() { Close(); }
-
- virtual std::shared_ptr<ConnectionId> remote_id() const { return connection_id_; }
-
- virtual folly::Future<std::unique_ptr<Response>> SendRequest(std::unique_ptr<Request> req) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
- if (hbase_service_ == nullptr) {
- Connect();
- }
- VLOG(5) << "Calling RpcConnection::SendRequest()"; // TODO
- return (*hbase_service_)(std::move(req));
- }
-
- virtual void Close() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
- if (hbase_service_) {
- hbase_service_->close();
- hbase_service_ = nullptr;
- }
- if (client_bootstrap_) {
- client_bootstrap_ = nullptr;
- }
- }
-
- private:
- void Connect() {
- client_bootstrap_ = cf_->MakeBootstrap();
- auto dispatcher = cf_->Connect(shared_from_this(), client_bootstrap_, remote_id()->host(),
- remote_id()->port());
- hbase_service_ = std::move(dispatcher);
- }
-
- private:
- std::recursive_mutex mutex_;
- std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
- std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
- std::shared_ptr<ConnectionId> connection_id_;
- std::shared_ptr<HBaseService> hbase_service_;
- std::shared_ptr<ConnectionFactory> cf_;
- std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client_bootstrap_;
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-fault-injector-inl.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-fault-injector-inl.h b/hbase-native-client/connection/rpc-fault-injector-inl.h
deleted file mode 100644
index 8bbaddf..0000000
--- a/hbase-native-client/connection/rpc-fault-injector-inl.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-namespace hbase {
-
-template <typename T>
-std::shared_ptr<T> RpcFaultInjector<T>::instance = std::make_shared<T>();
-
-template <typename T>
-RpcFaultInjector<T>::RpcFaultInjector() {}
-
-template <typename T>
-RpcFaultInjector<T>::~RpcFaultInjector() {}
-
-template <typename T>
-std::shared_ptr<T> RpcFaultInjector<T>::Get() {
- return instance;
-}
-
-template <typename T>
-void RpcFaultInjector<T>::Set(std::shared_ptr<T> injector) {
- instance = injector;
-}
-} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-fault-injector.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-fault-injector.cc b/hbase-native-client/connection/rpc-fault-injector.cc
deleted file mode 100644
index 16e2034..0000000
--- a/hbase-native-client/connection/rpc-fault-injector.cc
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "rpc-fault-injector.h"
-
-namespace hbase {} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-fault-injector.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-fault-injector.h b/hbase-native-client/connection/rpc-fault-injector.h
deleted file mode 100644
index 2733b7d..0000000
--- a/hbase-native-client/connection/rpc-fault-injector.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/io/async/AsyncTransport.h>
-#include "connection/pipeline.h"
-
-namespace hbase {
-
-template <typename T>
-class RpcFaultInjector {
- public:
- RpcFaultInjector();
- virtual ~RpcFaultInjector();
-
- static std::shared_ptr<T> Get();
- static void Set(std::shared_ptr<T> instance);
-
- private:
- static std::shared_ptr<T> instance;
-};
-
-class RpcClientFaultInjector : public RpcFaultInjector<RpcClientFaultInjector> {
- public:
- RpcClientFaultInjector() {}
- virtual ~RpcClientFaultInjector() {}
- /**
- * Here goes virtual functions for injecting various faults. They should be no-ops by default.
- * Sub classes of RpcClientFaultInjector will override by providing concrete faults.
- */
-};
-} /* namespace hbase */
-
-#include "connection/rpc-fault-injector-inl.h"
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-test-server-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc
deleted file mode 100644
index 8e405ef..0000000
--- a/hbase-native-client/connection/rpc-test-server-handler.cc
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "connection/rpc-test-server-handler.h"
-#include "if/RPC.pb.h"
-#include "if/test.pb.h"
-
-namespace hbase {
-
-void RpcTestServerSerializeHandler::read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) {
- buf->coalesce();
- pb::RequestHeader header;
-
- int used_bytes = serde_.ParseDelimited(buf.get(), &header);
- VLOG(3) << "Read RPC RequestHeader size=" << used_bytes << " call_id=" << header.call_id();
-
- auto received = CreateReceivedRequest(header.method_name());
-
- buf->trimStart(used_bytes);
- if (header.has_request_param() && received != nullptr) {
- used_bytes = serde_.ParseDelimited(buf.get(), received->req_msg().get());
- VLOG(3) << "Read RPCRequest, buf length:" << buf->length()
- << ", header PB length:" << used_bytes;
- received->set_call_id(header.call_id());
- }
-
- if (received != nullptr) {
- ctx->fireRead(std::move(received));
- }
-}
-
-folly::Future<folly::Unit> RpcTestServerSerializeHandler::write(Context* ctx,
- std::unique_ptr<Response> resp) {
- VLOG(3) << "Writing RPC Request";
- // Send the data down the pipeline.
- return ctx->fireWrite(
- serde_.Response(resp->call_id(), resp->resp_msg().get(), resp->exception()));
-}
-
-std::unique_ptr<Request> RpcTestServerSerializeHandler::CreateReceivedRequest(
- const std::string& method_name) {
- std::unique_ptr<Request> result = nullptr;
-
- if (method_name == "ping") {
- result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
- std::make_shared<EmptyResponseProto>(), method_name);
- } else if (method_name == "echo") {
- result = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
- std::make_shared<EchoResponseProto>(), method_name);
- } else if (method_name == "error") {
- result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
- std::make_shared<EmptyResponseProto>(), method_name);
- } else if (method_name == "pause") {
- result = std::make_unique<Request>(std::make_shared<PauseRequestProto>(),
- std::make_shared<EmptyResponseProto>(), method_name);
- } else if (method_name == "addr") {
- result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
- std::make_shared<AddrResponseProto>(), method_name);
- } else if (method_name == "socketNotOpen") {
- result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
- std::make_shared<EmptyResponseProto>(), method_name);
- }
- return result;
-}
-} // end of namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-test-server-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server-handler.h b/hbase-native-client/connection/rpc-test-server-handler.h
deleted file mode 100644
index ab0264f..0000000
--- a/hbase-native-client/connection/rpc-test-server-handler.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <wangle/channel/Handler.h>
-
-#include "connection/request.h"
-#include "connection/response.h"
-#include "serde/rpc-serde.h"
-
-using namespace hbase;
-
-namespace hbase {
-// A real rpc server would probably use generated client/server stubs
-class RpcTestServerSerializeHandler
- : public wangle::Handler<std::unique_ptr<folly::IOBuf>, std::unique_ptr<Request>,
- std::unique_ptr<Response>, std::unique_ptr<folly::IOBuf>> {
- public:
- RpcTestServerSerializeHandler() : serde_() {}
-
- void read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) override;
-
- folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<Response> resp) override;
-
- private:
- std::unique_ptr<Request> CreateReceivedRequest(const std::string& method_name);
-
- private:
- hbase::RpcSerde serde_;
-};
-} // end of namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-test-server.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc
deleted file mode 100644
index 157ea71..0000000
--- a/hbase-native-client/connection/rpc-test-server.cc
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <wangle/channel/AsyncSocketHandler.h>
-#include <wangle/channel/EventBaseHandler.h>
-#include <wangle/codec/LengthFieldBasedFrameDecoder.h>
-#include <wangle/codec/LengthFieldPrepender.h>
-#include <wangle/service/ServerDispatcher.h>
-
-#include "connection/rpc-test-server-handler.h"
-#include "connection/rpc-test-server.h"
-#include "if/test.pb.h"
-
-namespace hbase {
-
-RpcTestServerSerializePipeline::Ptr RpcTestServerPipelineFactory::newPipeline(
- std::shared_ptr<AsyncTransportWrapper> sock) {
- if (service_ == nullptr) {
- initService(sock);
- }
- CHECK(service_ != nullptr);
-
- auto pipeline = RpcTestServerSerializePipeline::create();
- pipeline->addBack(AsyncSocketHandler(sock));
- // ensure we can write from any thread
- pipeline->addBack(EventBaseHandler());
- pipeline->addBack(LengthFieldBasedFrameDecoder());
- pipeline->addBack(RpcTestServerSerializeHandler());
- pipeline->addBack(MultiplexServerDispatcher<std::unique_ptr<Request>, std::unique_ptr<Response>>(
- service_.get()));
- pipeline->finalize();
-
- return pipeline;
-}
-
-void RpcTestServerPipelineFactory::initService(std::shared_ptr<AsyncTransportWrapper> sock) {
- /* get server address */
- SocketAddress localAddress;
- sock->getLocalAddress(&localAddress);
-
- /* init service with server address */
- service_ = std::make_shared<ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>>>(
- std::make_shared<CPUThreadPoolExecutor>(1),
- std::make_shared<RpcTestService>(std::make_shared<SocketAddress>(localAddress)));
-}
-
-Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Request> request) {
- /* build Response */
- auto response = std::make_unique<Response>();
- response->set_call_id(request->call_id());
- std::string method_name = request->method();
-
- if (method_name == "ping") {
- auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
- response->set_resp_msg(pb_resp_msg);
- VLOG(1) << "RPC server:"
- << " ping called.";
-
- } else if (method_name == "echo") {
- auto pb_resp_msg = std::make_shared<EchoResponseProto>();
- /* get msg from client */
- auto pb_req_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
- pb_resp_msg->set_message(pb_req_msg->message());
- response->set_resp_msg(pb_resp_msg);
- VLOG(1) << "RPC server:"
- << " echo called, " << pb_req_msg->message();
-
- } else if (method_name == "error") {
- auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
- response->set_resp_msg(pb_resp_msg);
- VLOG(1) << "RPC server:"
- << " error called.";
- response->set_exception(RpcTestException("server error!"));
-
- } else if (method_name == "pause") {
- auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
- /* sleeping */
- auto pb_req_msg = std::static_pointer_cast<PauseRequestProto>(request->req_msg());
- std::this_thread::sleep_for(std::chrono::milliseconds(pb_req_msg->ms()));
- response->set_resp_msg(pb_resp_msg);
- VLOG(1) << "RPC server:"
- << " pause called, " << pb_req_msg->ms() << " ms";
-
- } else if (method_name == "addr") {
- // TODO:
- } else if (method_name == "socketNotOpen") {
- auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
- response->set_resp_msg(pb_resp_msg);
- }
-
- return folly::makeFuture<std::unique_ptr<Response>>(std::move(response));
-}
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-test-server.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server.h b/hbase-native-client/connection/rpc-test-server.h
deleted file mode 100644
index 955560e..0000000
--- a/hbase-native-client/connection/rpc-test-server.h
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-#include <folly/SocketAddress.h>
-#include <wangle/concurrent/CPUThreadPoolExecutor.h>
-#include <wangle/service/ExecutorFilter.h>
-#include <wangle/service/Service.h>
-
-#include "connection/request.h"
-#include "connection/response.h"
-#include "exceptions/exception.h"
-
-using namespace hbase;
-using namespace folly;
-using namespace wangle;
-
-namespace hbase {
-using RpcTestServerSerializePipeline = wangle::Pipeline<IOBufQueue&, std::unique_ptr<Response>>;
-
-class RpcTestException : public IOException {
- public:
- RpcTestException() {}
- RpcTestException(const std::string& what) : IOException(what) {}
- RpcTestException(const std::string& what, const folly::exception_wrapper& cause)
- : IOException(what, cause) {}
- RpcTestException(const folly::exception_wrapper& cause) : IOException("", cause) {}
-};
-
-class RpcTestService : public Service<std::unique_ptr<Request>, std::unique_ptr<Response>> {
- public:
- RpcTestService(std::shared_ptr<folly::SocketAddress> socket_address)
- : socket_address_(socket_address) {}
- virtual ~RpcTestService() = default;
- Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> request) override;
-
- private:
- std::shared_ptr<folly::SocketAddress> socket_address_;
-};
-
-class RpcTestServerPipelineFactory : public PipelineFactory<RpcTestServerSerializePipeline> {
- public:
- RpcTestServerSerializePipeline::Ptr newPipeline(
- std::shared_ptr<AsyncTransportWrapper> sock) override;
-
- private:
- void initService(std::shared_ptr<AsyncTransportWrapper> sock);
-
- private:
- std::shared_ptr<ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>>> service_{
- nullptr};
-};
-} // end of namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc
deleted file mode 100644
index 8624e72..0000000
--- a/hbase-native-client/connection/rpc-test.cc
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <wangle/bootstrap/ClientBootstrap.h>
-#include <wangle/channel/Handler.h>
-
-#include <folly/Format.h>
-#include <folly/Logging.h>
-#include <folly/SocketAddress.h>
-#include <folly/String.h>
-#include <folly/experimental/TestUtil.h>
-#include <folly/io/async/AsyncSocketException.h>
-#include <gflags/gflags.h>
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-#include <boost/thread.hpp>
-#include <chrono>
-
-#include "connection/rpc-client.h"
-#include "exceptions/exception.h"
-#include "if/test.pb.h"
-#include "rpc-test-server.h"
-#include "security/user.h"
-#include "serde/rpc-serde.h"
-
-using namespace wangle;
-using namespace folly;
-using namespace hbase;
-using namespace std::chrono;
-
-DEFINE_int32(port, 0, "test server port");
-DEFINE_string(result_format, "RPC {} returned: {}.", "output format of RPC result");
-DEFINE_string(fail_ex_format, "Shouldn't get here, exception is expected for RPC {}.",
- "output format of enforcing fail with exception");
-DEFINE_string(fail_no_ex_format, "Shouldn't get here, exception is not expected for RPC {}.",
- "output format of enforcing fail without exception");
-typedef ServerBootstrap<RpcTestServerSerializePipeline> ServerTestBootstrap;
-typedef std::shared_ptr<ServerTestBootstrap> ServerPtr;
-
-class RpcTest : public ::testing::Test {
- public:
- static void SetUpTestCase() { google::InstallFailureSignalHandler(); }
-};
-
-std::shared_ptr<Configuration> CreateConf() {
- auto conf = std::make_shared<Configuration>();
- conf->Set(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, "true");
- return conf;
-}
-
-ServerPtr CreateRpcServer() {
- /* create rpc test server */
- auto server = std::make_shared<ServerTestBootstrap>();
- server->childPipeline(std::make_shared<RpcTestServerPipelineFactory>());
- server->bind(FLAGS_port);
- return server;
-}
-
-std::shared_ptr<folly::SocketAddress> GetRpcServerAddress(ServerPtr server) {
- auto addr = std::make_shared<folly::SocketAddress>();
- server->getSockets()[0]->getAddress(addr.get());
- return addr;
-}
-
-std::shared_ptr<RpcClient> CreateRpcClient(std::shared_ptr<Configuration> conf) {
- auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1);
- auto cpu_executor = std::make_shared<wangle::CPUThreadPoolExecutor>(1);
- auto client = std::make_shared<RpcClient>(io_executor, cpu_executor, nullptr, conf);
- return client;
-}
-
-std::shared_ptr<RpcClient> CreateRpcClient(std::shared_ptr<Configuration> conf,
- std::chrono::nanoseconds connect_timeout) {
- auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1);
- auto cpu_executor = std::make_shared<wangle::CPUThreadPoolExecutor>(1);
- auto client =
- std::make_shared<RpcClient>(io_executor, cpu_executor, nullptr, conf, connect_timeout);
- return client;
-}
-
-/**
-* test ping
-*/
-TEST_F(RpcTest, Ping) {
- auto conf = CreateConf();
- auto server = CreateRpcServer();
- auto server_addr = GetRpcServerAddress(server);
- auto client = CreateRpcClient(conf);
-
- auto method = "ping";
- auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
- std::make_shared<EmptyResponseProto>(), method);
-
- /* sending out request */
- client
- ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
- hbase::security::User::defaultUser())
- .then([&](std::unique_ptr<Response> response) {
- auto pb_resp = std::static_pointer_cast<EmptyResponseProto>(response->resp_msg());
- EXPECT_TRUE(pb_resp != nullptr);
- VLOG(1) << folly::sformat(FLAGS_result_format, method, "");
- })
- .onError([&](const folly::exception_wrapper& ew) {
- FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method);
- })
- .get();
-
- server->stop();
- server->join();
-}
-
-/**
- * test echo
- */
-TEST_F(RpcTest, Echo) {
- auto conf = CreateConf();
- auto server = CreateRpcServer();
- auto server_addr = GetRpcServerAddress(server);
- auto client = CreateRpcClient(conf);
-
- auto method = "echo";
- auto greetings = "hello, hbase server!";
- auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
- std::make_shared<EchoResponseProto>(), method);
- auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
- pb_msg->set_message(greetings);
-
- /* sending out request */
- client
- ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
- hbase::security::User::defaultUser())
- .then([&](std::unique_ptr<Response> response) {
- auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg());
- EXPECT_TRUE(pb_resp != nullptr);
- VLOG(1) << folly::sformat(FLAGS_result_format, method, pb_resp->message());
- EXPECT_EQ(greetings, pb_resp->message());
- })
- .onError([&](const folly::exception_wrapper& ew) {
- FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method);
- })
- .get();
-
- server->stop();
- server->join();
-}
-
-/**
- * test error
- */
-TEST_F(RpcTest, Error) {
- auto conf = CreateConf();
- auto server = CreateRpcServer();
- auto server_addr = GetRpcServerAddress(server);
- auto client = CreateRpcClient(conf);
-
- auto method = "error";
- auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
- std::make_shared<EmptyResponseProto>(), method);
- /* sending out request */
- client
- ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
- hbase::security::User::defaultUser())
- .then([&](std::unique_ptr<Response> response) {
- FAIL() << folly::sformat(FLAGS_fail_ex_format, method);
- })
- .onError([&](const folly::exception_wrapper& ew) {
- VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
- std::string kRemoteException = demangle(typeid(hbase::RemoteException)).toStdString();
- std::string kRpcTestException = demangle(typeid(hbase::RpcTestException)).toStdString();
-
- /* verify exception_wrapper */
- EXPECT_TRUE(bool(ew));
- EXPECT_EQ(kRemoteException, ew.class_name());
-
- /* verify exception */
- EXPECT_TRUE(ew.with_exception([&](const hbase::RemoteException& e) {
- EXPECT_EQ(kRpcTestException, e.exception_class_name());
- EXPECT_EQ(kRpcTestException + ": server error!", e.stack_trace());
- }));
- })
- .get();
-
- server->stop();
- server->join();
-}
-
-TEST_F(RpcTest, SocketNotOpen) {
- auto conf = CreateConf();
- auto server = CreateRpcServer();
- auto server_addr = GetRpcServerAddress(server);
- auto client = CreateRpcClient(conf);
-
- auto method = "socketNotOpen";
- auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
- std::make_shared<EmptyResponseProto>(), method);
-
- server->stop();
- server->join();
-
- /* sending out request */
- client
- ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
- hbase::security::User::defaultUser())
- .then([&](std::unique_ptr<Response> response) {
- FAIL() << folly::sformat(FLAGS_fail_ex_format, method);
- })
- .onError([&](const folly::exception_wrapper& ew) {
- VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
- std::string kConnectionException =
- demangle(typeid(hbase::ConnectionException)).toStdString();
- std::string kAsyncSocketException =
- demangle(typeid(folly::AsyncSocketException)).toStdString();
-
- /* verify exception_wrapper */
- EXPECT_TRUE(bool(ew));
- EXPECT_EQ(kConnectionException, ew.class_name());
-
- /* verify exception */
- EXPECT_TRUE(ew.with_exception([&](const hbase::ConnectionException& e) {
- EXPECT_TRUE(bool(e.cause()));
- EXPECT_EQ(kAsyncSocketException, e.cause().class_name());
- VLOG(1) << folly::sformat(FLAGS_result_format, method, e.cause().what());
- e.cause().with_exception([&](const folly::AsyncSocketException& ase) {
- EXPECT_EQ(AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN, ase.getType());
- EXPECT_EQ(111 /*ECONNREFUSED*/, ase.getErrno());
- });
- }));
- })
- .get();
-}
-
-/**
- * test pause
- */
-TEST_F(RpcTest, Pause) {
- int ms = 500;
-
- auto conf = CreateConf();
- auto server = CreateRpcServer();
- auto server_addr = GetRpcServerAddress(server);
- auto client =
- CreateRpcClient(conf, std::chrono::duration_cast<nanoseconds>(milliseconds(2 * ms)));
-
- auto method = "pause";
- auto request = std::make_unique<Request>(std::make_shared<PauseRequestProto>(),
- std::make_shared<EmptyResponseProto>(), method);
- auto pb_msg = std::static_pointer_cast<PauseRequestProto>(request->req_msg());
-
- pb_msg->set_ms(ms);
-
- /* sending out request */
- client
- ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
- hbase::security::User::defaultUser())
- .then([&](std::unique_ptr<Response> response) {
- auto pb_resp = std::static_pointer_cast<EmptyResponseProto>(response->resp_msg());
- EXPECT_TRUE(pb_resp != nullptr);
- VLOG(1) << folly::sformat(FLAGS_result_format, method, "");
- })
- .onError([&](const folly::exception_wrapper& ew) {
- VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
- FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method);
- })
- .get();
-
- server->stop();
- server->join();
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/sasl-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/sasl-handler.cc b/hbase-native-client/connection/sasl-handler.cc
deleted file mode 100644
index 9afe1e2..0000000
--- a/hbase-native-client/connection/sasl-handler.cc
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "connection/sasl-handler.h"
-
-#include <glog/logging.h>
-#include <sasl/sasl.h>
-#include <sasl/saslplug.h>
-#include <sasl/saslutil.h>
-
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl.h>
-#include <wangle/channel/Handler.h>
-
-#include <condition_variable>
-#include <memory>
-#include <mutex>
-#include <string>
-#include <utility>
-
-#include "connection/service.h"
-#include "security/user.h"
-using hbase::security::User;
-
-using std::chrono::nanoseconds;
-using namespace folly;
-using namespace wangle;
-using namespace hbase;
-
-SaslHandler::SaslHandler(std::string user_name, std::shared_ptr<Configuration> conf)
- : user_name_(user_name) {
- host_name_.clear();
- secure_ = User::IsSecurityEnabled(*conf);
- service_name_ = SaslUtil::ParseServiceName(conf, secure_);
- sasl_connection_setup_started_.clear();
- sasl_connection_setup_in_progress_.store(true);
-}
-
-SaslHandler::SaslHandler(const SaslHandler &hdlr) {
- user_name_ = hdlr.user_name_;
- service_name_ = hdlr.service_name_;
- secure_ = hdlr.secure_;
- host_name_ = hdlr.host_name_;
- // copy-constructor sets the flags below to their initial state as opposed to getting them
- // from the object this class is constructed from. That way, this instance is ready to do
- // sasl stuff without issues, right from the SaslInit. Sharing a sasl session is not useful
- // between two handler instances.
- sasl_connection_setup_started_.clear();
- sasl_connection_setup_in_progress_.store(true);
- sconn_ = nullptr;
-}
-
-SaslHandler::~SaslHandler() {
- if (nullptr != sconn_) {
- sasl_dispose(&sconn_);
- }
- sconn_ = nullptr;
-}
-
-void SaslHandler::transportActive(Context *ctx) {
- // assign hostname; needed for the sasl handshake if secure
- folly::SocketAddress address;
- ctx->getTransport()->getPeerAddress(&address);
- host_name_ = address.getHostStr();
-
- // now init the sasl library; this is once per process
- if (secure_) {
- sasl_util_.InitializeSaslLib();
- }
- // write the preamble to kick off the RPC handshake
- VLOG(3) << "Writing RPC connection Preamble to server: " << host_name_;
- auto preamble = RpcSerde::Preamble(secure_);
- ctx->fireWrite(std::move(preamble));
- ctx->fireTransportActive();
-}
-
-void SaslHandler::read(Context *ctx, folly::IOBufQueue &buf) {
- // if security is not on, or in case of security-on, if secure connection setup not in progress,
- // pass it up without touching
- if (!secure_ || !sasl_connection_setup_in_progress_.load()) {
- ctx->fireRead(buf);
- } else {
- // message is for this handler; process it appropriately
- ContinueSaslNegotiation(ctx, &buf);
- }
-}
-
-folly::Future<folly::Unit> SaslHandler::write(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
- // if security is on, and if secure connection setup in progress,
- // this message is for this handler to process and respond
- if (secure_ && sasl_connection_setup_in_progress_.load()) {
- // store IOBuf which is to be sent to server after SASL handshake
- iobuf_.push_back(std::move(buf));
- if (!sasl_connection_setup_started_.test_and_set()) {
- // for the first incoming RPC from the higher layer, trigger sasl initialization
- return SaslInit(ctx);
- } else {
- // for the subsequent incoming RPCs from the higher layer, just return empty future
- folly::Promise<folly::Unit> p_;
- return p_.getFuture();
- }
- }
- // pass the bytes recieved down without touching it
- return ctx->fireWrite(std::move(buf));
-}
-
-folly::Future<folly::Unit> SaslHandler::WriteSaslOutput(Context *ctx, const char *out,
- unsigned int outlen) {
- int buffer_size = outlen + 4;
- auto iob = IOBuf::create(buffer_size);
- iob->append(buffer_size);
- // Create the array output stream.
- google::protobuf::io::ArrayOutputStream aos{iob->writableData(), buffer_size};
- std::unique_ptr<google::protobuf::io::CodedOutputStream> coded_output =
- std::make_unique<google::protobuf::io::CodedOutputStream>(&aos);
- uint32_t total_size = outlen;
- total_size = ntohl(total_size);
- coded_output->WriteRaw(&total_size, 4);
- coded_output->WriteRaw(out, outlen);
- return ctx->fireWrite(std::move(iob));
-}
-
-void SaslHandler::FinishAuth(Context *ctx, folly::IOBufQueue *bufQueue) {
- std::unique_ptr<folly::IOBuf> iob;
- if (!bufQueue->empty()) {
- iob = bufQueue->pop_front();
- throw std::runtime_error("Error in the final step of handshake " +
- std::string(reinterpret_cast<const char *>(iob->data())));
- } else {
- sasl_connection_setup_in_progress_.store(false);
- // write what we buffered
- for (size_t i = 0; i < iobuf_.size(); i++) {
- iob = std::move(iobuf_.at(i));
- ctx->fireWrite(std::move(iob));
- }
- }
-}
-
-folly::Future<folly::Unit> SaslHandler::SaslInit(Context *ctx) {
- int rc;
- const char *mechusing, *mechlist = "GSSAPI";
- const char *out;
- unsigned int outlen;
-
- rc = sasl_client_new(service_name_.c_str(), /* The service we are using*/
- host_name_.c_str(), NULL,
- NULL, /* Local and remote IP address strings
- (NULL disables mechanisms which require this info)*/
- NULL, /*connection-specific callbacks*/
- 0 /*security flags*/, &sconn_);
- if (rc != SASL_OK) {
- LOG(FATAL) << "Cannot create client (" << rc << ") ";
- throw std::runtime_error("Cannot create client");
- }
- int curr_rc;
- do {
- curr_rc = sasl_client_start(sconn_, /* the same context from above */
- mechlist, /* the list of mechanisms from the server */
- NULL, /* filled in if an interaction is needed */
- &out, /* filled in on success */
- &outlen, /* filled in on success */
- &mechusing);
- } while (curr_rc == SASL_INTERACT); /* the mechanism may ask us to fill
- in things many times. result is SASL_CONTINUE on success */
- if (curr_rc != SASL_CONTINUE) {
- throw std::runtime_error("Cannot start client (" + std::to_string(curr_rc) + ")");
- }
- folly::Future<folly::Unit> fut = WriteSaslOutput(ctx, out, outlen);
- return fut;
-}
-
-void SaslHandler::ContinueSaslNegotiation(Context *ctx, folly::IOBufQueue *bufQueue) {
- const char *out;
- unsigned int outlen;
-
- int bytes_sent = 0;
- int bytes_received = 0;
-
- std::unique_ptr<folly::IOBuf> iob = bufQueue->pop_front();
- bytes_received = iob->length();
- if (bytes_received == 0) {
- throw std::runtime_error("Error in sasl handshake");
- }
- folly::io::RWPrivateCursor c(iob.get());
- std::uint32_t status = c.readBE<std::uint32_t>();
- std::uint32_t sz = c.readBE<std::uint32_t>();
-
- if (status != 0 /*Status 0 is success*/) {
- // Assumption here is that the response from server is not more than 8 * 1024
- throw std::runtime_error("Error in sasl handshake " +
- std::string(reinterpret_cast<char *>(c.writableData())));
- }
- out = nullptr;
- outlen = 0;
-
- int curr_rc =
- sasl_client_step(sconn_, /* our context */
- reinterpret_cast<char *>(c.writableData()), /* the data from the server */
- sz, /* its length */
- NULL, /* this should be unallocated and NULL */
- &out, /* filled in on success */
- &outlen); /* filled in on success */
-
- if (curr_rc == SASL_OK || curr_rc == SASL_CONTINUE) {
- WriteSaslOutput(ctx, out, outlen);
- }
- if (curr_rc == SASL_OK) {
- FinishAuth(ctx, bufQueue);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/sasl-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/sasl-handler.h b/hbase-native-client/connection/sasl-handler.h
deleted file mode 100644
index 81f4e81..0000000
--- a/hbase-native-client/connection/sasl-handler.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <glog/logging.h>
-#include <sasl/sasl.h>
-#include <sasl/saslplug.h>
-#include <sasl/saslutil.h>
-
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "connection/sasl-util.h"
-#include "connection/service.h"
-#include "security/user.h"
-#include "serde/rpc-serde.h"
-
-namespace hbase {
-
-/**
- * Class to perform SASL handshake with server (currently works with regionserver principals only)
- * It is inserted between EventBaseHandler and LengthFieldBasedFrameDecoder in the pipeline
- * SaslHandler would intercept writes to server by buffering the IOBuf's and start the handshake
- * process
- * (via sasl_client_XX calls provided by Cyrus)
- * After handshake is complete, SaslHandler would send the buffered IOBuf's to server and
- * act as pass-thru from then on
- */
-class SaslHandler
- : public wangle::HandlerAdapter<folly::IOBufQueue&, std::unique_ptr<folly::IOBuf>> {
- public:
- explicit SaslHandler(std::string user_name, std::shared_ptr<Configuration> conf);
- SaslHandler(const SaslHandler& hdlr);
- ~SaslHandler();
-
- // from HandlerAdapter
- void read(Context* ctx, folly::IOBufQueue& buf) override;
- folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<folly::IOBuf> buf) override;
- void transportActive(Context* ctx) override;
-
- private:
- // used by Cyrus
- sasl_conn_t* sconn_ = nullptr;
- std::string user_name_;
- std::string service_name_;
- std::string host_name_;
- bool secure_;
- std::atomic_flag sasl_connection_setup_started_;
- std::atomic<bool> sasl_connection_setup_in_progress_{true};
- // vector of folly::IOBuf which buffers client writes before handshake is complete
- std::vector<std::unique_ptr<folly::IOBuf>> iobuf_;
- SaslUtil sasl_util_;
-
- // writes the output returned by sasl_client_XX to server
- folly::Future<folly::Unit> WriteSaslOutput(Context* ctx, const char* out, unsigned int outlen);
- folly::Future<folly::Unit> SaslInit(Context* ctx);
- void FinishAuth(Context* ctx, folly::IOBufQueue* bufQueue);
- void ContinueSaslNegotiation(Context* ctx, folly::IOBufQueue* buf);
- std::string ParseServiceName(std::shared_ptr<Configuration> conf, bool secure);
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/sasl-util.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/sasl-util.cc b/hbase-native-client/connection/sasl-util.cc
deleted file mode 100644
index ecaf015..0000000
--- a/hbase-native-client/connection/sasl-util.cc
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "connection/sasl-util.h"
-
-#include <glog/logging.h>
-#include <sasl/sasl.h>
-#include <sasl/saslplug.h>
-#include <sasl/saslutil.h>
-
-#include <string>
-
-int SaslUtil::GetPluginPath(void *context __attribute__((unused)), const char **path) {
- *path = getenv("SASL_PATH");
-
- if (*path == NULL) {
- *path = kDefaultPluginDir;
- }
- return SASL_OK;
-}
-
-void *SaslUtil::MutexNew(void) {
- auto m = new std::mutex();
- return m;
-}
-
-int SaslUtil::MutexLock(void *m) {
- (reinterpret_cast<std::mutex *>(m))->lock();
- return SASL_OK;
-}
-
-int SaslUtil::MutexUnlock(void *m) {
- (reinterpret_cast<std::mutex *>(m))->unlock();
- return SASL_OK;
-}
-
-void SaslUtil::MutexDispose(void *m) {
- std::mutex *mutex = reinterpret_cast<std::mutex *>(m);
- delete mutex;
-}
-
-std::once_flag SaslUtil::library_inited_;
-
-void SaslUtil::InitializeSaslLib() {
- std::call_once(library_inited_, []() {
- sasl_set_mutex(reinterpret_cast<sasl_mutex_alloc_t *>(&SaslUtil::MutexNew),
- reinterpret_cast<sasl_mutex_lock_t *>(&SaslUtil::MutexLock),
- reinterpret_cast<sasl_mutex_unlock_t *>(&SaslUtil::MutexUnlock),
- reinterpret_cast<sasl_mutex_free_t *>(&SaslUtil::MutexDispose));
- static sasl_callback_t callbacks[] = {
- {SASL_CB_GETPATH, (sasl_callback_ft)&SaslUtil::GetPluginPath, NULL},
- {SASL_CB_LIST_END, NULL, NULL}};
- int rc = sasl_client_init(callbacks);
- if (rc != SASL_OK) {
- throw std::runtime_error("Cannot initialize client " + std::to_string(rc));
- }
- });
-}
-
-std::string SaslUtil::ParseServiceName(std::shared_ptr<hbase::Configuration> conf, bool secure) {
- if (!secure) {
- return std::string();
- }
- std::string svrPrincipal = conf->Get(kServerPrincipalConfKey, "");
- // principal is of this form: hbase/23a03935850c@EXAMPLE.COM
- // where 23a03935850c is the host (optional)
- std::size_t pos = svrPrincipal.find("/");
- if (pos == std::string::npos && svrPrincipal.find("@") != std::string::npos) {
- pos = svrPrincipal.find("@");
- }
- if (pos == std::string::npos) {
- throw std::runtime_error("Couldn't retrieve service principal from conf");
- }
- VLOG(1) << "pos " << pos << " " << svrPrincipal;
- std::string service_name = svrPrincipal.substr(0, pos);
- return service_name;
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/sasl-util.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/sasl-util.h b/hbase-native-client/connection/sasl-util.h
deleted file mode 100644
index 4d58d9ee..0000000
--- a/hbase-native-client/connection/sasl-util.h
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <memory>
-#include <mutex>
-#include <string>
-
-#include "core/configuration.h"
-
-class SaslUtil {
- public:
- void InitializeSaslLib(void);
- static std::string ParseServiceName(std::shared_ptr<hbase::Configuration> conf, bool secure);
-
- private:
- static constexpr const char *kDefaultPluginDir = "/usr/lib/sasl2";
- // for now the sasl handler is hardcoded to work against the regionservers only. In the future, if
- // we
- // need the master rpc to work, we could have a map of service names to principals to use (similar
- // to the Java implementation)
- static constexpr const char *kServerPrincipalConfKey = "hbase.regionserver.kerberos.principal";
-
- static int GetPluginPath(void *context, const char **path);
- static void *MutexNew(void);
- static int MutexLock(void *m);
- static int MutexUnlock(void *m);
- static void MutexDispose(void *m);
- static std::once_flag library_inited_;
-};
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/service.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/service.h b/hbase-native-client/connection/service.h
deleted file mode 100644
index 64d4f07..0000000
--- a/hbase-native-client/connection/service.h
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <wangle/service/Service.h>
-
-#include <memory>
-
-#include "connection/request.h"
-#include "connection/response.h"
-
-namespace hbase {
-using HBaseService = wangle::Service<std::unique_ptr<Request>, std::unique_ptr<Response>>;
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
deleted file mode 100644
index 76c836b..0000000
--- a/hbase-native-client/core/BUCK
+++ /dev/null
@@ -1,348 +0,0 @@
-##
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# This is the main library.
-cxx_library(
- name="core",
- exported_headers=[
- "async-client-scanner.h",
- "async-connection.h",
- "async-region-locator.h",
- "async-rpc-retrying-caller-factory.h",
- "async-rpc-retrying-caller.h",
- "async-table-result-scanner.h",
- "client.h",
- "cell.h",
- "hbase-macros.h",
- "filter.h",
- "query.h",
- "keyvalue-codec.h",
- "region-location.h",
- "location-cache.h",
- "connection-configuration.h",
- # TODO: move this out of exported
- # Once meta lookup works
- "meta-utils.h",
- "get.h",
- "increment.h",
- "mutation.h",
- "put.h",
- "delete.h",
- "scan.h",
- "append.h",
- "result.h",
- "result-scanner.h",
- "request-converter.h",
- "response-converter.h",
- "table.h",
- "async-scan-rpc-retrying-caller.h",
- "raw-async-table.h",
- "raw-scan-result-consumer.h",
- "scan-result-cache.h",
- "hbase-rpc-controller.h",
- "time-range.h",
- "zk-util.h",
- "action.h",
- "multi-response.h",
- "region-request.h",
- "region-result.h",
- "row.h",
- "server-request.h",
- "async-batch-rpc-retrying-caller.h",
- ],
- srcs=[
- "async-client-scanner.cc",
- "async-connection.cc",
- "async-rpc-retrying-caller-factory.cc",
- "async-rpc-retrying-caller.cc",
- "async-scan-rpc-retrying-caller.cc",
- "async-table-result-scanner.cc",
- "cell.cc",
- "client.cc",
- "hbase-rpc-controller.cc",
- "keyvalue-codec.cc",
- "location-cache.cc",
- "meta-utils.cc",
- "increment.cc",
- "get.cc",
- "mutation.cc",
- "put.cc",
- "delete.cc",
- "scan.cc",
- "append.cc",
- "scan-result-cache.cc",
- "raw-async-table.cc",
- "result.cc",
- "request-converter.cc",
- "response-converter.cc",
- "table.cc",
- "time-range.cc",
- "zk-util.cc",
- "multi-response.cc",
- "region-result.cc",
- "async-batch-rpc-retrying-caller.cc",
- ],
- deps=[
- "//exceptions:exceptions",
- "//utils:utils",
- "//connection:connection",
- "//core:conf",
- "//if:if",
- "//serde:serde",
- "//third-party:folly",
- "//third-party:wangle",
- "//third-party:zookeeper_mt",
- ],
- compiler_flags=['-Weffc++', '-ggdb'],
- visibility=[
- 'PUBLIC',
- ],)
-cxx_library(
- name="conf",
- exported_headers=[
- "configuration.h",
- "hbase-configuration-loader.h",
- ],
- srcs=[
- "configuration.cc",
- "hbase-configuration-loader.cc",
- ],
- deps=["//utils:utils", "//third-party:folly"],
- compiler_flags=['-Weffc++', '-ggdb'],
- visibility=[
- 'PUBLIC',
- ],)
-cxx_test(
- name="location-cache-test",
- srcs=[
- "location-cache-test.cc",
- ],
- deps=[
- ":core",
- "//test-util:test-util",
- ],
- run_test_separately=True,)
-cxx_test(
- name="location-cache-retry-test",
- srcs=[
- "location-cache-retry-test.cc",
- ],
- deps=[
- ":core",
- "//if:if",
- "//serde:serde",
- "//test-util:test-util",
- ],
- run_test_separately=True,)
-cxx_test(
- name="cell-test",
- srcs=[
- "cell-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="filter-test",
- srcs=[
- "filter-test.cc",
- ],
- deps=[
- ":core",
- "//if:if",
- "//serde:serde",
- "//test-util:test-util",
- ],
- run_test_separately=True,)
-cxx_test(
- name="get-test",
- srcs=[
- "get-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="delete-test",
- srcs=[
- "delete-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="increment-test",
- srcs=[
- "increment-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="put-test",
- srcs=[
- "put-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="append-test",
- srcs=[
- "append-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="retry-test",
- srcs=[
- "async-rpc-retrying-test.cc",
- ],
- deps=[
- ":core",
- "//test-util:test-util",
- "//exceptions:exceptions",
- ],
- run_test_separately=True,)
-cxx_test(
- name="time-range-test",
- srcs=[
- "time-range-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="configuration-test",
- srcs=[
- "configuration-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="hbase-configuration-test",
- srcs=[
- "hbase-configuration-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="scan-test",
- srcs=[
- "scan-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="result-test",
- srcs=[
- "result-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="request-converter-test",
- srcs=[
- "request-converter-test.cc",
- ],
- deps=[
- ":core",
- "//connection:connection",
- "//if:if",
- ],
- run_test_separately=True,)
-cxx_test(
- name="client-test",
- srcs=[
- "client-test.cc",
- ],
- deps=[
- ":core",
- "//if:if",
- "//serde:serde",
- "//test-util:test-util",
- ],
- run_test_separately=True,)
-cxx_test(
- name="scan-result-cache-test",
- srcs=[
- "scan-result-cache-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="scanner-test",
- srcs=[
- "scanner-test.cc",
- ],
- deps=[
- ":core",
- "//if:if",
- "//serde:serde",
- "//test-util:test-util",
- ],
- run_test_separately=True,)
-cxx_test(
- name="zk-util-test",
- srcs=[
- "zk-util-test.cc",
- ],
- deps=[
- ":core",
- ],
- run_test_separately=True,)
-cxx_test(
- name="multi-retry-test",
- srcs=[
- "async-batch-rpc-retrying-test.cc",
- ],
- deps=[
- ":core",
- "//test-util:test-util",
- "//exceptions:exceptions",
- ],
- run_test_separately=True,)
-cxx_binary(
- name="simple-client",
- srcs=[
- "simple-client.cc",
- ],
- deps=[":core", "//connection:connection"],)
-cxx_binary(
- name="load-client",
- srcs=[
- "load-client.cc",
- ],
- deps=[":core", "//connection:connection"],)
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/action.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h
deleted file mode 100644
index a00f079..0000000
--- a/hbase-native-client/core/action.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <memory>
-#include "core/row.h"
-
-namespace hbase {
-class Action {
- public:
- Action(std::shared_ptr<hbase::Row> action, int32_t original_index)
- : action_(action), original_index_(original_index) {}
- ~Action() {}
-
- int32_t original_index() const { return original_index_; }
-
- std::shared_ptr<hbase::Row> action() const { return action_; }
-
- private:
- std::shared_ptr<hbase::Row> action_;
- int32_t original_index_;
- int64_t nonce_ = -1;
- int32_t replica_id_ = -1;
-};
-
-} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/append-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/append-test.cc b/hbase-native-client/core/append-test.cc
deleted file mode 100644
index 2216034..0000000
--- a/hbase-native-client/core/append-test.cc
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-
-#include "core/append.h"
-#include "core/mutation.h"
-#include "utils/time-util.h"
-
-using hbase::Append;
-using hbase::Cell;
-using hbase::CellType;
-using hbase::Mutation;
-using hbase::TimeUtil;
-
-const constexpr int64_t Mutation::kLatestTimestamp;
-
-TEST(Append, Row) {
- Append append{"foo"};
- EXPECT_EQ("foo", append.row());
-}
-
-TEST(Append, Durability) {
- Append append{"row"};
- EXPECT_EQ(hbase::pb::MutationProto_Durability_USE_DEFAULT, append.Durability());
-
- auto skipWal = hbase::pb::MutationProto_Durability_SKIP_WAL;
- append.SetDurability(skipWal);
- EXPECT_EQ(skipWal, append.Durability());
-}
-
-TEST(Append, Timestamp) {
- Append append{"row"};
-
- // test default timestamp
- EXPECT_EQ(Mutation::kLatestTimestamp, append.TimeStamp());
-
- // set custom timestamp
- auto ts = TimeUtil::ToMillis(TimeUtil::GetNowNanos());
- append.SetTimeStamp(ts);
- EXPECT_EQ(ts, append.TimeStamp());
-
- // Add a column with custom timestamp
- append.Add("f", "q", "v");
- auto &cell = append.FamilyMap().at("f")[0];
- EXPECT_EQ(ts, cell->Timestamp());
-}
-
-TEST(Append, HasFamilies) {
- Append append{"row"};
-
- EXPECT_EQ(false, append.HasFamilies());
-
- append.Add("f", "q", "v");
- EXPECT_EQ(true, append.HasFamilies());
-}
-
-TEST(Append, Add) {
- CellType cell_type = CellType::PUT;
- std::string row = "row";
- std::string family = "family";
- std::string column = "column";
- std::string value = "value";
- int64_t timestamp = std::numeric_limits<int64_t>::max();
- auto cell = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type);
-
- // add first cell
- Append append{"row"};
- append.Add(std::move(cell));
- EXPECT_EQ(1, append.FamilyMap().size());
- EXPECT_EQ(1, append.FamilyMap().at(family).size());
-
- // add a non-matching row
- auto cell2 = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type);
- Append append2{"foo"};
- ASSERT_THROW(append2.Add(std::move(cell2)), std::runtime_error); // rows don't match
-
- // add a second cell with same family
- auto cell3 = std::make_unique<Cell>(row, family, "column-2", timestamp, value, cell_type);
- append.Add(std::move(cell3));
- EXPECT_EQ(1, append.FamilyMap().size());
- EXPECT_EQ(2, append.FamilyMap().at(family).size());
-
- // add a cell to a different family
- auto cell4 = std::make_unique<Cell>(row, "family-2", "column-2", timestamp, value, cell_type);
- append.Add(std::move(cell4));
- EXPECT_EQ(2, append.FamilyMap().size());
- EXPECT_EQ(1, append.FamilyMap().at("family-2").size());
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/append.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/append.cc b/hbase-native-client/core/append.cc
deleted file mode 100644
index 95349ae..0000000
--- a/hbase-native-client/core/append.cc
+++ /dev/null
@@ -1,53 +0,0 @@
-
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/append.h"
-#include <folly/Conv.h>
-#include <algorithm>
-#include <limits>
-#include <stdexcept>
-#include <utility>
-
-namespace hbase {
-
-/**
- * @brief Append to the column from the specific family with the specified qualifier
- * @param family family name
- * @param qualifier column qualifier
- * @param value value to append
- */
-Append& Append::Add(const std::string& family, const std::string& qualifier,
- const std::string& value) {
- family_map_[family].push_back(std::move(
- std::make_unique<Cell>(row_, family, qualifier, timestamp_, value, hbase::CellType::PUT)));
- return *this;
-}
-Append& Append::Add(std::unique_ptr<Cell> cell) {
- if (cell->Row() != row_) {
- throw std::runtime_error("The row in " + cell->DebugString() +
- " doesn't match the original one " + row_);
- }
-
- family_map_[cell->Family()].push_back(std::move(cell));
- return *this;
-}
-
-} // namespace hbase