You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/12/16 21:20:53 UTC

[kudu] branch master updated: KUDU-2992: Avoid sending duplicated requests in catalog_manager

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 23e07e3  KUDU-2992: Avoid sending duplicated requests in catalog_manager
23e07e3 is described below

commit 23e07e3a93ad42c1e44cd077acf0ef041b78258a
Author: zhangyifan27 <ch...@163.com>
AuthorDate: Mon Dec 2 10:26:32 2019 +0800

    KUDU-2992: Avoid sending duplicated requests in catalog_manager
    
    When the master sends a large number of RPC requests to a tserver
    and the tserver doesn't respond immediately, the catalog manager may
    continue to send duplicate requests in ProcessTabletReport(), which
    can cause RPC timeouts and high load on both masters and tservers.
    
    This patch adds a tablet_id index to pending_tasks_. When handling a
    tablet report from a tablet server, the catalog manager no longer
    sends a request if a task with the same description is already
    pending for that tablet. If the existing RPC task fails, the catalog
    manager creates a new one the next time the tablet server heartbeats.
    
    This patch also fixes HandleResponse() in AsyncDeleteReplica to avoid
    unnecessary retries.
    
    Change-Id: If090723821bd78e14a3c54a35cb5e471320002e9
    Reviewed-on: http://gerrit.cloudera.org:8080/14849
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/master/catalog_manager.cc    | 119 ++++++++++++++++++---------------
 src/kudu/master/catalog_manager.h     |  13 ++--
 src/kudu/master/master-test.cc        | 120 ++++++++++++++++++++++++++++++++++
 src/kudu/tserver/ts_tablet_manager.cc |   2 +-
 4 files changed, 197 insertions(+), 57 deletions(-)

diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 9d2ce01..05ef2a7 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -64,7 +64,6 @@
 #include <boost/function.hpp>
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/common.h>
 
@@ -3144,6 +3143,9 @@ class RetryingTSRpcTask : public MonitoredTask {
     return static_cast<State>(NoBarrier_Load(&state_));
   }
 
+  // Return the id of the tablet that is the subject of the async request.
+  virtual string tablet_id() const = 0;
+
   MonoTime start_timestamp() const override { return start_ts_; }
   MonoTime completion_timestamp() const override { return end_ts_; }
   const scoped_refptr<TableInfo>& table() const { return table_ ; }
