You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:12 UTC

[hbase] 24/133: HBASE-15777 Fix needs header in client handler

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 987a1ed46109a3ba2ea7fe9b15d4f9d5183a1525
Author: Elliott Clark <ec...@apache.org>
AuthorDate: Thu May 5 13:51:18 2016 -0700

    HBASE-15777 Fix needs header in client handler
---
 hbase-native-client/connection/client-handler.cc | 31 ++++++++++++++----------
 hbase-native-client/connection/client-handler.h  | 19 ++++++++++++++-
 2 files changed, 36 insertions(+), 14 deletions(-)

diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index b92ad89..4fdb7ae 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -37,7 +37,7 @@ using hbase::pb::GetResponse;
 using google::protobuf::Message;
 
 ClientHandler::ClientHandler(std::string user_name)
-    : user_name_(user_name), need_send_header_(true), serde_(),
+    : user_name_(user_name), serde_(), header_info_(),
       resp_msgs_(
           make_unique<folly::AtomicHashMap<
               uint32_t, std::shared_ptr<google::protobuf::Message>>>(5000)) {}
@@ -81,22 +81,27 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
   }
 }
 
-// TODO(eclark): Figure out how to handle the
-// network errors that are going to come.
 Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
   // Keep track of if we have sent the header.
-  if (UNLIKELY(need_send_header_)) {
-    need_send_header_ = false;
+  //
+  // The bool is atomic, but a relaxed load is enough for this first check.
+  if (UNLIKELY(header_info_->need_.load(std::memory_order_relaxed))) {
 
-    // Should we be sending just one fireWrite?
-    // Right now we're sending one for the header
-    // and one for the request.
+    // Grab the lock.
+    // We need to make sure that no one gets past here without there being a
+    // header sent.
+    std::lock_guard<std::mutex> lock(header_info_->mutex_);
+
+    // Now see if we are the first thread to get through.
     //
-    // That doesn't seem like too bad, but who knows.
-    auto pre = serde_.Preamble();
-    auto header = serde_.Header(user_name_);
-    pre->appendChain(std::move(header));
-    ctx->fireWrite(std::move(pre));
+    // If this is the first thread to get through then
+    // need_ will have been true before this exchange.
+    if (header_info_->need_.exchange(false)) {
+      auto pre = serde_.Preamble();
+      auto header = serde_.Header(user_name_);
+      pre->appendChain(std::move(header));
+      ctx->fireWrite(std::move(pre));
+    }
   }
 
   resp_msgs_->insert(r->call_id(), r->resp_msg());
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index be5143c..1a4275f 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -21,6 +21,8 @@
 #include <folly/AtomicHashMap.h>
 #include <wangle/channel/Handler.h>
 
+#include <atomic>
+#include <mutex>
 #include <string>
 
 #include "serde/rpc.h"
@@ -29,6 +31,7 @@
 namespace hbase {
 class Request;
 class Response;
+class HeaderInfo;
 }
 namespace google {
 namespace protobuf {
@@ -37,6 +40,7 @@ class Message;
 }
 
 namespace hbase {
+
 class ClientHandler : public wangle::Handler<std::unique_ptr<folly::IOBuf>,
                                              Response, std::unique_ptr<Request>,
                                              std::unique_ptr<folly::IOBuf>> {
@@ -47,7 +51,7 @@ public:
                                    std::unique_ptr<Request> r) override;
 
 private:
-  bool need_send_header_;
+  std::unique_ptr<HeaderInfo> header_info_;
   std::string user_name_;
   RpcSerde serde_;
 
@@ -56,4 +60,17 @@ private:
       uint32_t, std::shared_ptr<google::protobuf::Message>>>
       resp_msgs_;
 };
+
+/**
+ * Class to contain the info about whether the connection header and preamble
+ * have been sent.
+ *
+ * We use a separate class here so that ClientHandler is relocatable.
+ */
+class HeaderInfo {
+public:
+  HeaderInfo() : need_(true), mutex_() {}
+  std::atomic<bool> need_;
+  std::mutex mutex_;
+};
 } // namespace hbase