You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:12 UTC
[hbase] 24/133: HBASE-15777 Fix needs header in client handler
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 987a1ed46109a3ba2ea7fe9b15d4f9d5183a1525
Author: Elliott Clark <ec...@apache.org>
AuthorDate: Thu May 5 13:51:18 2016 -0700
HBASE-15777 Fix needs header in client handler
---
hbase-native-client/connection/client-handler.cc | 31 ++++++++++++++----------
hbase-native-client/connection/client-handler.h | 19 ++++++++++++++-
2 files changed, 36 insertions(+), 14 deletions(-)
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index b92ad89..4fdb7ae 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -37,7 +37,7 @@ using hbase::pb::GetResponse;
using google::protobuf::Message;
ClientHandler::ClientHandler(std::string user_name)
- : user_name_(user_name), need_send_header_(true), serde_(),
+ : user_name_(user_name), serde_(), header_info_(),
resp_msgs_(
make_unique<folly::AtomicHashMap<
uint32_t, std::shared_ptr<google::protobuf::Message>>>(5000)) {}
@@ -81,22 +81,27 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
}
}
-// TODO(eclark): Figure out how to handle the
-// network errors that are going to come.
Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
// Keep track of if we have sent the header.
- if (UNLIKELY(need_send_header_)) {
- need_send_header_ = false;
+ //
+ // even though the bool is atomic we can load it lazily here.
+ if (UNLIKELY(header_info_->need_.load(std::memory_order_relaxed))) {
- // Should we be sending just one fireWrite?
- // Right now we're sending one for the header
- // and one for the request.
+ // Grab the lock.
+ // We need to make sure that no one gets past here without there being a
+ // hearder sent.
+ std::lock_guard<std::mutex> lock(header_info_->mutex_);
+
+ // Now see if we are the first thread to get through.
//
- // That doesn't seem like too bad, but who knows.
- auto pre = serde_.Preamble();
- auto header = serde_.Header(user_name_);
- pre->appendChain(std::move(header));
- ctx->fireWrite(std::move(pre));
+ // If this is the first thread to get through then the
+ // need_send_header will have been true before this.
+ if (header_info_->need_.exchange(false)) {
+ auto pre = serde_.Preamble();
+ auto header = serde_.Header(user_name_);
+ pre->appendChain(std::move(header));
+ ctx->fireWrite(std::move(pre));
+ }
}
resp_msgs_->insert(r->call_id(), r->resp_msg());
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index be5143c..1a4275f 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -21,6 +21,8 @@
#include <folly/AtomicHashMap.h>
#include <wangle/channel/Handler.h>
+#include <atomic>
+#include <mutex>
#include <string>
#include "serde/rpc.h"
@@ -29,6 +31,7 @@
namespace hbase {
class Request;
class Response;
+class HeaderInfo;
}
namespace google {
namespace protobuf {
@@ -37,6 +40,7 @@ class Message;
}
namespace hbase {
+
class ClientHandler : public wangle::Handler<std::unique_ptr<folly::IOBuf>,
Response, std::unique_ptr<Request>,
std::unique_ptr<folly::IOBuf>> {
@@ -47,7 +51,7 @@ public:
std::unique_ptr<Request> r) override;
private:
- bool need_send_header_;
+ std::unique_ptr<HeaderInfo> header_info_;
std::string user_name_;
RpcSerde serde_;
@@ -56,4 +60,17 @@ private:
uint32_t, std::shared_ptr<google::protobuf::Message>>>
resp_msgs_;
};
+
+/**
+ * Class to contain the info about if the connection header and preamble has
+ * been sent.
+ *
+ * We use a serperate class here so that ClientHandler is relocatable.
+ */
+class HeaderInfo {
+public:
+ HeaderInfo() : need_(true), mutex_() {}
+ std::atomic<bool> need_;
+ std::mutex mutex_;
+};
} // namespace hbase