@@ -3162,9 +3164,6 @@ class RetryingTSRpcTask : public MonitoredTask {
   // Runs on the reactor thread, so must not block or perform any IO.
   virtual void HandleResponse(int attempt) = 0;
 
-  // Return the id of the tablet that is the subject of the async request.
-  virtual string tablet_id() const = 0;
-
   // Overridable log prefix with reasonable default.
   virtual string LogPrefix() const {
     return Substitute("$0: ", description());
@@ -3322,13 +3321,7 @@ void RetryingTSRpcTask::RunDelayedTask(const Status& status) {
 
 void RetryingTSRpcTask::UnregisterAsyncTask() {
   end_ts_ = MonoTime::Now();
-  if (table_ != nullptr) {
-    table_->RemoveTask(this);
-  } else {
-    // This is a floating task (since the table does not exist)
-    // created as response to a tablet report.
-    Release();  // May call "delete this";
-  }
+  table_->RemoveTask(tablet_id(), this);
 }
 
 Status RetryingTSRpcTask::ResetTSProxy() {
@@ -3475,7 +3468,7 @@ class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
   }
 
   string description() const override {
-    return tablet_id_ + " Delete Tablet RPC for TS=" + permanent_uuid_;
+    return "DeleteTablet RPC for tablet " + tablet_id_ + " on TS " + permanent_uuid_;
   }
 
  protected:
@@ -3494,10 +3487,17 @@ class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
               target_ts_desc_->ToString(), tablet_id_, status.ToString());
           MarkComplete();
           break;
+        // Do not retry on a CAS error
         case TabletServerErrorPB::CAS_FAILED:
           LOG(WARNING) << Substitute("TS $0: delete failed for tablet $1 "
               "because of a CAS failure. No further retry: $2",
               target_ts_desc_->ToString(), tablet_id_, status.ToString());
+          MarkFailed();
+          break;
+        case TabletServerErrorPB::ALREADY_INPROGRESS:
+          LOG(WARNING) << Substitute("TS $0: delete failed for tablet $1 "
+            "because tablet deleting was already in progress. No further retry: $2",
+            target_ts_desc_->ToString(), tablet_id_, status.ToString());
           MarkComplete();
           break;
         default:
@@ -3566,7 +3566,8 @@ class AsyncAlterTable : public RetryingTSRpcTask {
   string type_name() const override { return "AlterTable"; }
 
   string description() const override {
-    return tablet_->ToString() + " Alter Table RPC";
+    return Substitute("AlterTable RPC for tablet $0 (table $1, current schema version=$2)",
+                      tablet_->id(), table_->ToString(), table_->schema_version());
   }
 
  private:
@@ -3938,7 +3939,7 @@ Status CatalogManager::ProcessTabletReport(
   unordered_map<string, scoped_refptr<TabletInfo>> tablet_infos;
 
   // Keeps track of all RPCs that should be sent when we're done.
-  vector<unique_ptr<RetryingTSRpcTask>> rpcs;
+  vector<scoped_refptr<RetryingTSRpcTask>> rpcs;
 
   // Locks the referenced tables (for READ) and tablets (for WRITE).
   //
@@ -4017,8 +4018,7 @@ Status CatalogManager::ProcessTabletReport(
         table->metadata().state().is_deleted()) {
       const string& msg = tablet->metadata().state().pb.state_msg();
       update->set_state_msg(msg);
-      LOG(INFO) << Substitute("Got report from deleted tablet $0 ($1): Sending "
-          "delete request for this tablet", tablet->ToString(), msg);
+      VLOG(1) << Substitute("Got report from deleted tablet $0 ($1)", tablet->ToString(), msg);
 
       // TODO(unknown): Cancel tablet creation, instead of deleting, in cases
       // where that might be possible (tablet creation timeout & replacement).
@@ -4300,15 +4300,15 @@ Status CatalogManager::ProcessTabletReport(
 
   // 15. Send all queued RPCs.
   for (auto& rpc : rpcs) {
-    if (rpc->table() != nullptr) {
-      rpc->table()->AddTask(rpc.get());
-    } else {
-      // This is a floating task (since the table does not exist) created in
-      // response to a tablet report.
-      rpc->AddRef();
+    if (rpc->table()->ContainsTask(rpc->tablet_id(), rpc->description())) {
+      // There are some tasks with the same tablet_id, alter type (and permanent_uuid
+      // for some specific tasks) already running, here we just ignore the rpc to avoid
+      // sending duplicate requests, maybe it will be sent the next time the tserver heartbeats.
+      VLOG(1) << Substitute("Not sending duplicate request: $0", rpc->description());
+      continue;
     }
+    rpc->table()->AddTask(rpc->tablet_id(), rpc);
     WARN_NOT_OK(rpc->Run(), Substitute("Failed to send $0", rpc->description()));
-    rpc.release();
   }
 
   return Status::OK();
@@ -4358,9 +4358,9 @@ void CatalogManager::SendAlterTableRequest(const scoped_refptr<TableInfo>& table
   table->GetAllTablets(&tablets);
 
   for (const scoped_refptr<TabletInfo>& tablet : tablets) {
-    auto call = new AsyncAlterTable(master_, tablet);
-    table->AddTask(call);
-    WARN_NOT_OK(call->Run(), "Failed to send alter table request");
+    scoped_refptr<AsyncAlterTable> task = new AsyncAlterTable(master_, tablet);
+    table->AddTask(tablet->id(), task);
+    WARN_NOT_OK(task->Run(), "Failed to send alter table request");
   }
 }
 
@@ -4391,11 +4391,11 @@ void CatalogManager::SendDeleteTabletRequest(const scoped_refptr<TabletInfo>& ta
       << "Sending DeleteTablet for " << cstate.committed_config().peers().size()
       << " replicas of tablet " << tablet->id();
   for (const auto& peer : cstate.committed_config().peers()) {
-    AsyncDeleteReplica* call = new AsyncDeleteReplica(
+    scoped_refptr<AsyncDeleteReplica> task = new AsyncDeleteReplica(
         master_, peer.permanent_uuid(), tablet->table(), tablet->id(),
         TABLET_DATA_DELETED, none, deletion_msg);
-    tablet->table()->AddTask(call);
-    WARN_NOT_OK(call->Run(), Substitute(
+    tablet->table()->AddTask(tablet->id(), task);
+    WARN_NOT_OK(task->Run(), Substitute(
         "Failed to send DeleteReplica request for tablet $0", tablet->id()));
   }
 }
@@ -4777,10 +4777,9 @@ void CatalogManager::SendCreateTabletRequest(const scoped_refptr<TabletInfo>& ta
       tablet_lock.data().pb.consensus_state().committed_config();
   tablet->set_last_create_tablet_time(MonoTime::Now());
   for (const RaftPeerPB& peer : config.peers()) {
-    AsyncCreateReplica* task = new AsyncCreateReplica(master_,
-                                                      peer.permanent_uuid(),
-                                                      tablet, tablet_lock);
-    tablet->table()->AddTask(task);
+    scoped_refptr<AsyncCreateReplica> task = new AsyncCreateReplica(
+        master_, peer.permanent_uuid(), tablet, tablet_lock);
+    tablet->table()->AddTask(tablet->id(), task);
     WARN_NOT_OK(task->Run(), "Failed to send new tablet request");
   }
 }
@@ -5419,6 +5418,11 @@ string TableInfo::ToString() const {
   return Substitute("$0 [id=$1]", l.data().pb.name(), table_id_);
 }
 
+uint32_t TableInfo::schema_version() const {
+  TableMetadataLock l(this, LockMode::READ);
+  return l.data().pb.version();
+}
+
 void TableInfo::AddRemoveTablets(const vector<scoped_refptr<TabletInfo>>& tablets_to_add,
                                  const vector<scoped_refptr<TabletInfo>>& tablets_to_drop) {
   std::lock_guard<rw_spinlock> l(lock_);
@@ -5507,29 +5511,26 @@ bool TableInfo::IsCreateInProgress() const {
   return false;
 }
 
-void TableInfo::AddTask(MonitoredTask* task) {
-  task->AddRef();
-  {
-    std::lock_guard<rw_spinlock> l(lock_);
-    pending_tasks_.insert(task);
-  }
+void TableInfo::AddTask(const string& tablet_id, const scoped_refptr<MonitoredTask>& task) {
+  std::lock_guard<rw_spinlock> l(lock_);
+  pending_tasks_.emplace(tablet_id, task);
 }
 
-void TableInfo::RemoveTask(MonitoredTask* task) {
-  {
-    std::lock_guard<rw_spinlock> l(lock_);
-    pending_tasks_.erase(task);
+void TableInfo::RemoveTask(const string& tablet_id, MonitoredTask* task) {
+  std::lock_guard<rw_spinlock> l(lock_);
+  auto range = pending_tasks_.equal_range(tablet_id);
+  for (auto it = range.first; it != range.second; ++it) {
+    if (it->second.get() == task) {
+      pending_tasks_.erase(it);
+      break;
+    }
   }
-
-  // Done outside the lock so that if Release() drops the last ref to this
-  // TableInfo, RemoveTask() won't unlock a freed lock.
-  task->Release();
 }
 
 void TableInfo::AbortTasks() {
   shared_lock<rw_spinlock> l(lock_);
-  for (MonitoredTask* task : pending_tasks_) {
-    task->Abort();
+  for (auto& task : pending_tasks_) {
+    task.second->Abort();
   }
 }
 
@@ -5547,10 +5548,24 @@ void TableInfo::WaitTasksCompletion() {
   }
 }
 
-void TableInfo::GetTaskList(vector<scoped_refptr<MonitoredTask>>* ret) {
+bool TableInfo::ContainsTask(const string& tablet_id, const string& task_description) {
   shared_lock<rw_spinlock> l(lock_);
-  for (MonitoredTask* task : pending_tasks_) {
-    ret->push_back(make_scoped_refptr(task));
+  auto range = pending_tasks_.equal_range(tablet_id);
+  for (auto it = range.first; it != range.second; ++it) {
+    if (it->second->description() == task_description) {
+      return true;
+    }
+  }
+  return false;
+}
+
+void TableInfo::GetTaskList(vector<scoped_refptr<MonitoredTask>>* tasks) {
+  tasks->clear();
+  {
+    shared_lock<rw_spinlock> l(lock_);
+    for (const auto& task : pending_tasks_) {
+      tasks->push_back(task.second);
+    }
   }
 }
 
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 242d99b..0363a36 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -263,6 +263,7 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
   explicit TableInfo(std::string table_id);
 
   std::string ToString() const;
+  uint32_t schema_version() const;
 
   // Return the table's ID. Does not require synchronization.
   const std::string& id() const { return table_id_; }
@@ -292,11 +293,15 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
   // Returns true if an "Alter" operation is in-progress
   bool IsAlterInProgress(uint32_t version) const;
 
-  void AddTask(MonitoredTask *task);
-  void RemoveTask(MonitoredTask *task);
+  void AddTask(const std::string& tablet_id, const scoped_refptr<MonitoredTask>& task);
+  void RemoveTask(const std::string& tablet_id, MonitoredTask* task);
   void AbortTasks();
   void WaitTasksCompletion();
 
+  // Returns true if pending_tasks_ contains a task whose description
+  // is the same as task_description.
+  bool ContainsTask(const std::string& tablet_id, const std::string& task_description);
+
   // Allow for showing outstanding tasks in the master UI.
   void GetTaskList(std::vector<scoped_refptr<MonitoredTask> > *tasks);
 
@@ -365,8 +370,8 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
 
   CowObject<PersistentTableInfo> metadata_;
 
-  // List of pending tasks (e.g. create/alter tablet requests)
-  std::unordered_set<MonitoredTask*> pending_tasks_;
+  // Map of tablet_id to pending tasks (e.g. create/alter tablet requests).
+  std::unordered_multimap<std::string, scoped_refptr<MonitoredTask>> pending_tasks_;
 
   // Map of schema version to the number of tablets that reported that version.
   //
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 9f2fc06..331e502 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -70,6 +70,7 @@
 #include "kudu/security/tls_context.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/security/token_verifier.h"
+#include "kudu/server/monitored_task.h"
 #include "kudu/server/rpc_server.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/util/atomic.h"
@@ -1777,6 +1778,125 @@ TEST_F(MasterTest, TestTableIdentifierWithIdAndName) {
   }
 }
 
+TEST_F(MasterTest, TestDuplicateRequest) {
+  const char* const kTsUUID = "my-ts-uuid";
+  TSToMasterCommonPB common;
+  common.mutable_ts_instance()->set_permanent_uuid(kTsUUID);
+  common.mutable_ts_instance()->set_instance_seqno(1);
+
+  // Register the fake TS, without sending any tablet report.
+  ServerRegistrationPB fake_reg;
+  MakeHostPortPB("localhost", 1000, fake_reg.add_rpc_addresses());
+  MakeHostPortPB("localhost", 2000, fake_reg.add_http_addresses());
+  fake_reg.set_software_version(VersionInfo::GetVersionInfo());
+  fake_reg.set_start_time(10000);
+
+  // Information on replica management scheme.
+  ReplicaManagementInfoPB rmi;
+  rmi.set_replacement_scheme(ReplicaManagementInfoPB::PREPARE_REPLACEMENT_BEFORE_EVICTION);
+
+  {
+    TSHeartbeatRequestPB req;
+    TSHeartbeatResponsePB resp;
+    RpcController rpc;
+    req.mutable_common()->CopyFrom(common);
+    req.mutable_registration()->CopyFrom(fake_reg);
+    req.mutable_replica_management_info()->CopyFrom(rmi);
+    ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+
+    ASSERT_FALSE(resp.has_error());
+    ASSERT_TRUE(resp.leader_master());
+    ASSERT_FALSE(resp.needs_reregister());
+    ASSERT_FALSE(resp.needs_full_tablet_report());
+    ASSERT_FALSE(resp.has_tablet_report());
+  }
+
+  vector<shared_ptr<TSDescriptor> > descs;
+  master_->ts_manager()->GetAllDescriptors(&descs);
+  ASSERT_EQ(1, descs.size()) << "Should have registered the TS";
+  ServerRegistrationPB reg;
+  descs[0]->GetRegistration(&reg);
+  ASSERT_EQ(SecureDebugString(fake_reg), SecureDebugString(reg))
+      << "Master got different registration";
+  shared_ptr<TSDescriptor> ts_desc;
+  ASSERT_TRUE(master_->ts_manager()->LookupTSByUUID(kTsUUID, &ts_desc));
+  ASSERT_EQ(ts_desc, descs[0]);
+
+  // Create a table with three tablets.
+  const char *kTableName = "test_table";
+  const Schema kTableSchema({ ColumnSchema("key", INT32) }, 1);
+  ASSERT_OK(CreateTable(kTableName, kTableSchema));
+
+  vector<scoped_refptr<TableInfo>> tables;
+  {
+    CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+    ASSERT_OK(master_->catalog_manager()->GetAllTables(&tables));
+    ASSERT_EQ(1, tables.size());
+  }
+
+  scoped_refptr<TableInfo> table = tables[0];
+  vector<scoped_refptr<TabletInfo>> tablets;
+  table->GetAllTablets(&tablets);
+  ASSERT_EQ(tablets.size(), 3);
+
+  // Delete the table.
+  {
+    DeleteTableRequestPB req;
+    DeleteTableResponsePB resp;
+    RpcController controller;
+    req.mutable_table()->set_table_name(kTableName);
+    ASSERT_OK(proxy_->DeleteTable(req, &resp, &controller));
+    SCOPED_TRACE(SecureDebugString(resp));
+    ASSERT_FALSE(resp.has_error());
+  }
+
+  // The table has no pending task.
+  // The master would not send DeleteTablet requests for no consensus state for tablets.
+  vector<scoped_refptr<MonitoredTask>> task_list;
+  tables[0]->GetTaskList(&task_list);
+  ASSERT_EQ(task_list.size(), 0);
+
+  // Now the tserver send a full report with a deleted tablet.
+  // The master will process it and send 'DeleteTablet' request to the tserver.
+  {
+    TSHeartbeatRequestPB req;
+    TSHeartbeatResponsePB resp;
+    RpcController rpc;
+    req.mutable_common()->CopyFrom(common);
+    TabletReportPB* tr = req.mutable_tablet_report();
+    tr->set_is_incremental(false);
+    tr->set_sequence_number(0);
+    tr->add_updated_tablets()->set_tablet_id(tablets[0]->id());
+    ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+    ASSERT_TRUE(resp.has_tablet_report());
+  }
+
+  // The 'DeleteTablet' task is running for the master will continue
+  // retrying to connect to the fake TS.
+  tables[0]->GetTaskList(&task_list);
+  ASSERT_EQ(task_list.size(), 1);
+  ASSERT_EQ(task_list[0]->state(), MonitoredTask::kStateRunning);
+
+  // Now the tserver send a full report with two deleted tablets.
+  // The master will not send duplicate DeleteTablet request to the tserver.
+  {
+    TSHeartbeatRequestPB req;
+    TSHeartbeatResponsePB resp;
+    RpcController rpc;
+    req.mutable_common()->CopyFrom(common);
+    TabletReportPB* tr = req.mutable_tablet_report();
+    tr->set_is_incremental(true);
+    tr->set_sequence_number(0);
+    tr->add_updated_tablets()->set_tablet_id(tablets[0]->id());
+    tr->add_updated_tablets()->set_tablet_id(tablets[1]->id());
+    ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+    ASSERT_TRUE(resp.has_tablet_report());
+  }
+
+  tables[0]->GetTaskList(&task_list);
+  ASSERT_EQ(task_list.size(), 2);
+}
+
 TEST_F(MasterTest, TestGetTableStatistics) {
   const char *kTableName = "testtable";
   const Schema kTableSchema({ ColumnSchema("key", INT32) }, 1);
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 8695312..b9814f1 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -854,7 +854,7 @@ Status TSTabletManager::BeginReplicaStateTransition(
   Status s = StartTabletStateTransitionUnlocked(tablet_id, reason, deleter);
   if (PREDICT_FALSE(!s.ok())) {
     if (error_code) {
-      *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
+      *error_code = TabletServerErrorPB::ALREADY_INPROGRESS;
     }
     return s;
   }