You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/12/15 20:14:28 UTC

[kudu] branch master updated: rpc: fix non-retriable error when starting up or shutting down

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c2b003  rpc: fix non-retriable error when starting up or shutting down
7c2b003 is described below

commit 7c2b003b1cf2a812be5d4de8d469ece27b2f3fc9
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Sun Dec 13 23:35:57 2020 -0800

    rpc: fix non-retriable error when starting up or shutting down
    
    I found in testing a later patch that when a server is starting up or
    being destructed, we may run into non-retriable error messages like:
    
    Bad status: Remote error: Failed to write to server: ff6a8ff4d4bd456c99cc60b4ab35b6cc (127.0.0.1:60603): Service unavailable: service kudu.tserver.TabletServerAdminService not registered on TabletServer
    
    This is because during startup and destruction there are windows in which
    RPC services are not registered; RPCs arriving in those windows fail with
    non-retriable errors like the one shown above.
    
    This patch addresses this by replacing the 'closing_' flag in
    rpc::Messenger with a state enum, allowing us to express windows during
    which the Messenger may not have all services registered.
    
    Change-Id: I04e2379de4cf632d257b93cd701d7c73cb2bbaed
    Reviewed-on: http://gerrit.cloudera.org:8080/16876
    Tested-by: Kudu Jenkins
    Reviewed-by: Hao Hao <ha...@cloudera.com>
---
 src/kudu/client/client-test.cc | 44 ++++++++++++++++++++++++++++++++++++++++++
 src/kudu/rpc/messenger.cc      | 24 +++++++++++++++--------
 src/kudu/rpc/messenger.h       | 29 +++++++++++++++++++++++++---
 src/kudu/rpc/rpc-test.cc       |  9 +++++++++
 src/kudu/server/rpc_server.cc  |  1 +
 5 files changed, 96 insertions(+), 11 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index ac49571..d1dca6d 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -3145,6 +3145,50 @@ TEST_F(ClientTest, TestWriteTimeout) {
   }
 }
 
