You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/11/27 06:34:42 UTC

kudu git commit: KUDU-1097 (patch1): test for replica health reporting

Repository: kudu
Updated Branches:
  refs/heads/master 87dcaf34d -> 37b89924e


KUDU-1097 (patch1): test for replica health reporting

Added a test to verify that the leader tablet replica reports on the
replica health changes. The test verifies that the health reports
are present in the Raft consensus state reported by the leader replica.
The test also verifies that the incremental tablet reports contain
appropriate information once the replica health status changes.

Change-Id: Ie62b49efebad9a123eec51dd302e375e46e0682d
Reviewed-on: http://gerrit.cloudera.org:8080/8642
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/37b89924
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/37b89924
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/37b89924

Branch: refs/heads/master
Commit: 37b89924ee807f39ae30343b84e837daa7a979b3
Parents: 87dcaf3
Author: Alexey Serbin <as...@cloudera.com>
Authored: Wed Nov 22 22:03:59 2017 -0800
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Mon Nov 27 06:33:39 2017 +0000

----------------------------------------------------------------------
 .../ts_tablet_manager-itest.cc                  | 340 ++++++++++++++++---
 1 file changed, 291 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/37b89924/src/kudu/integration-tests/ts_tablet_manager-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 98b0e26..ee5a957 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -16,10 +16,12 @@
 // under the License.
 
 #include <cstdlib>
+#include <map>
 #include <memory>
 #include <ostream>
 #include <string>
 #include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include <gflags/gflags.h>
@@ -56,37 +58,43 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
-DECLARE_bool(enable_leader_failure_detection);
-DECLARE_bool(catalog_manager_wait_for_new_tablets_to_elect_leader);
 DECLARE_bool(allow_unsafe_replication_factor);
+DECLARE_bool(catalog_manager_wait_for_new_tablets_to_elect_leader);
+DECLARE_bool(enable_leader_failure_detection);
+DECLARE_bool(raft_prepare_replacement_before_eviction);
 DEFINE_int32(num_election_test_loops, 3,
              "Number of random EmulateElection() loops to execute in "
              "TestReportNewLeaderOnLeaderChange");
 