+// Test that Write RPCs get properly retried through the duration of a restart.
+// This tests the narrow windows during startup and shutdown that RPC services
+// are unregistered, checking that any resulting "missing service" errors don't
+// get propagated to end users.
+TEST_F(ClientTest, TestWriteWhileRestarting) {
+  shared_ptr<KuduSession> session = client_->NewSession();
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  Status writer_error;
+  int row_id = 1;
+
+  // Writes a row and checks for errors.
+  const auto& write_and_check_error = [&] {
+    RETURN_NOT_OK(ApplyInsertToSession(session.get(), client_table_, row_id++, 1, "row"));
+    RETURN_NOT_OK(session->Flush());
+    // If we successfully flush, check for errors.
+    vector<KuduError*> errors;
+    bool overflow;
+    session->GetPendingErrors(&errors, &overflow);
+    CHECK(!overflow);
+    if (PREDICT_FALSE(!errors.empty())) {
+      return errors[0]->status();
+    }
+    return Status::OK();
+  };
+
+  // Until we finish restarting, hit the tablet server with write requests.
+  CountDownLatch stop(1);
+  thread t([&] {
+    while (writer_error.ok() && stop.count() == 1) {
+      writer_error = write_and_check_error();
+    }
+  });
+  auto thread_joiner = MakeScopedCleanup([&] { t.join(); });
+  auto* ts = cluster_->mini_tablet_server(0);
+  ts->Shutdown();
+  ASSERT_OK(ts->Restart());
+  stop.CountDown();
+  thread_joiner.cancel();
+  t.join();
+
+  // The writer thread should have hit no issues.
+  ASSERT_OK(writer_error);
+}
+
 TEST_F(ClientTest, TestFailedDnsResolution) {
   shared_ptr<KuduSession> session = client_->NewSession();
   ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 784188f..f161faf 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -271,11 +271,11 @@ void Messenger::ShutdownInternal(ShutdownMode mode) {
   RpcServicesMap services_to_release;
   {
     std::lock_guard<percpu_rwlock> guard(lock_);
-    if (closing_) {
+    if (state_ == kClosing) {
       return;
     }
     VLOG(1) << "shutting down messenger " << name_;
-    closing_ = true;
+    state_ = kClosing;
 
     services_to_release = std::move(rpc_services_);
     pools_to_shutdown = std::move(acceptor_pools_);
@@ -330,6 +330,8 @@ Status Messenger::RegisterService(const string& service_name,
                                   const scoped_refptr<RpcService>& service) {
   DCHECK(service);
   std::lock_guard<percpu_rwlock> guard(lock_);
+  DCHECK_NE(kServicesUnregistered, state_);
+  DCHECK_NE(kClosing, state_);
   if (InsertIfNotPresent(&rpc_services_, service_name, service)) {
     return Status::OK();
   } else {
@@ -342,6 +344,7 @@ void Messenger::UnregisterAllServices() {
   {
     std::lock_guard<percpu_rwlock> guard(lock_);
     to_release = std::move(rpc_services_);
+    state_ = kServicesUnregistered;
   }
   // Release the map outside of the lock.
 }
@@ -376,10 +379,15 @@ void Messenger::QueueInboundCall(unique_ptr<InboundCall> call) {
   scoped_refptr<RpcService>* service = FindOrNull(rpc_services_,
                                                   call->remote_method().service_name());
   if (PREDICT_FALSE(!service)) {
-    Status s =  Status::ServiceUnavailable(Substitute("service $0 not registered on $1",
-                                                      call->remote_method().service_name(), name_));
-    LOG(INFO) << s.ToString();
-    call.release()->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_SERVICE, s);
+    const auto msg = Substitute("service $0 not registered on $1",
+                                call->remote_method().service_name(), name_);
+    LOG(INFO) << msg;
+    if (state_ == kServicesRegistered) {
+      call.release()->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_SERVICE, Status::NotFound(msg));
+    } else {
+      call.release()->RespondFailure(
+          ErrorStatusPB::ERROR_UNAVAILABLE, Status::ServiceUnavailable(msg));
+    }
     return;
   }
 
@@ -401,7 +409,7 @@ void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote
 
 Messenger::Messenger(const MessengerBuilder &bld)
   : name_(bld.name_),
-    closing_(false),
+    state_(kStarted),
     authentication_(RpcAuthentication::REQUIRED),
     encryption_(RpcEncryption::REQUIRED),
     tls_context_(new security::TlsContext(bld.rpc_tls_ciphers_, bld.rpc_tls_min_protocol_)),
@@ -427,7 +435,7 @@ Messenger::Messenger(const MessengerBuilder &bld)
 }
 
 Messenger::~Messenger() {
-  CHECK(closing_) << "Should have already shut down";
+  CHECK_EQ(state_, kClosing) << "Should have already shut down";
   STLDeleteElements(&reactors_);
 }
 
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 10b94f8..2d34f30 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -245,7 +245,9 @@ class Messenger {
   // Returns an error if no service with this name can be found.
   Status UnregisterService(const std::string& service_name);
 
-  // Unregisters all RPC services.
+  // Unregisters all RPC services. Once called, no new services can be
+  // registered, and attempts to access missing services will result in a
+  // retriable error code.
   void UnregisterAllServices();
 
   // Queue a call for transmission. This will pick the appropriate reactor,
@@ -303,9 +305,14 @@ class Messenger {
     return name_;
   }
 
+  void SetServicesRegistered() {
+    std::lock_guard<percpu_rwlock> guard(lock_);
+    state_ = kServicesRegistered;
+  }
+
   bool closing() const {
     shared_lock<rw_spinlock> l(lock_.get_lock());
-    return closing_;
+    return state_ == kClosing;
   }
 
   scoped_refptr<MetricEntity> metric_entity() const { return metric_entity_; }
@@ -355,7 +362,23 @@ class Messenger {
   // Protects closing_, acceptor_pools_, rpc_services_.
   mutable percpu_rwlock lock_;
 
-  bool closing_;
+  enum State {
+    // The Messenger has been started; not all services may be registered yet.
+    kStarted,
+
+    // The Messenger is fully up and running. All services have been registered and
+    // are accepting requests.
+    // NOTE: Messengers that do not register services never enter this state.
+    kServicesRegistered,
+
+    // All services have been unregistered. No further requests will succeed.
+    // NOTE: Messengers that do not register services never enter this state.
+    kServicesUnregistered,
+
+    // The Messenger is being closed. Its resources may be freed.
+    kClosing,
+  };
+  State state_;
 
   // Whether to require authentication and encryption on the connections managed
   // by this messenger.
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index ce16f0d..593749f 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -410,6 +410,15 @@ TEST_P(TestRpc, TestWrongService) {
   ASSERT_STR_CONTAINS(s.ToString(),
                       "Service unavailable: service WrongServiceName "
                       "not registered on TestServer");
+
+  // If the server has been marked as having registered all services, we should
+  // expect a "not found" error instead.
+  server_messenger_->SetServicesRegistered();
+  s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
+  ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(),
+                      "Not found: service WrongServiceName "
+                      "not registered on TestServer");
 }
 
 // Test that we can still make RPC connections even if many fds are in use.
diff --git a/src/kudu/server/rpc_server.cc b/src/kudu/server/rpc_server.cc
index 632a35e..337a185 100644
--- a/src/kudu/server/rpc_server.cc
+++ b/src/kudu/server/rpc_server.cc
@@ -219,6 +219,7 @@ Status RpcServer::Start() {
     bound_addrs_str += bind_addr.ToString();
   }
   LOG(INFO) << "RPC server started. Bound to: " << bound_addrs_str;
+  messenger_->SetServicesRegistered();
 
   return Status::OK();
 }