-namespace kudu {
-namespace tserver {
-
-using client::KuduClient;
-using client::KuduSchema;
-using client::KuduTable;
-using client::KuduTableCreator;
-using cluster::InternalMiniCluster;
-using cluster::InternalMiniClusterOptions;
-using consensus::GetConsensusRole;
-using consensus::RaftPeerPB;
-using itest::SimpleIntKeyKuduSchema;
-using master::MasterServiceProxy;
-using master::ReportedTabletPB;
-using master::TabletReportPB;
-using rpc::Messenger;
-using rpc::MessengerBuilder;
+using kudu::client::KuduClient;
+using kudu::client::KuduSchema;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
+using kudu::cluster::InternalMiniCluster;
+using kudu::cluster::InternalMiniClusterOptions;
+using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::GetConsensusRole;
+using kudu::consensus::HealthReportPB;
+using kudu::consensus::RaftConfigPB;
+using kudu::consensus::RaftConsensus;
+using kudu::consensus::RaftPeerPB;
+using kudu::itest::SimpleIntKeyKuduSchema;
+using kudu::master::MasterServiceProxy;
+using kudu::master::ReportedTabletPB;
+using kudu::master::TabletReportPB;
+using kudu::rpc::Messenger;
+using kudu::rpc::MessengerBuilder;
+using kudu::tablet::TabletReplica;
+using kudu::tserver::MiniTabletServer;
+using std::map;
 using std::shared_ptr;
 using std::string;
-using std::vector;
 using std::unique_ptr;
+using std::vector;
 using strings::Substitute;
-using tablet::TabletReplica;
-using tserver::MiniTabletServer;
+
+namespace kudu {
+namespace tserver {
 
 static const char* const kTableName = "test-table";
 
@@ -95,7 +103,7 @@ class TsTabletManagerITest : public KuduTest {
   TsTabletManagerITest()
       : schema_(SimpleIntKeyKuduSchema()) {
   }
-  virtual void SetUp() override {
+  void SetUp() override {
     KuduTest::SetUp();
 
     MessengerBuilder bld("client");
@@ -111,6 +119,21 @@ class TsTabletManagerITest : public KuduTest {
   }
 
  protected:
+  void DisableHeartbeatingToMaster();
+
+  // Populate the 'replicas' container with corresponding objects representing
+  // tablet replicas running at tablet servers in the test cluster. It's assumed
+  // there is at least one tablet replica per tablet server. Also, this utility
+  // method waits up to the specified timeout for the consensus to be running
+  // before adding an element into the output container.
+  Status PrepareTabletReplicas(MonoDelta timeout,
+                               vector<scoped_refptr<TabletReplica>>* replicas);
+
+  // Generate incremental tablet reports using test-specific method
+  // GenerateIncrementalTabletReportsForTests() of the specified heartbeater.
+  void GetIncrementalTabletReports(Heartbeater* heartbeater,
+                                   vector<TabletReportPB>* reports);
+
   const KuduSchema schema_;
 
   unique_ptr<InternalMiniCluster> cluster_;
@@ -118,6 +141,50 @@ class TsTabletManagerITest : public KuduTest {
   std::shared_ptr<Messenger> client_messenger_;
 };
 
+void TsTabletManagerITest::DisableHeartbeatingToMaster() {
+  for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+    MiniTabletServer* ts = cluster_->mini_tablet_server(i);
+    ts->FailHeartbeats();
+  }
+}
+
+Status TsTabletManagerITest::PrepareTabletReplicas(
+    MonoDelta timeout, vector<scoped_refptr<TabletReplica>>* replicas) {
+  const MonoTime deadline = MonoTime::Now() + timeout;
+  for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+    MiniTabletServer* ts = cluster_->mini_tablet_server(i);
+    vector<scoped_refptr<TabletReplica>> ts_replicas;
+    // The replicas may not have been created yet, so loop until we see them.
+    while (MonoTime::Now() < deadline) {
+      ts->server()->tablet_manager()->GetTabletReplicas(&ts_replicas);
+      if (!ts_replicas.empty()) {
+        break;
+      }
+      SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+    if (ts_replicas.empty()) {
+      return Status::TimedOut("waiting for tablet replicas register with ts manager");
+    }
+    RETURN_NOT_OK(ts_replicas.front()->WaitUntilConsensusRunning(
+        deadline - MonoTime::Now()));
+    replicas->insert(replicas->end(), ts_replicas.begin(), ts_replicas.end());
+  }
+  return Status::OK();
+}
+
+void TsTabletManagerITest::GetIncrementalTabletReports(
+    Heartbeater* heartbeater, vector<TabletReportPB>* reports) {
+  vector<TabletReportPB> r;
+  // The MarkDirty() callback is on an async thread so it might take the
+  // follower a few milliseconds to execute it. Wait for that to happen.
+  ASSERT_EVENTUALLY([&] {
+    r = heartbeater->GenerateIncrementalTabletReportsForTests();
+    ASSERT_EQ(1, r.size());
+    ASSERT_FALSE(r.front().updated_tablets().empty());
+  });
+  reports->swap(r);
+}
+
 // Test that when a tablet is marked as failed, it will eventually be evicted
 // and replaced.
 TEST_F(TsTabletManagerITest, TestFailedTabletsAreReplaced) {
@@ -205,21 +272,12 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
   ValueDeleter deleter(&ts_map);
 
   // Collect the TabletReplicas so we get direct access to RaftConsensus.
-  vector<scoped_refptr<TabletReplica> > tablet_replicas;
-  for (int replica = 0; replica < kNumReplicas; replica++) {
-    MiniTabletServer* ts = cluster_->mini_tablet_server(replica);
-    ts->FailHeartbeats(); // Stop heartbeating we don't race against the Master.
-    vector<scoped_refptr<TabletReplica> > cur_ts_tablet_replicas;
-    // The replicas may not have been created yet, so loop until we see them.
-    while (true) {
-      ts->server()->tablet_manager()->GetTabletReplicas(&cur_ts_tablet_replicas);
-      if (!cur_ts_tablet_replicas.empty()) break;
-      SleepFor(MonoDelta::FromMilliseconds(10));
-    }
-    ASSERT_EQ(1, cur_ts_tablet_replicas.size()); // Each TS should only have 1 tablet.
-    ASSERT_OK(cur_ts_tablet_replicas[0]->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
-    tablet_replicas.push_back(cur_ts_tablet_replicas[0]);
-  }
+  vector<scoped_refptr<TabletReplica>> tablet_replicas;
+  ASSERT_OK(PrepareTabletReplicas(MonoDelta::FromSeconds(60), &tablet_replicas));
+  ASSERT_EQ(kNumReplicas, tablet_replicas.size());
+
+  // Stop heartbeating so we don't race against the Master.
+  DisableHeartbeatingToMaster();
 
   // Loop and cause elections and term changes from different servers.
   // TSTabletManager should acknowledge the role changes via tablet reports.
@@ -227,7 +285,7 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
     SCOPED_TRACE(Substitute("Iter: $0", i));
     int new_leader_idx = rand() % 2;
     LOG(INFO) << "Electing peer " << new_leader_idx << "...";
-    consensus::RaftConsensus* con = CHECK_NOTNULL(tablet_replicas[new_leader_idx]->consensus());
+    RaftConsensus* con = CHECK_NOTNULL(tablet_replicas[new_leader_idx]->consensus());
     ASSERT_OK(con->EmulateElection());
     LOG(INFO) << "Waiting for servers to agree...";
     ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(5),
@@ -235,17 +293,10 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
 
     // Now check that the tablet report reports the correct role for both servers.
     for (int replica = 0; replica < kNumReplicas; replica++) {
-      // The MarkDirty() callback is on an async thread so it might take the
-      // follower a few milliseconds to execute it. Wait for that to happen.
-      Heartbeater* heartbeater =
-          cluster_->mini_tablet_server(replica)->server()->heartbeater();
       vector<TabletReportPB> reports;
-      for (int retry = 0; retry <= 12; retry++) {
-        reports = heartbeater->GenerateIncrementalTabletReportsForTests();
-        ASSERT_EQ(1, reports.size());
-        if (!reports[0].updated_tablets().empty()) break;
-        SleepFor(MonoDelta::FromMilliseconds(1 << retry));
-      }
+      NO_FATALS(GetIncrementalTabletReports(
+          cluster_->mini_tablet_server(replica)->server()->heartbeater(),
+          &reports));
 
       // Ensure that our tablet reports are consistent.
       TabletReportPB& report = reports[0];
@@ -267,5 +318,196 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
   }
 }
 
+// Test that the tablet manager generates reports on replica health status
+// in accordance with observed changes on replica status, and that the tablet
+// manager includes that information in the next tablet report. Specifically,
+// verify that:
+//   1. The leader replica provides the health status report in its consensus
+//      state, if requested.
+//   2. The health report information matches the state of tablet replicas.
+//   3. The tablet manager generates appropriate tablet reports with updated
+//      health information when replicas change their state.
+TEST_F(TsTabletManagerITest, ReportOnReplicaHealthStatus) {
+  constexpr int kNumReplicas = 3;
+  const auto kTimeout = MonoDelta::FromSeconds(60);
+
+  FLAGS_raft_prepare_replacement_before_eviction = true;
+  NO_FATALS(StartCluster(kNumReplicas));
+
+  // Create the table.
+  client::sp::shared_ptr<KuduTable> table;
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(kTableName)
+            .schema(&schema_)
+            .set_range_partition_columns({})  // need just one tablet
+            .num_replicas(kNumReplicas)
+            .Create());
+  ASSERT_OK(client_->OpenTable(kTableName, &table));
+
+  // Build a TServerDetails map so we can check for convergence.
+  const auto& addr = cluster_->mini_master()->bound_rpc_addr();
+  shared_ptr<MasterServiceProxy> master_proxy(
+      new MasterServiceProxy(client_messenger_, addr, addr.host()));
+
+  itest::TabletServerMap ts_map;
+  ASSERT_OK(CreateTabletServerMap(master_proxy, client_messenger_, &ts_map));
+  ValueDeleter deleter(&ts_map);
+
+  // Collect the TabletReplicas so we get direct access to RaftConsensus.
+  vector<scoped_refptr<TabletReplica>> tablet_replicas;
+  ASSERT_OK(PrepareTabletReplicas(kTimeout, &tablet_replicas));
+  ASSERT_EQ(kNumReplicas, tablet_replicas.size());
+
+  // Don't send heartbeats to master, otherwise it would be a race in
+  // acknowledging the heartbeats and generating new tablet reports. Clearing
+  // the 'dirty' flag on a tablet before the generated tablet report is
+  // introspected makes the test scenario very flaky. Also, this scenario does
+  // not assume that the catalog manager initiates the replacement of failed
+  // voter replicas.
+  DisableHeartbeatingToMaster();
+
+  // Generate health reports for every element of the 'tablet_replicas'
+  // container.  Also, output the leader replica UUID from the consensus
+  // state into 'leader_replica_uuid'.
+  auto get_health_reports = [&](map<string, HealthReportPB>* reports,
+                                string* leader_replica_uuid = nullptr) {
+    ConsensusStatePB cstate;
+    string leader_uuid;
+    for (const auto& replica : tablet_replicas) {
+      RaftConsensus* consensus = CHECK_NOTNULL(replica->consensus());
+      ConsensusStatePB cs(consensus->ConsensusState(RaftConsensus::INCLUDE_HEALTH_REPORT));
+      if (consensus->peer_uuid() == cs.leader_uuid()) {
+        // Only the leader replica has the up-to-date health report.
+        leader_uuid = cs.leader_uuid();
+        cstate.Swap(&cs);
+        break;
+      }
+    }
+    ASSERT_FALSE(leader_uuid.empty());
+    ASSERT_TRUE(cstate.has_committed_config());
+    const RaftConfigPB& config = cstate.committed_config();
+    ASSERT_EQ(kNumReplicas, config.peers_size());
+    if (reports) {
+      reports->clear();
+      for (const auto& peer : config.peers()) {
+        ASSERT_TRUE(peer.has_health_report());
+        reports->emplace(peer.permanent_uuid(), peer.health_report());
+      }
+    }
+    if (leader_replica_uuid) {
+      *leader_replica_uuid = leader_uuid;
+    }
+  };
+
+  // Get the information on committed Raft configuration from tablet reports
+  // generated by the heartbeater of the server running the specified leader
+  // tablet replica.
+  auto get_committed_config_from_reports = [&](const string& leader_replica_uuid,
+                                               RaftConfigPB* config) {
+    TabletServer* leader_server = nullptr;
+    for (auto i = 0; i < kNumReplicas; ++i) {
+      MiniTabletServer* mts = cluster_->mini_tablet_server(i);
+      if (mts->uuid() == leader_replica_uuid) {
+        leader_server = mts->server();
+        break;
+      }
+    }
+    ASSERT_NE(nullptr, leader_server);
+
+    // TSTabletManager should acknowledge the status change via tablet reports.
+    Heartbeater* heartbeater = leader_server->heartbeater();
+    ASSERT_NE(nullptr, heartbeater);
+
+    vector<TabletReportPB> reports;
+    NO_FATALS(GetIncrementalTabletReports(heartbeater, &reports));
+    ASSERT_EQ(1, reports.size());
+    const TabletReportPB& report = reports[0];
+    SCOPED_TRACE("Tablet report: " + pb_util::SecureDebugString(report));
+    ASSERT_EQ(1, report.updated_tablets_size());
+    const ReportedTabletPB& reported_tablet = report.updated_tablets(0);
+    ASSERT_TRUE(reported_tablet.has_consensus_state());
+    const ConsensusStatePB& cstate = reported_tablet.consensus_state();
+    ASSERT_EQ(RaftPeerPB::LEADER, GetConsensusRole(leader_replica_uuid, cstate));
+    ASSERT_TRUE(cstate.has_committed_config());
+    RaftConfigPB cfg = cstate.committed_config();
+    config->Swap(&cfg);
+  };
+
+  // All replicas are up and running, so the leader replica should eventually
+  // report their health status as HEALTHY.
+  {
+    string leader_replica_uuid;
+    ASSERT_EVENTUALLY(([&] {
+      map<string, HealthReportPB> reports;
+      NO_FATALS(get_health_reports(&reports, &leader_replica_uuid));
+      for (const auto& e : reports) {
+        SCOPED_TRACE("replica UUID: " + e.first);
+        ASSERT_EQ(HealthReportPB::HEALTHY, e.second.overall_health());
+      }
+    }));
+
+    // Other replicas are seen by the leader in UNKNOWN health state first.
+    // At this point of the test scenario, since the replicas went from the
+    // UNKNOWN to the HEALTHY state, an incremental tablet report should
+    // reflect those health status changes.
+    RaftConfigPB config;
+    NO_FATALS(get_committed_config_from_reports(leader_replica_uuid,
+                                                &config));
+    ASSERT_EQ(kNumReplicas, config.peers_size());
+    for (const auto& p : config.peers()) {
+      ASSERT_TRUE(p.has_health_report());
+      const HealthReportPB& report(p.health_report());
+      ASSERT_EQ(HealthReportPB::HEALTHY, report.overall_health());
+    }
+  }
+
+  // Inject an error to the replica and make sure its status is eventually
+  // reported as FAILED.
+  string failed_replica_uuid;
+  {
+    auto replica = tablet_replicas.front();
+    failed_replica_uuid = replica->consensus()->peer_uuid();
+    replica->SetError(Status::IOError("INJECTED ERROR: tablet failed"));
+    replica->Shutdown();
+    ASSERT_EQ(tablet::FAILED, replica->state());
+  }
+
+  ASSERT_EVENTUALLY(([&] {
+    map<string, HealthReportPB> reports;
+    NO_FATALS(get_health_reports(&reports));
+    for (const auto& e : reports) {
+      const auto& replica_uuid = e.first;
+      SCOPED_TRACE("replica UUID: " + replica_uuid);
+      if (replica_uuid == failed_replica_uuid) {
+        ASSERT_EQ(HealthReportPB::FAILED, e.second.overall_health());
+      } else {
+        ASSERT_EQ(HealthReportPB::HEALTHY, e.second.overall_health());
+      }
+    }
+  }));
+
+  // The scenario below assumes the leader replica does not change anymore.
+  FLAGS_enable_leader_failure_detection = false;
+
+  {
+    string leader_replica_uuid;
+    NO_FATALS(get_health_reports(nullptr, &leader_replica_uuid));
+    RaftConfigPB config;
+    NO_FATALS(get_committed_config_from_reports(leader_replica_uuid,
+                                                &config));
+    for (const auto& peer : config.peers()) {
+      ASSERT_TRUE(peer.has_permanent_uuid());
+      const auto& uuid = peer.permanent_uuid();
+      ASSERT_TRUE(peer.has_health_report());
+      const HealthReportPB& report(peer.health_report());
+      if (uuid == failed_replica_uuid) {
+        EXPECT_EQ(HealthReportPB::FAILED, report.overall_health());
+      } else {
+        EXPECT_EQ(HealthReportPB::HEALTHY, report.overall_health());
+      }
+    }
+  }
+}
+
 } // namespace tserver
 } // namespace kudu