You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/22 22:02:21 UTC

[1/9] incubator-kudu git commit: c++ client: use operation timeout as deadline for finding new leader master

Repository: incubator-kudu
Updated Branches:
  refs/heads/master ea02b1a3b -> 14524f84d


c++ client: use operation timeout as deadline for finding new leader master

We had been using the default RPC timeout, which may be set to a very low
value as in ClientStressTest_MultiMaster_TestLeaderResolutionTimeout.
Now we'll use the operation timeout as the overall deadline while still
preserving the semantics of using the default RPC timeout for the
GetMasterRegistration() RPCs themselves.

As my patch series removes the guarantee that a leader master is elected at
the time that cluster tests run, it's important that the logic for finding
the leader master provide ample time for an election to finish.

Also, I think I've addressed the root cause behind KUDU-573 by fixing a race
in GetLeaderMasterRpc's SendRpcCb() and GetMasterRegistrationRpcCbForNode()
methods. The race manifests when the last two RPC responses are "I am the
leader" and "I am not the leader" respectively. In one interleaving, both
responses enter SendRpcCb(), and the second calls DelayedRetryCb(). If that
were a call to DelayedRetry() instead, the GetLeaderMasterRpc would be
destroyed by the time the reactor thread reran the RPC.

Change-Id: I0d770875bbf4703444abac11dbc232d7e382165e
Reviewed-on: http://gerrit.cloudera.org:8080/3718
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/c0b4f508
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/c0b4f508
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/c0b4f508

Branch: refs/heads/master
Commit: c0b4f508d234a890f0cee6e3f997fba3d40cb6cb
Parents: ea02b1a
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Jul 20 16:55:19 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Jul 22 20:20:38 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/client-internal.cc              | 12 ++---
 .../integration-tests/external_mini_cluster.cc  |  3 +-
 src/kudu/master/master_rpc.cc                   | 55 ++++++++++++++------
 src/kudu/master/master_rpc.h                    | 16 ++++--
 src/kudu/tserver/heartbeater.cc                 | 15 +++---
 5 files changed, 63 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c0b4f508/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 4a802bc..bdd8067 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -791,13 +791,6 @@ void KuduClient::Data::SetMasterServerProxyAsync(KuduClient* client,
     master_sockaddrs.push_back(addrs[0]);
   }
 
-  // Finding a new master involves a fan-out RPC to each master. A single
-  // RPC timeout's worth of time should be sufficient, though we'll use
-  // the provided deadline if it's sooner.
-  MonoTime leader_master_deadline = MonoTime::Now(MonoTime::FINE);
-  leader_master_deadline.AddDelta(client->default_rpc_timeout());
-  MonoTime actual_deadline = MonoTime::Earliest(deadline, leader_master_deadline);
-
   // This ensures that no more than one GetLeaderMasterRpc is in
   // flight at a time -- there isn't much sense in requesting this information
   // in parallel, since the requests should end up with the same result.
@@ -810,8 +803,9 @@ void KuduClient::Data::SetMasterServerProxyAsync(KuduClient* client,
     leader_master_rpc_.reset(new GetLeaderMasterRpc(
                                Bind(&KuduClient::Data::LeaderMasterDetermined,
                                     Unretained(this)),
-                               master_sockaddrs,
-                               actual_deadline,
+                               std::move(master_sockaddrs),
+                               deadline,
+                               client->default_rpc_timeout(),
                                messenger_));
     l.unlock();
     leader_master_rpc_->SendRpc();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c0b4f508/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index fc0eb72..03da89c 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -400,8 +400,9 @@ Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) {
   rpc.reset(new GetLeaderMasterRpc(Bind(&LeaderMasterCallback,
                                         &leader_master_hp,
                                         &sync),
-                                   addrs,
+                                   std::move(addrs),
                                    deadline,
+                                   MonoDelta::FromSeconds(5),
                                    messenger_));
   rpc->SendRpc();
   RETURN_NOT_OK(sync.Wait());

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c0b4f508/src/kudu/master/master_rpc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_rpc.cc b/src/kudu/master/master_rpc.cc
index 7e42cd3..d14d6ea 100644
--- a/src/kudu/master/master_rpc.cc
+++ b/src/kudu/master/master_rpc.cc
@@ -29,6 +29,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/master.proxy.h"
 #include "kudu/util/net/net_util.h"
+#include "kudu/util/scoped_cleanup.h"
 
 
 using std::shared_ptr;
@@ -106,17 +107,18 @@ void GetMasterRegistrationRpc::SendRpcCb(const Status& status) {
 
 GetLeaderMasterRpc::GetLeaderMasterRpc(LeaderCallback user_cb,
                                        vector<Sockaddr> addrs,
-                                       const MonoTime& deadline,
-                                       const shared_ptr<Messenger>& messenger)
-    : Rpc(deadline, messenger),
+                                       MonoTime deadline,
+                                       MonoDelta rpc_timeout,
+                                       shared_ptr<Messenger> messenger)
+    : Rpc(std::move(deadline), std::move(messenger)),
       user_cb_(std::move(user_cb)),
       addrs_(std::move(addrs)),
+      rpc_timeout_(std::move(rpc_timeout)),
       pending_responses_(0),
       completed_(false) {
   DCHECK(deadline.Initialized());
 
-  // Using resize instead of reserve to explicitly initialized the
-  // values.
+  // Using resize instead of reserve to explicitly initialized the values.
   responses_.resize(addrs_.size());
 }
 
@@ -134,13 +136,19 @@ string GetLeaderMasterRpc::ToString() const {
 }
 
 void GetLeaderMasterRpc::SendRpc() {
+  // Compute the actual deadline to use for each RPC.
+  MonoTime rpc_deadline = MonoTime::Now(MonoTime::FINE);
+  rpc_deadline.AddDelta(rpc_timeout_);
+  MonoTime actual_deadline = MonoTime::Earliest(retrier().deadline(),
+                                                rpc_deadline);
+
   std::lock_guard<simple_spinlock> l(lock_);
   for (int i = 0; i < addrs_.size(); i++) {
     GetMasterRegistrationRpc* rpc = new GetMasterRegistrationRpc(
         Bind(&GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode,
              this, ConstRef(addrs_[i]), ConstRef(responses_[i])),
         addrs_[i],
-        retrier().deadline(),
+        actual_deadline,
         retrier().messenger(),
         &responses_[i]);
     rpc->SendRpc();
@@ -149,23 +157,35 @@ void GetLeaderMasterRpc::SendRpc() {
 }
 
 void GetLeaderMasterRpc::SendRpcCb(const Status& status) {
+  // To safely retry, we must reset completed_ so that it can be reused in the
+  // next round of RPCs.
+  //
+  // The SendRpcCb invariant (see GetMasterRegistrationRpcCbForNode comments)
+  // implies that if we're to retry, we must be the last response. Thus, it is
+  // safe to reset completed_ in this case; there's no danger of a late
+  // response reading it and entering SendRpcCb inadvertently.
+  auto undo_completed = MakeScopedCleanup([&]() {
+    std::lock_guard<simple_spinlock> l(lock_);
+    completed_ = false;
+  });
+
   // If we've received replies from all of the nodes without finding
   // the leader, or if there were network errors talking to all of the
   // nodes the error is retriable and we can perform a delayed retry.
   if (status.IsNetworkError() || status.IsNotFound()) {
-    // TODO (KUDU-573): Allow cancelling delayed tasks on reactor so
-    // that we can safely use DelayedRetry here.
-    mutable_retrier()->DelayedRetryCb(this, Status::OK());
+    mutable_retrier()->DelayedRetry(this, status);
     return;
   }
-  {
-    std::lock_guard<simple_spinlock> l(lock_);
-    // 'completed_' prevents 'user_cb_' from being invoked twice.
-    if (completed_) {
-      return;
-    }
-    completed_ = true;
+
+  // If our replies timed out but the deadline hasn't passed, retry.
+  if (status.IsTimedOut() &&
+      MonoTime::Now(MonoTime::FINE).ComesBefore(retrier().deadline())) {
+    mutable_retrier()->DelayedRetry(this, status);
+    return;
   }
+
+  // We are not retrying.
+  undo_completed.cancel();
   user_cb_.Run(status, leader_master_);
 }
 
@@ -199,6 +219,7 @@ void GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode(const Sockaddr& node_
       } else {
         // We've found a leader.
         leader_master_ = HostPort(node_addr);
+        completed_ = true;
       }
     }
     --pending_responses_;
@@ -209,6 +230,8 @@ void GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode(const Sockaddr& node_
         // a delayed re-try, which don't need to do unless we've
         // been unable to find a leader so far.
         return;
+      } else {
+        completed_ = true;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c0b4f508/src/kudu/master/master_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_rpc.h b/src/kudu/master/master_rpc.h
index d7ffc5d..cb300b0 100644
--- a/src/kudu/master/master_rpc.h
+++ b/src/kudu/master/master_rpc.h
@@ -94,11 +94,14 @@ class GetLeaderMasterRpc : public rpc::Rpc,
   // 'leader_master', which must remain valid for the lifetime of this
   // object.
   //
-  // Calls 'user_cb' when the leader is found, or if no leader can be
-  // found until 'deadline' passes.
-  GetLeaderMasterRpc(LeaderCallback user_cb, std::vector<Sockaddr> addrs,
-                     const MonoTime& deadline,
-                     const std::shared_ptr<rpc::Messenger>& messenger);
+  // Calls 'user_cb' when the leader is found, or if no leader can be found
+  // until 'deadline' passes. Each RPC has 'rpc_timeout' time to complete
+  // before it times out and may be retried if 'deadline' has not yet passed.
+  GetLeaderMasterRpc(LeaderCallback user_cb,
+                     std::vector<Sockaddr> addrs,
+                     MonoTime deadline,
+                     MonoDelta rpc_timeout,
+                     std::shared_ptr<rpc::Messenger> messenger);
 
   virtual void SendRpc() OVERRIDE;
 
@@ -124,6 +127,9 @@ class GetLeaderMasterRpc : public rpc::Rpc,
 
   HostPort leader_master_;
 
+  // The amount of time alloted to each GetMasterRegistration RPC.
+  MonoDelta rpc_timeout_;
+
   // The received responses.
   //
   // See also: GetMasterRegistrationRpc above.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c0b4f508/src/kudu/tserver/heartbeater.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index b506f95..f36d59f 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -222,13 +222,14 @@ Status Heartbeater::Thread::FindLeaderMaster(const MonoTime& deadline,
     return Status::NotFound("unable to resolve any of the master addresses!");
   }
   Synchronizer sync;
-  scoped_refptr<GetLeaderMasterRpc> rpc(new GetLeaderMasterRpc(
-                                          Bind(&LeaderMasterCallback,
-                                               leader_hostport,
-                                               &sync),
-                                          master_sock_addrs,
-                                          deadline,
-                                          server_->messenger()));
+  scoped_refptr<GetLeaderMasterRpc> rpc(
+      new GetLeaderMasterRpc(Bind(&LeaderMasterCallback,
+                                  leader_hostport,
+                                  &sync),
+                             std::move(master_sock_addrs),
+                             deadline,
+                             MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms),
+                             server_->messenger()));
   rpc->SendRpc();
   return sync.Wait();
 }


[9/9] incubator-kudu git commit: ksck: fix a test flake caused by a race on timeout

Posted by jd...@apache.org.
ksck: fix a test flake caused by a race on timeout

I saw a case where TestKsckTimeout could fail with the following
race:
- we would time out the WaitFor() which waits for all results to be received,
  thus setting 'timed_out' to true.
- immediately thereafter, the last result would be received
- after printing the results, we'd see that the number of results matched the
  number of tablets and not return the 'TimedOut' value.

This race is more likely now that we multi-thread the fetching of checksums.

The fix here is to trust the 'timed_out' boolean that comes from 'WaitFor'
rather than looking at the tablet count.

Change-Id: I1cbc68df96a9d721c7d850bb664bb4f5af2495d0
Reviewed-on: http://gerrit.cloudera.org:8080/3727
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/14524f84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/14524f84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/14524f84

Branch: refs/heads/master
Commit: 14524f84db49ea44d5f51b873d2b875a2a2dbd75
Parents: e8ddee8
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Jul 22 12:30:37 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Fri Jul 22 20:59:54 2016 +0000

----------------------------------------------------------------------
 src/kudu/tools/ksck.cc | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/14524f84/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 3a21711..28e4522 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -463,10 +463,9 @@ Status Ksck::ChecksumData(const ChecksumOptions& opts) {
     }
   }
 
-  bool timed_out = false;
-  if (!reporter->WaitFor(options.timeout)) {
-    timed_out = true;
-  }
+  bool timed_out = !reporter->WaitFor(options.timeout);
+
+  // Even if we timed out, print the checksum results that we did get.
   ChecksumResultReporter::TabletResultMap checksums = reporter->checksums();
 
   int num_errors = 0;
@@ -513,14 +512,16 @@ Status Ksck::ChecksumData(const ChecksumOptions& opts) {
     }
     if (printed_table_name) cout << endl;
   }
-  if (num_results != num_tablet_replicas) {
-    CHECK(timed_out) << Substitute("Unexpected error: only got $0 out of $1 replica results",
-                                   num_results, num_tablet_replicas);
+  if (timed_out) {
     return Status::TimedOut(Substitute("Checksum scan did not complete within the timeout of $0: "
                                        "Received results for $1 out of $2 expected replicas",
                                        options.timeout.ToString(), num_results,
                                        num_tablet_replicas));
   }
+  CHECK_EQ(num_results, num_tablet_replicas)
+      << Substitute("Unexpected error: only got $0 out of $1 replica results",
+                    num_results, num_tablet_replicas);
+
   if (num_mismatches != 0) {
     return Status::Corruption(Substitute("$0 checksum mismatches were detected", num_mismatches));
   }


[5/9] incubator-kudu git commit: ksck: also print info about the good replicas for a bad tablet

Posted by jd...@apache.org.
ksck: also print info about the good replicas for a bad tablet

In the case that one or more replicas are bad in a tablet, it
can be useful to know where the _good_ replicas are in order to
start debugging why they aren't doing their job of replacing or
repairing the bad one. This patch prints out an INFO message for
those good replicas.

Example output:
  WARNING: Detected problems with Tablet 3bf432551c5d4c529616f8e7ce829424 of table 'usertable'
  ------------------------------------------------------------
  WARNING: Bad state on TS c189483a372947f192fc338e612ef70f (e1516.halxg.cloudera.com:7050): NOT_STARTED
    Last status: RemoteBootstrap: Downloading block 369288934977914585 (450/31161)
    Data state:  TABLET_DATA_COPYING
  INFO: OK state on TS dff78a5acdbb4a47ba2c7a62d1bcc5ee (e1407.halxg.cloudera.com:7050): RUNNING
  INFO: OK state on TS ea948da3787648c2b26049e953afb3ea (e1301.halxg.cloudera.com:7050): RUNNING

Change-Id: Ic0dcefe5a7b00c77a116cc40601cce13cf8a0112
Reviewed-on: http://gerrit.cloudera.org:8080/3706
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/2e04bf54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/2e04bf54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/2e04bf54

Branch: refs/heads/master
Commit: 2e04bf54c562d1d75f5ad50760affc0aeb5ed7c2
Parents: 9cc2e04
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Jul 20 14:53:15 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Fri Jul 22 20:28:45 2016 +0000

----------------------------------------------------------------------
 src/kudu/tools/ksck-test.cc | 12 +++++++++---
 src/kudu/tools/ksck.cc      |  9 ++++++++-
 2 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2e04bf54/src/kudu/tools/ksck-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index 44fcf69..70c4dd0 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -290,17 +290,23 @@ TEST_F(KsckTest, TestBadTabletServer) {
       err_stream_.str(),
       "WARNING: Detected problems with Tablet tablet-id-0 of table 'test'\n"
       "------------------------------------------------------------\n"
-      "WARNING: Should have a replica on TS ts-id-1 (<mock>), but TS is unavailable\n\n");
+      "WARNING: Should have a replica on TS ts-id-1 (<mock>), but TS is unavailable\n"
+      "INFO: OK state on TS ts-id-0 (<mock>): RUNNING\n"
+      "INFO: OK state on TS ts-id-2 (<mock>): RUNNING\n");
   ASSERT_STR_CONTAINS(
       err_stream_.str(),
       "WARNING: Detected problems with Tablet tablet-id-1 of table 'test'\n"
       "------------------------------------------------------------\n"
-      "WARNING: Should have a replica on TS ts-id-1 (<mock>), but TS is unavailable\n\n");
+      "WARNING: Should have a replica on TS ts-id-1 (<mock>), but TS is unavailable\n"
+      "INFO: OK state on TS ts-id-0 (<mock>): RUNNING\n"
+      "INFO: OK state on TS ts-id-2 (<mock>): RUNNING\n");
   ASSERT_STR_CONTAINS(
       err_stream_.str(),
       "WARNING: Detected problems with Tablet tablet-id-2 of table 'test'\n"
       "------------------------------------------------------------\n"
-      "WARNING: Should have a replica on TS ts-id-1 (<mock>), but TS is unavailable\n\n");
+      "WARNING: Should have a replica on TS ts-id-1 (<mock>), but TS is unavailable\n"
+      "INFO: OK state on TS ts-id-0 (<mock>): RUNNING\n"
+      "INFO: OK state on TS ts-id-2 (<mock>): RUNNING\n");
 }
 
 TEST_F(KsckTest, TestZeroTabletReplicasCheck) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2e04bf54/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 67f9ff2..c46095b 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -500,7 +500,7 @@ bool Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet, int table_num_repl
   string tablet_str = Substitute("Tablet $0 of table '$1'",
                                  tablet->id(), tablet->table()->name());
   vector<shared_ptr<KsckTabletReplica> > replicas = tablet->replicas();
-  vector<string> warnings, errors;
+  vector<string> warnings, errors, infos;
   if (check_replica_count_ && replicas.size() != table_num_replicas) {
     warnings.push_back(Substitute("$0 has $1 instead of $2 replicas",
                                   tablet_str, replicas.size(), table_num_replicas));
@@ -528,6 +528,8 @@ bool Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet, int table_num_repl
           VLOG(1) << Substitute("Tablet replica for $0 on TS $1 is RUNNING",
                                 tablet_str, ts->ToString());
           running_count++;
+          infos.push_back(Substitute("OK state on TS $0: $1",
+                                     ts->ToString(), tablet::TabletStatePB_Name(state)));
           break;
 
         case tablet::UNKNOWN:
@@ -584,6 +586,11 @@ bool Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet, int table_num_repl
     for (const auto& s : errors) {
       Error() << s << endl;
     }
+    // We only print the 'INFO' messages on tablets that have some issues.
+    // Otherwise, it's a bit verbose.
+    for (const auto& s : infos) {
+      Info() << s << endl;
+    }
     Out() << endl;
   }
 


[6/9] incubator-kudu git commit: ksck: fix a crash in checksum mode on tables with many tablets

Posted by jd...@apache.org.
ksck: fix a crash in checksum mode on tables with many tablets

In the case that the list of tablets had to be fetched in multiple
batches, we would improperly re-fetch the last tablet of the previous
batch as the first tablet of the next batch. This would then cause
a tablet to be inserted twice into the list, which would later cause
a CHECK failure when we tried to InsertOrDie() this tablet ID into
a map.

This fixes the issue by making sure that we look for more tablets starting with
the immediate *successor* of the start partition key of the previous tablet we fetched.
I also updated the integration test to use a table with more tablets
so that the batching code was exercised.

Change-Id: I4ca7ef75bd22ce27885e31ab20cf0e8e0ee2d355
Reviewed-on: http://gerrit.cloudera.org:8080/3714
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/bcf1adc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/bcf1adc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/bcf1adc1

Branch: refs/heads/master
Commit: bcf1adc1b88fe28bf89b4b8c1a4daac96c7f0242
Parents: 2e04bf5
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Jul 20 15:40:34 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Fri Jul 22 20:34:58 2016 +0000

----------------------------------------------------------------------
 src/kudu/tools/ksck_remote-test.cc | 11 ++++++++---
 src/kudu/tools/ksck_remote.cc      | 16 +++++++++-------
 2 files changed, 17 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/bcf1adc1/src/kudu/tools/ksck_remote-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote-test.cc b/src/kudu/tools/ksck_remote-test.cc
index c10fabb..f8d7a83 100644
--- a/src/kudu/tools/ksck_remote-test.cc
+++ b/src/kudu/tools/ksck_remote-test.cc
@@ -28,6 +28,7 @@
 #include "kudu/util/test_util.h"
 
 DECLARE_int32(heartbeat_interval_ms);
+DECLARE_int32(tablets_batch_size_max);
 
 namespace kudu {
 namespace tools {
@@ -62,6 +63,10 @@ class RemoteKsckTest : public KuduTest {
     // Speed up testing, saves about 700ms per TEST_F.
     FLAGS_heartbeat_interval_ms = 10;
 
+    // Fetch the tablets in smaller batches to regression test a bug
+    // previously seen in the batching code.
+    FLAGS_tablets_batch_size_max = 5;
+
     MiniClusterOptions opts;
     opts.num_tablet_servers = 3;
     mini_cluster_.reset(new MiniCluster(env_.get(), opts));
@@ -137,10 +142,10 @@ class RemoteKsckTest : public KuduTest {
   // Generate a set of split rows for tablets used in this test.
   vector<const KuduPartialRow*> GenerateSplitRows() {
     vector<const KuduPartialRow*> split_rows;
-    vector<int> split_nums = { 33, 66 };
-    for (int i : split_nums) {
+    int num_tablets = AllowSlowTests() ? 10 : 3;
+    for (int i = 1; i < num_tablets; i++) {
       KuduPartialRow* row = schema_.NewRow();
-      CHECK_OK(row->SetInt32(0, i));
+      CHECK_OK(row->SetInt32(0, i * 10));
       split_rows.push_back(row);
     }
     return split_rows;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/bcf1adc1/src/kudu/tools/ksck_remote.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index 66a495a..3ba95ee 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -21,12 +21,13 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 
 DEFINE_bool(checksum_cache_blocks, false, "Should the checksum scanners cache the read blocks");
 DEFINE_int64(timeout_ms, 1000 * 60, "RPC timeout in milliseconds");
-DEFINE_int64(tablets_batch_size_max, 100, "How many tablets to get from the Master per RPC");
+DEFINE_int32(tablets_batch_size_max, 100, "How many tablets to get from the Master per RPC");
 
 namespace kudu {
 namespace tools {
@@ -307,10 +308,10 @@ Status RemoteKsckMaster::RetrieveTablesList(vector<shared_ptr<KsckTable>>* table
 Status RemoteKsckMaster::RetrieveTabletsList(const shared_ptr<KsckTable>& table) {
   vector<shared_ptr<KsckTablet>> tablets;
   bool more_tablets = true;
-  string last_key;
+  string next_key;
   int retries = 0;
   while (more_tablets) {
-    Status s = GetTabletsBatch(table, &last_key, tablets, &more_tablets);
+    Status s = GetTabletsBatch(table, &next_key, tablets, &more_tablets);
     if (s.IsServiceUnavailable() && retries++ < 25) {
       SleepFor(MonoDelta::FromMilliseconds(100 * retries));
     } else if (!s.ok()) {
@@ -323,7 +324,7 @@ Status RemoteKsckMaster::RetrieveTabletsList(const shared_ptr<KsckTable>& table)
 }
 
 Status RemoteKsckMaster::GetTabletsBatch(const shared_ptr<KsckTable>& table,
-                                         string* last_partition_key,
+                                         string* next_partition_key,
                                          vector<shared_ptr<KsckTablet>>& tablets,
                                          bool* more_tablets) {
   master::GetTableLocationsRequestPB req;
@@ -332,16 +333,17 @@ Status RemoteKsckMaster::GetTabletsBatch(const shared_ptr<KsckTable>& table,
 
   req.mutable_table()->set_table_name(table->name());
   req.set_max_returned_locations(FLAGS_tablets_batch_size_max);
-  req.set_partition_key_start(*last_partition_key);
+  req.set_partition_key_start(*next_partition_key);
 
   rpc.set_timeout(GetDefaultTimeout());
   RETURN_NOT_OK(proxy_->GetTableLocations(req, &resp, &rpc));
   for (const master::TabletLocationsPB& locations : resp.tablet_locations()) {
-    if (locations.partition().partition_key_start() < *last_partition_key) {
+    if (locations.partition().partition_key_start() < *next_partition_key) {
       // We've already seen this partition.
       continue;
     }
-    *last_partition_key = locations.partition().partition_key_start();
+
+    *next_partition_key = ImmediateSuccessor(locations.partition().partition_key_start());
 
     shared_ptr<KsckTablet> tablet(new KsckTablet(table.get(), locations.tablet_id()));
     vector<shared_ptr<KsckTabletReplica>> replicas;


[8/9] incubator-kudu git commit: ksck: improve filtering capability

Posted by jd...@apache.org.
ksck: improve filtering capability

- filters can now use glob-like pattern syntax
- filters now apply for the metadata checks, not just the checksums

Change-Id: Ic6ef8ab20679a9967c321cd4f8412ea4ea5fd50d
Reviewed-on: http://gerrit.cloudera.org:8080/3716
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/e8ddee80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/e8ddee80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/e8ddee80

Branch: refs/heads/master
Commit: e8ddee8047b33e3dd1d97f25ef8773242f64212d
Parents: f9928e8
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Jul 20 19:06:23 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Fri Jul 22 20:43:08 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/cluster_verifier.cc |  5 +-
 src/kudu/tools/ksck-test.cc                    | 30 +++++++-
 src/kudu/tools/ksck.cc                         | 85 ++++++++++++---------
 src/kudu/tools/ksck.h                          | 29 +++++--
 src/kudu/tools/ksck_remote-test.cc             | 14 +---
 src/kudu/tools/kudu-ksck.cc                    |  7 +-
 6 files changed, 110 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e8ddee80/src/kudu/integration-tests/cluster_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_verifier.cc b/src/kudu/integration-tests/cluster_verifier.cc
index 64d406a..f156073 100644
--- a/src/kudu/integration-tests/cluster_verifier.cc
+++ b/src/kudu/integration-tests/cluster_verifier.cc
@@ -94,10 +94,7 @@ Status ClusterVerifier::DoKsck() {
   RETURN_NOT_OK(ksck->FetchTableAndTabletInfo());
   RETURN_NOT_OK(ksck->FetchInfoFromTabletServers());
   RETURN_NOT_OK(ksck->CheckTablesConsistency());
-
-  vector<string> tables;
-  vector<string> tablets;
-  RETURN_NOT_OK(ksck->ChecksumData(tables, tablets, checksum_options_));
+  RETURN_NOT_OK(ksck->ChecksumData(checksum_options_));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e8ddee80/src/kudu/tools/ksck-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index acc1ea4..f542c14 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -321,7 +321,7 @@ TEST_F(KsckTest, TestZeroTableCheck) {
 TEST_F(KsckTest, TestOneTableCheck) {
   CreateOneTableOneTablet();
   ASSERT_OK(RunKsck());
-  ASSERT_OK(ksck_->ChecksumData({}, {}, ChecksumOptions()));
+  ASSERT_OK(ksck_->ChecksumData(ChecksumOptions()));
   ASSERT_STR_CONTAINS(err_stream_.str(),
                       "0/1 replicas remaining (20B from disk, 10 rows summed)");
 }
@@ -329,9 +329,35 @@ TEST_F(KsckTest, TestOneTableCheck) {
 TEST_F(KsckTest, TestOneSmallReplicatedTable) {
   CreateOneSmallReplicatedTable();
   ASSERT_OK(RunKsck());
-  ASSERT_OK(ksck_->ChecksumData({}, {}, ChecksumOptions()));
+  ASSERT_OK(ksck_->ChecksumData(ChecksumOptions()));
   ASSERT_STR_CONTAINS(err_stream_.str(),
                       "0/9 replicas remaining (180B from disk, 90 rows summed)");
+
+  // Test filtering (a non-matching pattern)
+  err_stream_.str("");
+  ksck_->set_table_filters({"xyz"});
+  ASSERT_OK(RunKsck());
+  Status s = ksck_->ChecksumData(ChecksumOptions());
+  EXPECT_EQ("Not found: No tablet replicas found. Filter: table_filters=xyz", s.ToString());
+  ASSERT_STR_CONTAINS(err_stream_.str(),
+                      "INFO: The cluster doesn't have any matching tables");
+
+  // Test filtering with a matching table pattern.
+  err_stream_.str("");
+  ksck_->set_table_filters({"te*"});
+  ASSERT_OK(RunKsck());
+  ASSERT_OK(ksck_->ChecksumData(ChecksumOptions()));
+  ASSERT_STR_CONTAINS(err_stream_.str(),
+                      "0/9 replicas remaining (180B from disk, 90 rows summed)");
+
+  // Test filtering with a matching tablet ID pattern.
+  err_stream_.str("");
+  ksck_->set_table_filters({});
+  ksck_->set_tablet_id_filters({"*-id-2"});
+  ASSERT_OK(RunKsck());
+  ASSERT_OK(ksck_->ChecksumData(ChecksumOptions()));
+  ASSERT_STR_CONTAINS(err_stream_.str(),
+                      "0/3 replicas remaining (60B from disk, 30 rows summed)");
 }
 
 TEST_F(KsckTest, TestOneOneTabletBrokenTable) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e8ddee80/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 06eb3cd..3a21711 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -21,7 +21,6 @@
 #include <glog/logging.h>
 #include <iostream>
 #include <mutex>
-#include <unordered_set>
 
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/gutil/map-util.h"
@@ -29,6 +28,7 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/blocking_queue.h"
 #include "kudu/util/locks.h"
@@ -82,6 +82,20 @@ static ostream& Error() {
   return Out() << "ERROR: ";
 }
 
+namespace {
+// Return true if 'str' matches any of the patterns in 'patterns', or if
+// 'patterns' is empty.
+bool MatchesAnyPattern(const vector<string>& patterns, const string& str) {
+  // Consider no filter a wildcard.
+  if (patterns.empty()) return true;
+
+  for (const auto& p : patterns) {
+    if (MatchPattern(str, p)) return true;
+  }
+  return false;
+}
+} // anonymous namespace
+
 ChecksumOptions::ChecksumOptions()
     : timeout(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec)),
       scan_concurrency(FLAGS_checksum_scan_concurrency),
@@ -196,28 +210,30 @@ Status Ksck::ConnectToTabletServer(const shared_ptr<KsckTabletServer>& ts) {
 }
 
 Status Ksck::CheckTablesConsistency() {
-  VLOG(1) << "Getting the tables list";
-  int tables_count = cluster_->tables().size();
-  VLOG(1) << Substitute("List of $0 table(s) retrieved", tables_count);
-
-  if (tables_count == 0) {
-    Info() << "The cluster doesn't have any tables" << endl;
-    return Status::OK();
-  }
-
-  VLOG(1) << "Verifying each table";
+  int tables_checked = 0;
   int bad_tables_count = 0;
   for (const shared_ptr<KsckTable> &table : cluster_->tables()) {
+    if (!MatchesAnyPattern(table_filters_, table->name())) {
+      VLOG(1) << "Skipping table " << table->name();
+      continue;
+    }
+    tables_checked++;
     if (!VerifyTable(table)) {
       bad_tables_count++;
     }
   }
+
+  if (tables_checked == 0) {
+    Info() << "The cluster doesn't have any matching tables" << endl;
+    return Status::OK();
+  }
+
   if (bad_tables_count == 0) {
-    Info() << Substitute("The metadata for $0 table(s) is HEALTHY", tables_count) << endl;
+    Info() << Substitute("The metadata for $0 table(s) is HEALTHY", tables_checked) << endl;
     return Status::OK();
   } else {
     Warn() << Substitute("$0 out of $1 table(s) are not in a healthy state",
-                         bad_tables_count, tables_count) << endl;
+                         bad_tables_count, tables_checked) << endl;
     return Status::Corruption(Substitute("$0 table(s) are bad", bad_tables_count));
   }
 }
@@ -359,12 +375,7 @@ class TabletServerChecksumCallbacks : public ChecksumProgressCallbacks {
   std::string tablet_id_;
 };
 
-Status Ksck::ChecksumData(const vector<string>& tables,
-                          const vector<string>& tablets,
-                          const ChecksumOptions& opts) {
-  const unordered_set<string> tables_filter(tables.begin(), tables.end());
-  const unordered_set<string> tablets_filter(tablets.begin(), tablets.end());
-
+Status Ksck::ChecksumData(const ChecksumOptions& opts) {
   // Copy options so that local modifications can be made and passed on.
   ChecksumOptions options = opts;
 
@@ -374,23 +385,23 @@ Status Ksck::ChecksumData(const vector<string>& tables,
   int num_tablet_replicas = 0;
   for (const shared_ptr<KsckTable>& table : cluster_->tables()) {
     VLOG(1) << "Table: " << table->name();
-    if (!tables_filter.empty() && !ContainsKey(tables_filter, table->name())) continue;
+    if (!MatchesAnyPattern(table_filters_, table->name())) continue;
     for (const shared_ptr<KsckTablet>& tablet : table->tablets()) {
       VLOG(1) << "Tablet: " << tablet->id();
-      if (!tablets_filter.empty() && !ContainsKey(tablets_filter, tablet->id())) continue;
+      if (!MatchesAnyPattern(tablet_id_filters_, tablet->id())) continue;
       InsertOrDie(&tablet_table_map, tablet, table);
       num_tablet_replicas += tablet->replicas().size();
     }
   }
   if (num_tablet_replicas == 0) {
     string msg = "No tablet replicas found.";
-    if (!tables.empty() || !tablets.empty()) {
+    if (!table_filters_.empty() || !tablet_id_filters_.empty()) {
       msg += " Filter: ";
-      if (!tables.empty()) {
-        msg += "tables=" + JoinStrings(tables, ",") + ".";
+      if (!table_filters_.empty()) {
+        msg += "table_filters=" + JoinStrings(table_filters_, ",");
       }
-      if (!tablets.empty()) {
-        msg += "tablets=" + JoinStrings(tablets, ",") + ".";
+      if (!tablet_id_filters_.empty()) {
+        msg += "tablet_id_filters=" + JoinStrings(tablet_id_filters_, ",");
       }
     }
     return Status::NotFound(msg);
@@ -522,24 +533,30 @@ Status Ksck::ChecksumData(const vector<string>& tables,
 
 bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table) {
   bool good_table = true;
-  vector<shared_ptr<KsckTablet> > tablets = table->tablets();
-  int tablets_count = tablets.size();
-  if (tablets_count == 0) {
-    Warn() << Substitute("Table $0 has 0 tablets", table->name()) << endl;
-    return false;
+  const auto all_tablets = table->tablets();
+  vector<shared_ptr<KsckTablet>> tablets;
+  std::copy_if(all_tablets.begin(), all_tablets.end(), std::back_inserter(tablets),
+                 [&](const shared_ptr<KsckTablet>& t) {
+                   return MatchesAnyPattern(tablet_id_filters_, t->id());
+                 });
+
+  if (tablets.empty()) {
+    Info() << Substitute("Table $0 has 0 matching tablets", table->name()) << endl;
+    return true;
   }
   int table_num_replicas = table->num_replicas();
   VLOG(1) << Substitute("Verifying $0 tablets for table $1 configured with num_replicas = $2",
-                        tablets_count, table->name(), table_num_replicas);
+                        tablets.size(), table->name(), table_num_replicas);
+
   int bad_tablets_count = 0;
-  // TODO check if the tablets are contiguous and in order.
   for (const shared_ptr<KsckTablet> &tablet : tablets) {
     if (!VerifyTablet(tablet, table_num_replicas)) {
       bad_tablets_count++;
     }
   }
   if (bad_tablets_count == 0) {
-    Info() << Substitute("Table $0 is HEALTHY", table->name()) << endl;
+    Info() << Substitute("Table $0 is HEALTHY ($1 tablets checked)",
+                         table->name(), tablets.size()) << endl;
   } else {
     Warn() << Substitute("Table $0 has $1 bad tablets", table->name(), bad_tablets_count) << endl;
     good_table = false;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e8ddee80/src/kudu/tools/ksck.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index 2976944..cb78686 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -332,6 +332,24 @@ class Ksck {
     check_replica_count_ = check;
   }
 
+  // Setters for filtering the tables/tablets to be checked.
+  //
+  // Filter strings are glob-style patterns. For example, 'Foo*' matches
+  // all tables whose name begins with 'Foo'.
+  //
+  // If tables is not empty, checks only the named tables.
+  // If tablets is not empty, checks only the specified tablet IDs.
+  // If both are specified, takes the intersection.
+  // If both are empty (unset), all tables and tablets are checked.
+  void set_table_filters(vector<string> table_names) {
+    table_filters_ = std::move(table_names);
+  }
+
+  // See above.
+  void set_tablet_id_filters(vector<string> tablet_ids) {
+    tablet_id_filters_ = std::move(tablet_ids);
+  }
+
   // Verifies that it can connect to the master.
   Status CheckMasterRunning();
 
@@ -353,14 +371,8 @@ class Ksck {
   Status CheckTablesConsistency();
 
   // Verifies data checksums on all tablets by doing a scan of the database on each replica.
-  // If tables is not empty, checks only the named tables.
-  // If tablets is not empty, checks only the specified tablets.
-  // If both are specified, takes the intersection.
-  // If both are empty, all tables and tablets are checked.
   // Must first call FetchTableAndTabletInfo().
-  Status ChecksumData(const std::vector<std::string>& tables,
-                      const std::vector<std::string>& tablets,
-                      const ChecksumOptions& options);
+  Status ChecksumData(const ChecksumOptions& options);
 
  private:
   bool VerifyTable(const std::shared_ptr<KsckTable>& table);
@@ -372,6 +384,9 @@ class Ksck {
   const std::shared_ptr<KsckCluster> cluster_;
 
   bool check_replica_count_ = true;
+  vector<string> table_filters_;
+  vector<string> tablet_id_filters_;
+
   DISALLOW_COPY_AND_ASSIGN(Ksck);
 };
 } // namespace tools

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e8ddee80/src/kudu/tools/ksck_remote-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote-test.cc b/src/kudu/tools/ksck_remote-test.cc
index fd22975..1528930 100644
--- a/src/kudu/tools/ksck_remote-test.cc
+++ b/src/kudu/tools/ksck_remote-test.cc
@@ -235,9 +235,7 @@ TEST_F(RemoteKsckTest, TestChecksum) {
     ASSERT_OK(ksck_->FetchTableAndTabletInfo());
 
     err_stream_.str("");
-    s = ksck_->ChecksumData(vector<string>(),
-                            vector<string>(),
-                            ChecksumOptions(MonoDelta::FromSeconds(1), 16, false, 0));
+    s = ksck_->ChecksumData(ChecksumOptions(MonoDelta::FromSeconds(1), 16, false, 0));
     if (s.ok()) {
       // Check the status message at the end of the checksum.
       // We expect '0B from disk' because we didn't write enough data to trigger a flush
@@ -259,9 +257,7 @@ TEST_F(RemoteKsckTest, TestChecksumTimeout) {
   ASSERT_OK(GenerateRowWrites(num_writes));
   ASSERT_OK(ksck_->FetchTableAndTabletInfo());
   // Use an impossibly low timeout value of zero!
-  Status s = ksck_->ChecksumData(vector<string>(),
-                                 vector<string>(),
-                                 ChecksumOptions(MonoDelta::FromNanoseconds(0), 16, false, 0));
+  Status s = ksck_->ChecksumData(ChecksumOptions(MonoDelta::FromNanoseconds(0), 16, false, 0));
   ASSERT_TRUE(s.IsTimedOut()) << "Expected TimedOut Status, got: " << s.ToString();
 }
 
@@ -286,8 +282,7 @@ TEST_F(RemoteKsckTest, TestChecksumSnapshot) {
   // Remove this loop when that is done. See KUDU-1056.
   while (true) {
     ASSERT_OK(ksck_->FetchTableAndTabletInfo());
-    Status s = ksck_->ChecksumData(vector<string>(), vector<string>(),
-                                   ChecksumOptions(MonoDelta::FromSeconds(10), 16, true, ts));
+    Status s = ksck_->ChecksumData(ChecksumOptions(MonoDelta::FromSeconds(10), 16, true, ts));
     if (s.ok()) break;
     if (deadline.ComesBefore(MonoTime::Now(MonoTime::FINE))) break;
     SleepFor(MonoDelta::FromMilliseconds(10));
@@ -319,8 +314,7 @@ TEST_F(RemoteKsckTest, DISABLED_TestChecksumSnapshotCurrentTimestamp) {
   CHECK(started_writing.WaitFor(MonoDelta::FromSeconds(30)));
 
   ASSERT_OK(ksck_->FetchTableAndTabletInfo());
-  ASSERT_OK(ksck_->ChecksumData(vector<string>(), vector<string>(),
-                                ChecksumOptions(MonoDelta::FromSeconds(10), 16, true,
+  ASSERT_OK(ksck_->ChecksumData(ChecksumOptions(MonoDelta::FromSeconds(10), 16, true,
                                                 ChecksumOptions::kCurrentTimestamp)));
   continue_writing.Store(false);
   ASSERT_OK(promise.Get());

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e8ddee80/src/kudu/tools/kudu-ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-ksck.cc b/src/kudu/tools/kudu-ksck.cc
index e704f2e..70a6ce1 100644
--- a/src/kudu/tools/kudu-ksck.cc
+++ b/src/kudu/tools/kudu-ksck.cc
@@ -90,6 +90,9 @@ static void RunKsck(vector<string>* error_messages) {
   shared_ptr<KsckCluster> cluster(new KsckCluster(master));
   shared_ptr<Ksck> ksck(new Ksck(cluster));
 
+  ksck->set_table_filters(strings::Split(FLAGS_tables, ",", strings::SkipEmpty()));
+  ksck->set_tablet_id_filters(strings::Split(FLAGS_tablets, ",", strings::SkipEmpty()));
+
   // This is required for everything below.
   PUSH_PREPEND_NOT_OK(ksck->CheckMasterRunning(), error_messages,
                       "Master aliveness check error");
@@ -108,9 +111,7 @@ static void RunKsck(vector<string>* error_messages) {
                       "Table consistency check error");
 
   if (FLAGS_checksum_scan) {
-    vector<string> tables = strings::Split(FLAGS_tables, ",", strings::SkipEmpty());
-    vector<string> tablets = strings::Split(FLAGS_tablets, ",", strings::SkipEmpty());
-    PUSH_PREPEND_NOT_OK(ksck->ChecksumData(tables, tablets, ChecksumOptions()),
+    PUSH_PREPEND_NOT_OK(ksck->ChecksumData(ChecksumOptions()),
                         error_messages, "Checksum scan error");
   }
 }


[7/9] incubator-kudu git commit: ksck: improve output for long-running ksck checksums

Posted by jd...@apache.org.
ksck: improve output for long-running ksck checksums

Checksumming a large (multi-TB) table can take many minutes. Previously, the
ksck output would be very quiet during that time, giving no indication as to
whether it was making progress or how much work might be remaining. This
addresses that by:

- passing back the number of bytes and rows that have been summed so
  far on a regular basis
- reporting progress every 5 seconds, including the above numbers

Along the way, I decided that our default timeout of 2 minutes (120 seconds)
was way too low for typical table sizes, so I bumped it to an hour. I also
added more mock-based test coverage of the checksum-scan code path.

Change-Id: I2a9962329570e8383087747d36cee9ad4fa60825
Reviewed-on: http://gerrit.cloudera.org:8080/3715
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/f9928e8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/f9928e8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/f9928e8a

Branch: refs/heads/master
Commit: f9928e8a1012d8f7ff28ba0c963db5c4f33df25b
Parents: bcf1adc
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Jul 20 16:16:02 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Fri Jul 22 20:42:47 2016 +0000

----------------------------------------------------------------------
 src/kudu/tools/ksck-test.cc            |  13 ++-
 src/kudu/tools/ksck.cc                 | 125 ++++++++++++++++++++--------
 src/kudu/tools/ksck.h                  |  17 +++-
 src/kudu/tools/ksck_remote-test.cc     |  21 +++++
 src/kudu/tools/ksck_remote.cc          |  22 +++--
 src/kudu/tools/ksck_remote.h           |   7 +-
 src/kudu/tserver/tablet_server-test.cc |   2 +
 src/kudu/tserver/tablet_service.cc     |  25 ++++--
 src/kudu/tserver/tserver_service.proto |   8 ++
 9 files changed, 179 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f9928e8a/src/kudu/tools/ksck-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index 70c4dd0..acc1ea4 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -47,7 +47,7 @@ class MockKsckTabletServer : public KsckTabletServer {
   }
 
   Status FetchInfo() override {
-    timestamp_ = 0;
+    timestamp_ = 12345;
     if (fetch_info_status_.ok()) {
       state_ = kFetched;
     } else {
@@ -60,8 +60,9 @@ class MockKsckTabletServer : public KsckTabletServer {
       const std::string& tablet_id,
       const Schema& schema,
       const ChecksumOptions& options,
-      const ReportResultCallback& callback) OVERRIDE {
-    callback.Run(Status::OK(), 0);
+      ChecksumProgressCallbacks* callbacks) OVERRIDE {
+    callbacks->Progress(10, 20);
+    callbacks->Finished(Status::OK(), 0);
   }
 
   virtual std::string address() const OVERRIDE {
@@ -320,11 +321,17 @@ TEST_F(KsckTest, TestZeroTableCheck) {
 TEST_F(KsckTest, TestOneTableCheck) {
   CreateOneTableOneTablet();
   ASSERT_OK(RunKsck());
+  ASSERT_OK(ksck_->ChecksumData({}, {}, ChecksumOptions()));
+  ASSERT_STR_CONTAINS(err_stream_.str(),
+                      "0/1 replicas remaining (20B from disk, 10 rows summed)");
 }
 
 TEST_F(KsckTest, TestOneSmallReplicatedTable) {
   CreateOneSmallReplicatedTable();
   ASSERT_OK(RunKsck());
+  ASSERT_OK(ksck_->ChecksumData({}, {}, ChecksumOptions()));
+  ASSERT_STR_CONTAINS(err_stream_.str(),
+                      "0/9 replicas remaining (180B from disk, 90 rows summed)");
 }
 
 TEST_F(KsckTest, TestOneOneTabletBrokenTable) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f9928e8a/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index c46095b..06eb3cd 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/tools/ksck.h"
 
+#include <algorithm>
 #include <glog/logging.h>
 #include <iostream>
 #include <mutex>
@@ -26,6 +27,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/blocking_queue.h"
@@ -45,7 +47,7 @@ using std::string;
 using std::unordered_map;
 using strings::Substitute;
 
-DEFINE_int32(checksum_timeout_sec, 120,
+DEFINE_int32(checksum_timeout_sec, 3600,
              "Maximum total seconds to wait for a checksum scan to complete "
              "before timing out.");
 DEFINE_int32(checksum_scan_concurrency, 4,
@@ -230,7 +232,15 @@ class ChecksumResultReporter : public RefCountedThreadSafe<ChecksumResultReporte
 
   // Initialize reporter with the number of replicas being queried.
   explicit ChecksumResultReporter(int num_tablet_replicas)
-      : responses_(num_tablet_replicas) {
+      : expected_count_(num_tablet_replicas),
+        responses_(num_tablet_replicas),
+        rows_summed_(0),
+        disk_bytes_summed_(0) {
+  }
+
+  void ReportProgress(int64_t delta_rows, int64_t delta_bytes) {
+    rows_summed_.IncrementBy(delta_rows);
+    disk_bytes_summed_.IncrementBy(delta_bytes);
   }
 
   // Write an entry to the result map indicating a response from the remote.
@@ -250,7 +260,29 @@ class ChecksumResultReporter : public RefCountedThreadSafe<ChecksumResultReporte
   // whichever comes first.
   // Returns false if the timeout expired before all responses came in.
   // Otherwise, returns true.
-  bool WaitFor(const MonoDelta& timeout) const { return responses_.WaitFor(timeout); }
+  bool WaitFor(const MonoDelta& timeout) const {
+    MonoTime start = MonoTime::Now(MonoTime::FINE);
+
+    MonoTime deadline = start;
+    deadline.AddDelta(timeout);
+
+    bool done = false;
+    while (!done) {
+      MonoTime now = MonoTime::Now(MonoTime::FINE);
+      int rem_ms = deadline.GetDeltaSince(now).ToMilliseconds();
+      if (rem_ms <= 0) return false;
+
+      done = responses_.WaitFor(MonoDelta::FromMilliseconds(std::min(rem_ms, 5000)));
+      string status = done ? "finished in " : "running for ";
+      int run_time_sec = MonoTime::Now(MonoTime::FINE).GetDeltaSince(start).ToSeconds();
+      Info() << "Checksum " << status << run_time_sec << "s: "
+             << responses_.count() << "/" << expected_count_ << " replicas remaining ("
+             << HumanReadableNumBytes::ToString(disk_bytes_summed_.Load()) << " from disk, "
+             << HumanReadableInt::ToString(rows_summed_.Load()) << " rows summed)"
+             << endl;
+    }
+    return true;
+  }
 
   // Returns true iff all replicas have reported in.
   bool AllReported() const { return responses_.count() == 0; }
@@ -269,41 +301,63 @@ class ChecksumResultReporter : public RefCountedThreadSafe<ChecksumResultReporte
   void HandleResponse(const std::string& tablet_id, const std::string& replica_uuid,
                       const Status& status, uint64_t checksum);
 
+  const int expected_count_;
   CountDownLatch responses_;
+
   mutable simple_spinlock lock_; // Protects 'checksums_'.
   // checksums_ is an unordered_map of { tablet_id : { replica_uuid : checksum } }.
   TabletResultMap checksums_;
+
+  AtomicInt<int64_t> rows_summed_;
+  AtomicInt<int64_t> disk_bytes_summed_;
 };
 
 // Queue of tablet replicas for an individual tablet server.
-typedef shared_ptr<BlockingQueue<std::pair<Schema, std::string> > > TabletQueue;
+typedef shared_ptr<BlockingQueue<std::pair<Schema, std::string> > > SharedTabletQueue;
 
-// A callback function which records the result of a tablet replica's checksum,
+// A set of callbacks which records the result of a tablet replica's checksum,
 // and then checks if the tablet server has any more tablets to checksum. If so,
 // a new async checksum scan is started.
-void TabletServerChecksumCallback(
-    const scoped_refptr<ChecksumResultReporter>& reporter,
-    const shared_ptr<KsckTabletServer>& tablet_server,
-    const TabletQueue& queue,
-    const std::string& tablet_id,
-    const ChecksumOptions& options,
-    const Status& status,
-    uint64_t checksum) {
-  reporter->ReportResult(tablet_id, tablet_server->uuid(), status, checksum);
-
-  std::pair<Schema, std::string> table_tablet;
-  if (queue->BlockingGet(&table_tablet)) {
-    const Schema& table_schema = table_tablet.first;
-    const std::string& tablet_id = table_tablet.second;
-    ReportResultCallback callback = Bind(&TabletServerChecksumCallback,
-                                         reporter,
-                                         tablet_server,
-                                         queue,
-                                         tablet_id,
-                                         options);
-    tablet_server->RunTabletChecksumScanAsync(tablet_id, table_schema, options, callback);
+class TabletServerChecksumCallbacks : public ChecksumProgressCallbacks {
+ public:
+  TabletServerChecksumCallbacks(
+    scoped_refptr<ChecksumResultReporter> reporter,
+    shared_ptr<KsckTabletServer> tablet_server,
+    SharedTabletQueue queue,
+    std::string tablet_id,
+    ChecksumOptions options) :
+      reporter_(std::move(reporter)),
+      tablet_server_(std::move(tablet_server)),
+      queue_(std::move(queue)),
+      options_(std::move(options)),
+      tablet_id_(std::move(tablet_id)) {
+  }
+
+  void Progress(int64_t rows_summed, int64_t disk_bytes_summed) override {
+    reporter_->ReportProgress(rows_summed, disk_bytes_summed);
+  }
+
+  void Finished(const Status& status, uint64_t checksum) override {
+    reporter_->ReportResult(tablet_id_, tablet_server_->uuid(), status, checksum);
+
+    std::pair<Schema, std::string> table_tablet;
+    if (queue_->BlockingGet(&table_tablet)) {
+      const Schema& table_schema = table_tablet.first;
+      tablet_id_ = table_tablet.second;
+      tablet_server_->RunTabletChecksumScanAsync(tablet_id_, table_schema, options_, this);
+    } else {
+      delete this;
+    }
   }
-}
+
+ private:
+  const scoped_refptr<ChecksumResultReporter> reporter_;
+  const shared_ptr<KsckTabletServer> tablet_server_;
+  const SharedTabletQueue queue_;
+  const ChecksumOptions options_;
+
+  std::string tablet_id_;
+};
 
 Status Ksck::ChecksumData(const vector<string>& tables,
                           const vector<string>& tablets,
@@ -343,7 +397,7 @@ Status Ksck::ChecksumData(const vector<string>& tables,
   }
 
   // Map of tablet servers to tablet queue.
-  typedef unordered_map<shared_ptr<KsckTabletServer>, TabletQueue> TabletServerQueueMap;
+  typedef unordered_map<shared_ptr<KsckTabletServer>, SharedTabletQueue> TabletServerQueueMap;
 
   TabletServerQueueMap tablet_server_queues;
   scoped_refptr<ChecksumResultReporter> reporter(new ChecksumResultReporter(num_tablet_replicas));
@@ -356,7 +410,7 @@ Status Ksck::ChecksumData(const vector<string>& tables,
       const shared_ptr<KsckTabletServer>& ts =
           FindOrDie(cluster_->tablet_servers(), replica->ts_uuid());
 
-      const TabletQueue& queue =
+      const SharedTabletQueue& queue =
           LookupOrInsertNewSharedPtr(&tablet_server_queues, ts, num_tablet_replicas);
       CHECK_EQ(QUEUE_SUCCESS, queue->Put(make_pair(table->schema(), tablet->id())));
     }
@@ -383,20 +437,17 @@ Status Ksck::ChecksumData(const vector<string>& tables,
   // scan when it returns if the queue for that TS is not empty.
   for (const TabletServerQueueMap::value_type& entry : tablet_server_queues) {
     const shared_ptr<KsckTabletServer>& tablet_server = entry.first;
-    const TabletQueue& queue = entry.second;
+    const SharedTabletQueue& queue = entry.second;
     queue->Shutdown(); // Ensures that BlockingGet() will not block.
     for (int i = 0; i < options.scan_concurrency; i++) {
       std::pair<Schema, std::string> table_tablet;
       if (queue->BlockingGet(&table_tablet)) {
         const Schema& table_schema = table_tablet.first;
         const std::string& tablet_id = table_tablet.second;
-        ReportResultCallback callback = Bind(&TabletServerChecksumCallback,
-                                             reporter,
-                                             tablet_server,
-                                             queue,
-                                             tablet_id,
-                                             options);
-        tablet_server->RunTabletChecksumScanAsync(tablet_id, table_schema, options, callback);
+        auto* cbs = new TabletServerChecksumCallbacks(
+            reporter, tablet_server, queue, tablet_id, options);
+        // 'cbs' deletes itself when complete.
+        tablet_server->RunTabletChecksumScanAsync(tablet_id, table_schema, options, cbs);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f9928e8a/src/kudu/tools/ksck.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index 80bc63d..2976944 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -162,7 +162,20 @@ class KsckTable {
   DISALLOW_COPY_AND_ASSIGN(KsckTable);
 };
 
-typedef Callback<void(const Status& status, uint64_t checksum)> ReportResultCallback;
+// Interface for reporting progress on checksumming a single
+// replica.
+class ChecksumProgressCallbacks {
+ public:
+  virtual ~ChecksumProgressCallbacks() {}
+
+  // Report incremental progress from the server side.
+  // 'disk_bytes_summed' only counts data read from DiskRowSets on the server side
+  // and does not count MRS bytes, etc.
+  virtual void Progress(int64_t delta_rows_summed, int64_t delta_disk_bytes_summed) = 0;
+
+  // The scan of the current tablet is complete.
+  virtual void Finished(const Status& status, uint64_t checksum) = 0;
+};
 
 // The following two classes must be extended in order to communicate with their respective
 // components. The two main use cases envisioned for this are:
@@ -187,7 +200,7 @@ class KsckTabletServer {
                   const std::string& tablet_id,
                   const Schema& schema,
                   const ChecksumOptions& options,
-                  const ReportResultCallback& callback) = 0;
+                  ChecksumProgressCallbacks* callbacks) = 0;
 
   virtual const std::string& uuid() const {
     return uuid_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f9928e8a/src/kudu/tools/ksck_remote-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote-test.cc b/src/kudu/tools/ksck_remote-test.cc
index f8d7a83..fd22975 100644
--- a/src/kudu/tools/ksck_remote-test.cc
+++ b/src/kudu/tools/ksck_remote-test.cc
@@ -45,6 +45,10 @@ using std::string;
 using std::vector;
 using strings::Substitute;
 
+// Import this symbol from ksck.cc so we can introspect the
+// errors being written to stderr.
+extern std::ostream* g_err_stream;
+
 static const char *kTableName = "ksck-test-table";
 
 class RemoteKsckTest : public KuduTest {
@@ -55,6 +59,11 @@ class RemoteKsckTest : public KuduTest {
     b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
     b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
     CHECK_OK(b.Build(&schema_));
+    g_err_stream = &err_stream_;
+  }
+
+  ~RemoteKsckTest() {
+    g_err_stream = NULL;
   }
 
   virtual void SetUp() OVERRIDE {
@@ -174,6 +183,9 @@ class RemoteKsckTest : public KuduTest {
   std::shared_ptr<Ksck> ksck_;
   shared_ptr<client::KuduClient> client_;
 
+  // Captures logged messages from ksck.
+  std::stringstream err_stream_;
+
  private:
   Sockaddr master_rpc_addr_;
   std::shared_ptr<MiniCluster> mini_cluster_;
@@ -221,10 +233,19 @@ TEST_F(RemoteKsckTest, TestChecksum) {
   Status s;
   while (MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
     ASSERT_OK(ksck_->FetchTableAndTabletInfo());
+
+    err_stream_.str("");
     s = ksck_->ChecksumData(vector<string>(),
                             vector<string>(),
                             ChecksumOptions(MonoDelta::FromSeconds(1), 16, false, 0));
     if (s.ok()) {
+      // Check the status message at the end of the checksum.
+      // We expect '0B from disk' because we didn't write enough data to trigger a flush
+      // in this short-running test.
+      ASSERT_STR_CONTAINS(err_stream_.str(),
+                          AllowSlowTests() ?
+                          "0/30 replicas remaining (0B from disk, 300 rows summed)" :
+                          "0/9 replicas remaining (0B from disk, 300 rows summed)");
       break;
     }
     SleepFor(MonoDelta::FromMilliseconds(10));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f9928e8a/src/kudu/tools/ksck_remote.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index 3ba95ee..641271c 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -122,13 +122,13 @@ class ChecksumCallbackHandler {
 class ChecksumStepper {
  public:
   ChecksumStepper(string tablet_id, const Schema& schema, string server_uuid,
-                  ChecksumOptions options, ReportResultCallback callback,
+                  ChecksumOptions options, ChecksumProgressCallbacks* callbacks,
                   shared_ptr<tserver::TabletServerServiceProxy> proxy)
       : schema_(schema),
         tablet_id_(std::move(tablet_id)),
         server_uuid_(std::move(server_uuid)),
         options_(std::move(options)),
-        reporter_callback_(std::move(callback)),
+        callbacks_(callbacks),
         proxy_(std::move(proxy)),
         call_seq_id_(0),
         checksum_(0) {
@@ -139,7 +139,7 @@ class ChecksumStepper {
     Status s = SchemaToColumnPBs(schema_, &cols_,
                                  SCHEMA_PB_WITHOUT_IDS | SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES);
     if (!s.ok()) {
-      reporter_callback_.Run(s, 0);
+      callbacks_->Finished(s, 0);
     } else {
       SendRequest(kNewRequest);
     }
@@ -152,16 +152,20 @@ class ChecksumStepper {
       s = StatusFromPB(resp_.error().status());
     }
     if (!s.ok()) {
-      reporter_callback_.Run(s, 0);
+      callbacks_->Finished(s, 0);
       return; // Deletes 'this'.
     }
-
+    if (resp_.has_resource_metrics() || resp_.has_rows_checksummed()) {
+      auto bytes = resp_.resource_metrics().cfile_cache_miss_bytes() +
+          resp_.resource_metrics().cfile_cache_hit_bytes();
+      callbacks_->Progress(resp_.rows_checksummed(), bytes);;
+    }
     DCHECK(resp_.has_checksum());
     checksum_ = resp_.checksum();
 
     // Report back with results.
     if (!resp_.has_more_results()) {
-      reporter_callback_.Run(s, checksum_);
+      callbacks_->Finished(s, checksum_);
       return; // Deletes 'this'.
     }
 
@@ -220,7 +224,7 @@ class ChecksumStepper {
   const string tablet_id_;
   const string server_uuid_;
   const ChecksumOptions options_;
-  const ReportResultCallback reporter_callback_;
+  ChecksumProgressCallbacks* const callbacks_;
   const shared_ptr<tserver::TabletServerServiceProxy> proxy_;
 
   uint32_t call_seq_id_;
@@ -240,9 +244,9 @@ void RemoteKsckTabletServer::RunTabletChecksumScanAsync(
         const string& tablet_id,
         const Schema& schema,
         const ChecksumOptions& options,
-        const ReportResultCallback& callback) {
+        ChecksumProgressCallbacks* callbacks) {
   gscoped_ptr<ChecksumStepper> stepper(
-      new ChecksumStepper(tablet_id, schema, uuid(), options, callback, ts_proxy_));
+      new ChecksumStepper(tablet_id, schema, uuid(), options, callbacks, ts_proxy_));
   stepper->Start();
   ignore_result(stepper.release()); // Deletes self on callback.
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f9928e8a/src/kudu/tools/ksck_remote.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.h b/src/kudu/tools/ksck_remote.h
index 9ba4e99..8068bb0 100644
--- a/src/kudu/tools/ksck_remote.h
+++ b/src/kudu/tools/ksck_remote.h
@@ -53,14 +53,13 @@ class RemoteKsckTabletServer : public KsckTabletServer {
   // Must be called after constructing.
   Status Init();
 
-  virtual Status FetchInfo() OVERRIDE;
+  Status FetchInfo() override;
 
-  virtual void RunTabletChecksumScanAsync(
+  void RunTabletChecksumScanAsync(
       const std::string& tablet_id,
       const Schema& schema,
       const ChecksumOptions& options,
-      const ReportResultCallback& callback) OVERRIDE;
-
+      ChecksumProgressCallbacks* callbacks) override;
 
   virtual std::string address() const OVERRIDE {
     return host_port_.ToString();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f9928e8a/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 40bbd19..05ca99b 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -2170,6 +2170,8 @@ TEST_F(TabletServerTest, TestChecksumScan) {
   ASSERT_FALSE(resp.has_error()) << resp.error().DebugString();
   ASSERT_EQ(total_crc, resp.checksum());
   ASSERT_FALSE(resp.has_more_results());
+  EXPECT_TRUE(resp.has_resource_metrics());
+  EXPECT_EQ(1, resp.rows_checksummed());
 
   // Second row (null string field).
   key = 2;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f9928e8a/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 5a9c04b..f3f6db0 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -402,7 +402,8 @@ class ScanResultChecksummer : public ScanResultCollector {
   ScanResultChecksummer()
       : crc_(crc::GetCrc32cInstance()),
         agg_checksum_(0),
-        blocks_processed_(0) {
+        blocks_processed_(0),
+        rows_checksummed_(0) {
   }
 
   virtual void HandleRowBlock(const Schema* client_projection_schema,
@@ -417,6 +418,7 @@ class ScanResultChecksummer : public ScanResultCollector {
       if (!row_block.selection_vector()->IsRowSelected(i)) continue;
       uint32_t row_crc = CalcRowCrc32(*client_projection_schema, row_block.row(i));
       agg_checksum_ += row_crc;
+      rows_checksummed_++;
     }
     // Find the last selected row and save its encoded key.
     SetLastRow(row_block, &encoded_last_row_);
@@ -433,6 +435,10 @@ class ScanResultChecksummer : public ScanResultCollector {
     return 0;
   }
 
+  int64_t rows_checksummed() const {
+    return rows_checksummed_;
+  }
+
   // Accessors for initializing / setting the checksum.
   void set_agg_checksum(uint64_t value) { agg_checksum_ = value; }
   uint64_t agg_checksum() const { return agg_checksum_; }
@@ -469,6 +475,7 @@ class ScanResultChecksummer : public ScanResultCollector {
   crc::Crc* const crc_;
   uint64_t agg_checksum_;
   int blocks_processed_;
+  int64_t rows_checksummed_;
   faststring encoded_last_row_;
 
   DISALLOW_COPY_AND_ASSIGN(ScanResultChecksummer);
@@ -1005,6 +1012,14 @@ void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req,
   context->RespondSuccess();
 }
 
+namespace {
+void SetResourceMetrics(ResourceMetricsPB* metrics, rpc::RpcContext* context) {
+  metrics->set_cfile_cache_miss_bytes(
+    context->trace()->metrics()->GetMetric(cfile::CFILE_CACHE_MISS_BYTES_METRIC_NAME));
+  metrics->set_cfile_cache_hit_bytes(
+    context->trace()->metrics()->GetMetric(cfile::CFILE_CACHE_HIT_BYTES_METRIC_NAME));
+}
+} // anonymous namespace
 
 void TabletServiceImpl::Scan(const ScanRequestPB* req,
                              ScanResponsePB* resp,
@@ -1090,10 +1105,7 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
       resp->set_last_primary_key(last.ToString());
     }
   }
-  resp->mutable_resource_metrics()->set_cfile_cache_miss_bytes(
-    context->trace()->metrics()->GetMetric(cfile::CFILE_CACHE_MISS_BYTES_METRIC_NAME));
-  resp->mutable_resource_metrics()->set_cfile_cache_hit_bytes(
-    context->trace()->metrics()->GetMetric(cfile::CFILE_CACHE_HIT_BYTES_METRIC_NAME));
+  SetResourceMetrics(resp->mutable_resource_metrics(), context);
   context->RespondSuccess();
 }
 
@@ -1178,7 +1190,8 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
 
   resp->set_checksum(collector.agg_checksum());
   resp->set_has_more_results(has_more);
-
+  SetResourceMetrics(resp->mutable_resource_metrics(), context);
+  resp->set_rows_checksummed(collector.rows_checksummed());
   context->RespondSuccess();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f9928e8a/src/kudu/tserver/tserver_service.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver_service.proto b/src/kudu/tserver/tserver_service.proto
index eba8fa9..0897cae 100644
--- a/src/kudu/tserver/tserver_service.proto
+++ b/src/kudu/tserver/tserver_service.proto
@@ -70,4 +70,12 @@ message ChecksumResponsePB {
   optional bytes scanner_id = 3;
   optional bool has_more_results = 4;
   optional fixed64 snap_timestamp = 5;
+
+  // Number of rows checksummed by this RPC.
+  // NOTE: this is not an aggregate since the scanner was opened, but
+  // rather just a count for _this_ round trip.
+  optional int64 rows_checksummed = 6;
+
+  // Resource consumption of the underlying scanner.
+  optional ResourceMetricsPB resource_metrics = 7;
 }


[4/9] incubator-kudu git commit: ksck: multi-thread the fetching of replica info from tablet servers

Posted by jd...@apache.org.
ksck: multi-thread the fetching of replica info from tablet servers

In clusters with a lot of tablets, fetching the data from each tablet server
can take a while (e.g. ~100ms per server from my laptop to a test cluster in our
datacenter). For a large cluster (~70 nodes), this makes ksck rather slow.

Multi-threading the fetching makes this much faster (5s vs original 31 seconds
for the above test cluster).

Change-Id: Ib7784697fb227743dccaa98922fb958cd6a3270e
Reviewed-on: http://gerrit.cloudera.org:8080/3705
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/9cc2e041
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/9cc2e041
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/9cc2e041

Branch: refs/heads/master
Commit: 9cc2e041f3440fe9a731644bbc269b2fdace8fab
Parents: 7ba9d99
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Jul 20 14:39:04 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Fri Jul 22 20:24:56 2016 +0000

----------------------------------------------------------------------
 src/kudu/tools/ksck.cc | 29 ++++++++++++++++++++++-------
 1 file changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/9cc2e041/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index cb95f3a..67f9ff2 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -27,9 +27,11 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/atomic.h"
 #include "kudu/util/blocking_queue.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/threadpool.h"
 
 namespace kudu {
 namespace tools {
@@ -53,6 +55,9 @@ DEFINE_uint64(checksum_snapshot_timestamp, ChecksumOptions::kCurrentTimestamp,
               "timestamp to use for snapshot checksum scans, defaults to 0, which "
               "uses the current timestamp of a tablet server involved in the scan");
 
+DEFINE_int32(fetch_replica_info_concurrency, 20,
+             "Number of concurrent tablet servers to fetch replica info from.");
+
 // The stream to write output to. If this is NULL, defaults to cerr.
 // This is used by tests to capture output.
 ostream* g_err_stream = NULL;
@@ -148,20 +153,30 @@ Status Ksck::FetchInfoFromTabletServers() {
     return Status::NotFound("No tablet servers found");
   }
 
-  int bad_servers = 0;
+
+  gscoped_ptr<ThreadPool> pool;
+  RETURN_NOT_OK(ThreadPoolBuilder("ksck-fetch")
+                .set_max_threads(FLAGS_fetch_replica_info_concurrency)
+                .Build(&pool));
+
+  AtomicInt<int32_t> bad_servers(0);
   VLOG(1) << "Fetching info from all the Tablet Servers";
   for (const KsckMaster::TSMap::value_type& entry : cluster_->tablet_servers()) {
-    Status s = ConnectToTabletServer(entry.second);
-    if (!s.ok()) {
-      bad_servers++;
-    }
+    CHECK_OK(pool->SubmitFunc([&]() {
+          Status s = ConnectToTabletServer(entry.second);
+          if (!s.ok()) {
+            bad_servers.Increment();
+          }
+        }));
   }
-  if (bad_servers == 0) {
+  pool->Wait();
+
+  if (bad_servers.Load() == 0) {
     Info() << Substitute("Fetched info from all $0 Tablet Servers", servers_count) << endl;
     return Status::OK();
   } else {
     Warn() << Substitute("Fetched info from $0 Tablet Servers, $1 weren't reachable",
-                         servers_count - bad_servers, bad_servers) << endl;
+                         servers_count - bad_servers.Load(), bad_servers.Load()) << endl;
     return Status::NetworkError("Not all Tablet Servers are reachable");
   }
 }


[3/9] incubator-kudu git commit: ksck: report hostnames instead of IP addresses for tablet servers

Posted by jd...@apache.org.
ksck: report hostnames instead of IP addresses for tablet servers

Typically admins know their hosts by hostname instead of by IP address.
This rejiggers the code a bit to save the original host/port instead of
the resolved address, so that the output contains a hostname instead
of an IP.

Tested manually against a cluster (hard to add an assertion since we
don't have the ability to mock DNS).

Change-Id: I8164dca050fd1adcc034a91cebc241e6fff8a117
Reviewed-on: http://gerrit.cloudera.org:8080/3704
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/7ba9d994
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/7ba9d994
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/7ba9d994

Branch: refs/heads/master
Commit: 7ba9d994448dade702c959febe51abebce65920c
Parents: 513d6e9
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Jul 20 14:29:51 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Fri Jul 22 20:23:31 2016 +0000

----------------------------------------------------------------------
 src/kudu/tools/ksck-test.cc   |  2 +-
 src/kudu/tools/ksck.h         |  2 +-
 src/kudu/tools/ksck_remote.cc | 20 +++++++++++++++-----
 src/kudu/tools/ksck_remote.h  | 25 +++++++++++++------------
 4 files changed, 30 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7ba9d994/src/kudu/tools/ksck-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index a9184ab..44fcf69 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -64,7 +64,7 @@ class MockKsckTabletServer : public KsckTabletServer {
     callback.Run(Status::OK(), 0);
   }
 
-  virtual const std::string& address() const OVERRIDE {
+  virtual std::string address() const OVERRIDE {
     return address_;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7ba9d994/src/kudu/tools/ksck.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index 502f68e..80bc63d 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -197,7 +197,7 @@ class KsckTabletServer {
     return strings::Substitute("$0 ($1)", uuid(), address());
   }
 
-  virtual const std::string& address() const = 0;
+  virtual std::string address() const = 0;
 
   bool is_healthy() const {
     CHECK_NE(state_, kUninitialized);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7ba9d994/src/kudu/tools/ksck_remote.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index 325865a..66a495a 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -45,6 +45,16 @@ MonoDelta GetDefaultTimeout() {
   return MonoDelta::FromMilliseconds(FLAGS_timeout_ms);
 }
 
+Status RemoteKsckTabletServer::Init() {
+  vector<Sockaddr> addresses;
+  RETURN_NOT_OK(ParseAddressList(
+      host_port_.ToString(),
+      tserver::TabletServer::kDefaultPort, &addresses));
+  generic_proxy_.reset(new server::GenericServiceProxy(messenger_, addresses[0]));
+  ts_proxy_.reset(new tserver::TabletServerServiceProxy(messenger_, addresses[0]));
+  return Status::OK();
+}
+
 Status RemoteKsckTabletServer::FetchInfo() {
   state_ = kFetchFailed;
 
@@ -262,11 +272,11 @@ Status RemoteKsckMaster::RetrieveTabletServers(TSMap* tablet_servers) {
   tablet_servers->clear();
   for (const master::ListTabletServersResponsePB_Entry& e : resp.servers()) {
     HostPortPB addr = e.registration().rpc_addresses(0);
-    vector<Sockaddr> addresses;
-    RETURN_NOT_OK(ParseAddressList(HostPort(addr.host(), addr.port()).ToString(),
-                                   tserver::TabletServer::kDefaultPort, &addresses));
-    shared_ptr<KsckTabletServer> ts(
-        new RemoteKsckTabletServer(e.instance_id().permanent_uuid(), addresses[0], messenger_));
+    shared_ptr<RemoteKsckTabletServer> ts(
+        new RemoteKsckTabletServer(e.instance_id().permanent_uuid(),
+                                   HostPort(addr.host(), addr.port()),
+                                   messenger_));
+    RETURN_NOT_OK(ts->Init());
     InsertOrDie(tablet_servers, ts->uuid(), ts);
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7ba9d994/src/kudu/tools/ksck_remote.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.h b/src/kudu/tools/ksck_remote.h
index 5ca2a3b..9ba4e99 100644
--- a/src/kudu/tools/ksck_remote.h
+++ b/src/kudu/tools/ksck_remote.h
@@ -42,15 +42,17 @@ namespace tools {
 class RemoteKsckTabletServer : public KsckTabletServer {
  public:
   explicit RemoteKsckTabletServer(const std::string& id,
-                                  const Sockaddr& address,
-                                  const std::shared_ptr<rpc::Messenger>& messenger)
+                                  const HostPort host_port,
+                                  std::shared_ptr<rpc::Messenger> messenger)
       : KsckTabletServer(id),
-        address_(address.ToString()),
-        messenger_(messenger),
-        generic_proxy_(new server::GenericServiceProxy(messenger, address)),
-        ts_proxy_(new tserver::TabletServerServiceProxy(messenger, address)) {
+        host_port_(host_port),
+        messenger_(std::move(messenger)) {
   }
 
+  // Resolves the host/port and sets up proxies.
+  // Must be called after constructing.
+  Status Init();
+
   virtual Status FetchInfo() OVERRIDE;
 
   virtual void RunTabletChecksumScanAsync(
@@ -60,16 +62,15 @@ class RemoteKsckTabletServer : public KsckTabletServer {
       const ReportResultCallback& callback) OVERRIDE;
 
 
-  virtual const std::string& address() const OVERRIDE {
-    return address_;
+  virtual std::string address() const OVERRIDE {
+    return host_port_.ToString();
   }
 
  private:
-  const std::string address_;
+  const HostPort host_port_;
   const std::shared_ptr<rpc::Messenger> messenger_;
-  const std::shared_ptr<server::GenericServiceProxy> generic_proxy_;
-  const std::shared_ptr<tserver::TabletServerServiceProxy> ts_proxy_;
-
+  std::shared_ptr<server::GenericServiceProxy> generic_proxy_;
+  std::shared_ptr<tserver::TabletServerServiceProxy> ts_proxy_;
 };
 
 // This implementation connects to a Master via RPC.


[2/9] incubator-kudu git commit: KUDU-1516 ksck should check for more raft-related status issues (partial)

Posted by jd...@apache.org.
KUDU-1516 ksck should check for more raft-related status issues (partial)

This patch improves ksck. The main way it does so is by adding "tablet
server POV" information. ksck now gathers information about tablet
replicas from the tablet servers and cross-references this information
with the master metadata. This adds the following checks:

* each tablet has a majority of replicas on live tablet servers
* if a tablet has a majority of replicas on live tablet servers,
  then a majority of its replicas are in RUNNING state
* the assignments of tablets to tablet servers in the master agree with
  the assignment of tablet replicas reported by the tablet servers

This patch does not include other desiderata from KUDU-1516, like a consensus
canary or a write op canary.

The code is also restructured quite a bit, so that all of the "fetch
information from tablet servers" work happens up front in a single call. This
paves the way a bit for a future enhancement in which all of these RPCs are
done on a thread-pool (since it can be somewhat slow for large clusters).

To try to improve performance for clusters with a lot of data, I also added a
flag to the ListTablets RPC so that the response does not include schema
information, which is both large and irrelevant for this use case.

An example of the new output against a cluster with some dead tablet servers
and broken tablets is available at:
https://gist.github.com/toddlipcon/7ae677214988d064627bf1325f04dfac

This patch is based on some earlier work by Will Berkeley.

Change-Id: Iec6590ba52548a9ee11d63269b134320b10809da
Reviewed-on: http://gerrit.cloudera.org:8080/3632
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/513d6e9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/513d6e9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/513d6e9f

Branch: refs/heads/master
Commit: 513d6e9f5d42242edb3c3f40f0c5c968873160ea
Parents: c0b4f50
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Jul 19 18:37:47 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Fri Jul 22 20:22:43 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/cluster_verifier.cc |   6 +-
 src/kudu/master/master.proto                   |   2 +-
 src/kudu/tools/CMakeLists.txt                  |   1 +
 src/kudu/tools/ksck-test.cc                    | 209 +++++++++++++++-----
 src/kudu/tools/ksck.cc                         | 147 ++++++++++----
 src/kudu/tools/ksck.h                          |  81 ++++++--
 src/kudu/tools/ksck_remote-test.cc             |   7 +-
 src/kudu/tools/ksck_remote.cc                  |  52 +++--
 src/kudu/tools/ksck_remote.h                   |   5 +-
 src/kudu/tools/kudu-ksck.cc                    |   4 +-
 src/kudu/tserver/tablet_service.cc             |   9 +-
 src/kudu/tserver/tserver.proto                 |   9 +-
 12 files changed, 402 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/513d6e9f/src/kudu/integration-tests/cluster_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_verifier.cc b/src/kudu/integration-tests/cluster_verifier.cc
index e876875..64d406a 100644
--- a/src/kudu/integration-tests/cluster_verifier.cc
+++ b/src/kudu/integration-tests/cluster_verifier.cc
@@ -85,10 +85,14 @@ Status ClusterVerifier::DoKsck() {
   std::shared_ptr<KsckCluster> cluster(new KsckCluster(master));
   std::shared_ptr<Ksck> ksck(new Ksck(cluster));
 
+  // Some unit tests create or remove replicas of tablets, which
+  // we shouldn't consider fatal.
+  ksck->set_check_replica_count(false);
+
   // This is required for everything below.
   RETURN_NOT_OK(ksck->CheckMasterRunning());
   RETURN_NOT_OK(ksck->FetchTableAndTabletInfo());
-  RETURN_NOT_OK(ksck->CheckTabletServersRunning());
+  RETURN_NOT_OK(ksck->FetchInfoFromTabletServers());
   RETURN_NOT_OK(ksck->CheckTablesConsistency());
 
   vector<string> tables;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/513d6e9f/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 35a3354..402ab2b 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -112,7 +112,7 @@ message SysTabletsEntryPB {
   // Tablet partition.
   optional PartitionPB partition = 7;
 
-  // The latest committed consensus configuration consensus configuration reported to the Master.
+  // The latest committed consensus configuration reported to the Master.
   optional consensus.ConsensusStatePB committed_consensus_state = 3;
 
   // Debug state for the tablet.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/513d6e9f/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index 1a6a570..ab7c0c5 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -74,6 +74,7 @@ add_library(ksck
     ksck_remote.cc
 )
 target_link_libraries(ksck
+  consensus
   master_proto
   server_base_proto
   tserver_proto

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/513d6e9f/src/kudu/tools/ksck-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index f64ff94..a9184ab 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -22,6 +22,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tools/ksck.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/test_util.h"
 
 namespace kudu {
@@ -31,18 +32,28 @@ using std::shared_ptr;
 using std::static_pointer_cast;
 using std::string;
 using std::unordered_map;
-using std::vector;
+using strings::Substitute;
+
+// Import this symbol from ksck.cc so we can introspect the
+// errors being written to stderr.
+extern std::ostream* g_err_stream;
 
 class MockKsckTabletServer : public KsckTabletServer {
  public:
   explicit MockKsckTabletServer(const string& uuid)
       : KsckTabletServer(uuid),
-        connect_status_(Status::OK()),
+        fetch_info_status_(Status::OK()),
         address_("<mock>") {
   }
 
-  virtual Status Connect() const OVERRIDE {
-    return connect_status_;
+  Status FetchInfo() override {
+    timestamp_ = 0;
+    if (fetch_info_status_.ok()) {
+      state_ = kFetched;
+    } else {
+      state_ = kFetchFailed;
+    }
+    return fetch_info_status_;
   }
 
   virtual void RunTabletChecksumScanAsync(
@@ -53,17 +64,12 @@ class MockKsckTabletServer : public KsckTabletServer {
     callback.Run(Status::OK(), 0);
   }
 
-  virtual Status CurrentTimestamp(uint64_t* timestamp) const OVERRIDE {
-    *timestamp = 0;
-    return Status::OK();
-  }
-
   virtual const std::string& address() const OVERRIDE {
     return address_;
   }
 
   // Public because the unit tests mutate this variable directly.
-  Status connect_status_;
+  Status fetch_info_status_;
 
  private:
   const string address_;
@@ -72,11 +78,11 @@ class MockKsckTabletServer : public KsckTabletServer {
 class MockKsckMaster : public KsckMaster {
  public:
   MockKsckMaster()
-      : connect_status_(Status::OK()) {
+      : fetch_info_status_(Status::OK()) {
   }
 
   virtual Status Connect() const OVERRIDE {
-    return connect_status_;
+    return fetch_info_status_;
   }
 
   virtual Status RetrieveTabletServers(TSMap* tablet_servers) OVERRIDE {
@@ -94,7 +100,7 @@ class MockKsckMaster : public KsckMaster {
   }
 
   // Public because the unit tests mutate these variables directly.
-  Status connect_status_;
+  Status fetch_info_status_;
   TSMap tablet_servers_;
   vector<shared_ptr<KsckTable>> tables_;
 };
@@ -107,11 +113,17 @@ class KsckTest : public KuduTest {
         ksck_(new Ksck(cluster_)) {
     unordered_map<string, shared_ptr<KsckTabletServer>> tablet_servers;
     for (int i = 0; i < 3; i++) {
-      string name = strings::Substitute("$0", i);
+      string name = Substitute("ts-id-$0", i);
       shared_ptr<MockKsckTabletServer> ts(new MockKsckTabletServer(name));
       InsertOrDie(&tablet_servers, ts->uuid(), ts);
     }
     master_->tablet_servers_.swap(tablet_servers);
+
+    g_err_stream = &err_stream_;
+  }
+
+  ~KsckTest() {
+    g_err_stream = NULL;
   }
 
  protected:
@@ -128,8 +140,8 @@ class KsckTest : public KuduTest {
     CreateDefaultAssignmentPlan(1);
 
     auto table = CreateAndAddTable("test", 1);
-    shared_ptr<KsckTablet> tablet(new KsckTablet(table.get(), "1"));
-    CreateAndFillTablet(tablet, 1, true);
+    shared_ptr<KsckTablet> tablet(new KsckTablet(table.get(), "tablet-id-1"));
+    CreateAndFillTablet(tablet, 1, true, true);
     table->set_tablets({ tablet });
   }
 
@@ -141,8 +153,25 @@ class KsckTest : public KuduTest {
 
     vector<shared_ptr<KsckTablet>> tablets;
     for (int i = 0; i < num_tablets; i++) {
-      shared_ptr<KsckTablet> tablet(new KsckTablet(table.get(), std::to_string(i)));
-      CreateAndFillTablet(tablet, num_replicas, true);
+      shared_ptr<KsckTablet> tablet(new KsckTablet(
+          table.get(), Substitute("tablet-id-$0", i)));
+      CreateAndFillTablet(tablet, num_replicas, true, true);
+      tablets.push_back(tablet);
+    }
+    table->set_tablets(tablets);
+  }
+
+  void CreateOneSmallReplicatedTableWithTabletNotRunning() {
+    int num_replicas = 3;
+    int num_tablets = 3;
+    CreateDefaultAssignmentPlan(num_replicas * num_tablets);
+    auto table = CreateAndAddTable("test", num_replicas);
+
+    vector<shared_ptr<KsckTablet>> tablets;
+    for (int i = 0; i < num_tablets; i++) {
+      shared_ptr<KsckTablet> tablet(new KsckTablet(
+          table.get(), Substitute("tablet-id-$0", i)));
+      CreateAndFillTablet(tablet, num_replicas, true, i != 0);
       tablets.push_back(tablet);
     }
     table->set_tablets(tablets);
@@ -154,10 +183,9 @@ class KsckTest : public KuduTest {
 
     auto table = CreateAndAddTable("test", 3);
 
-    shared_ptr<KsckTablet> tablet(new KsckTablet(table.get(), "1"));
-    CreateAndFillTablet(tablet, 2, false);
+    shared_ptr<KsckTablet> tablet(new KsckTablet(table.get(), "tablet-id-1"));
+    CreateAndFillTablet(tablet, 2, false, true);
     table->set_tablets({ tablet });
-
   }
 
   shared_ptr<KsckTable> CreateAndAddTable(const string& name, int num_replicas) {
@@ -167,25 +195,51 @@ class KsckTest : public KuduTest {
     return table;
   }
 
-  void CreateAndFillTablet(shared_ptr<KsckTablet>& tablet, int num_replicas, bool has_leader) {
+  void CreateAndFillTablet(shared_ptr<KsckTablet>& tablet, int num_replicas,
+                           bool has_leader, bool is_running) {
     vector<shared_ptr<KsckTabletReplica>> replicas;
     if (has_leader) {
-      CreateReplicaAndAdd(replicas, true);
+      CreateReplicaAndAdd(replicas, tablet->id(), true, is_running);
       num_replicas--;
     }
     for (int i = 0; i < num_replicas; i++) {
-      CreateReplicaAndAdd(replicas, false);
+      CreateReplicaAndAdd(replicas, tablet->id(), false, is_running);
     }
     tablet->set_replicas(replicas);
   }
 
-  void CreateReplicaAndAdd(vector<shared_ptr<KsckTabletReplica>>& replicas, bool is_leader) {
+  void CreateReplicaAndAdd(vector<shared_ptr<KsckTabletReplica>>& replicas,
+                           string tablet_id,
+                           bool is_leader,
+                           bool is_running) {
     shared_ptr<KsckTabletReplica> replica(new KsckTabletReplica(assignment_plan_.back(),
                                                                 is_leader, !is_leader));
+    shared_ptr<MockKsckTabletServer> ts = static_pointer_cast<MockKsckTabletServer>(
+            master_->tablet_servers_.at(assignment_plan_.back()));
+
     assignment_plan_.pop_back();
     replicas.push_back(replica);
+
+    // Add the equivalent replica on the tablet server.
+    tablet::TabletStatusPB pb;
+    pb.set_tablet_id(tablet_id);
+    pb.set_table_name("fake-table");
+    pb.set_state(is_running ? tablet::RUNNING : tablet::FAILED);
+    InsertOrDie(&ts->tablet_status_map_, tablet_id, std::move(pb));
+  }
+
+  Status RunKsck() {
+    auto c = MakeScopedCleanup([this]() {
+        LOG(INFO) << "Ksck output:\n" << err_stream_.str();
+      });
+    RETURN_NOT_OK(ksck_->CheckMasterRunning());
+    RETURN_NOT_OK(ksck_->FetchTableAndTabletInfo());
+    RETURN_NOT_OK(ksck_->FetchInfoFromTabletServers());
+    RETURN_NOT_OK(ksck_->CheckTablesConsistency());
+    return Status::OK();
   }
 
+
   shared_ptr<MockKsckMaster> master_;
   shared_ptr<KsckCluster> cluster_;
   shared_ptr<Ksck> ksck_;
@@ -195,6 +249,8 @@ class KsckTest : public KuduTest {
   // you should have a list that looks like ts1,ts2,ts3,ts3,ts2,ts1 so that the two LEADERS, which
   // are assigned first, end up on ts1 and ts3.
   vector<string> assignment_plan_;
+
+  std::stringstream err_stream_;
 };
 
 TEST_F(KsckTest, TestMasterOk) {
@@ -203,55 +259,110 @@ TEST_F(KsckTest, TestMasterOk) {
 
 TEST_F(KsckTest, TestMasterUnavailable) {
   Status error = Status::NetworkError("Network failure");
-  master_->connect_status_ = error;
+  master_->fetch_info_status_ = error;
   ASSERT_TRUE(ksck_->CheckMasterRunning().IsNetworkError());
 }
 
 TEST_F(KsckTest, TestTabletServersOk) {
-  ASSERT_OK(ksck_->CheckMasterRunning());
-  ASSERT_OK(ksck_->FetchTableAndTabletInfo());
-  ASSERT_OK(ksck_->CheckTabletServersRunning());
+  ASSERT_OK(RunKsck());
 }
 
 TEST_F(KsckTest, TestBadTabletServer) {
-  ASSERT_OK(ksck_->CheckMasterRunning());
+  CreateOneSmallReplicatedTable();
+
+  // Mock a failure to connect to one of the tablet servers.
   Status error = Status::NetworkError("Network failure");
-  static_pointer_cast<MockKsckTabletServer>(master_->tablet_servers_.begin()->second)
-      ->connect_status_ = error;
+  static_pointer_cast<MockKsckTabletServer>(master_->tablet_servers_["ts-id-1"])
+      ->fetch_info_status_ = error;
+
+  ASSERT_OK(ksck_->CheckMasterRunning());
   ASSERT_OK(ksck_->FetchTableAndTabletInfo());
-  Status s = ksck_->CheckTabletServersRunning();
+  Status s = ksck_->FetchInfoFromTabletServers();
   ASSERT_TRUE(s.IsNetworkError()) << "Status returned: " << s.ToString();
+
+  s = ksck_->CheckTablesConsistency();
+  EXPECT_EQ("Corruption: 1 table(s) are bad", s.ToString());
+  ASSERT_STR_CONTAINS(
+      err_stream_.str(),
+      "WARNING: Unable to connect to Tablet Server "
+      "ts-id-1 (<mock>): Network error: Network failure");
+  ASSERT_STR_CONTAINS(
+      err_stream_.str(),
+      "WARNING: Detected problems with Tablet tablet-id-0 of table 'test'\n"
+      "------------------------------------------------------------\n"
+      "WARNING: Should have a replica on TS ts-id-1 (<mock>), but TS is unavailable\n\n");
+  ASSERT_STR_CONTAINS(
+      err_stream_.str(),
+      "WARNING: Detected problems with Tablet tablet-id-1 of table 'test'\n"
+      "------------------------------------------------------------\n"
+      "WARNING: Should have a replica on TS ts-id-1 (<mock>), but TS is unavailable\n\n");
+  ASSERT_STR_CONTAINS(
+      err_stream_.str(),
+      "WARNING: Detected problems with Tablet tablet-id-2 of table 'test'\n"
+      "------------------------------------------------------------\n"
+      "WARNING: Should have a replica on TS ts-id-1 (<mock>), but TS is unavailable\n\n");
+}
+
+TEST_F(KsckTest, TestZeroTabletReplicasCheck) {
+  ASSERT_OK(RunKsck());
 }
 
 TEST_F(KsckTest, TestZeroTableCheck) {
-  ASSERT_OK(ksck_->CheckMasterRunning());
-  ASSERT_OK(ksck_->FetchTableAndTabletInfo());
-  ASSERT_OK(ksck_->CheckTabletServersRunning());
-  ASSERT_OK(ksck_->CheckTablesConsistency());
+  ASSERT_OK(RunKsck());
 }
 
 TEST_F(KsckTest, TestOneTableCheck) {
   CreateOneTableOneTablet();
-  ASSERT_OK(ksck_->CheckMasterRunning());
-  ASSERT_OK(ksck_->FetchTableAndTabletInfo());
-  ASSERT_OK(ksck_->CheckTabletServersRunning());
-  ASSERT_OK(ksck_->CheckTablesConsistency());
+  ASSERT_OK(RunKsck());
 }
 
 TEST_F(KsckTest, TestOneSmallReplicatedTable) {
   CreateOneSmallReplicatedTable();
-  ASSERT_OK(ksck_->CheckMasterRunning());
-  ASSERT_OK(ksck_->FetchTableAndTabletInfo());
-  ASSERT_OK(ksck_->CheckTabletServersRunning());
-  ASSERT_OK(ksck_->CheckTablesConsistency());
+  ASSERT_OK(RunKsck());
 }
 
 TEST_F(KsckTest, TestOneOneTabletBrokenTable) {
   CreateOneOneTabletReplicatedBrokenTable();
-  ASSERT_OK(ksck_->CheckMasterRunning());
-  ASSERT_OK(ksck_->FetchTableAndTabletInfo());
-  ASSERT_OK(ksck_->CheckTabletServersRunning());
-  ASSERT_TRUE(ksck_->CheckTablesConsistency().IsCorruption());
+  Status s = RunKsck();
+  EXPECT_EQ("Corruption: 1 table(s) are bad", s.ToString());
+  ASSERT_STR_CONTAINS(err_stream_.str(),
+                      "Tablet tablet-id-1 of table 'test' has 2 instead of 3 replicas");
+}
+
+TEST_F(KsckTest, TestMismatchedAssignments) {
+  CreateOneSmallReplicatedTable();
+  shared_ptr<MockKsckTabletServer> ts = static_pointer_cast<MockKsckTabletServer>(
+      master_->tablet_servers_.at(Substitute("ts-id-$0", 0)));
+  ts->tablet_status_map_.erase(ts->tablet_status_map_.begin()->first);
+
+  Status s = RunKsck();
+  EXPECT_EQ("Corruption: 1 table(s) are bad", s.ToString());
+  ASSERT_STR_CONTAINS(err_stream_.str(),
+                      "WARNING: Detected problems with Tablet tablet-id-2 of table 'test'\n"
+                      "------------------------------------------------------------\n"
+                      "WARNING: Missing a tablet replica on tablet server ts-id-0 (<mock>)\n");
+}
+
+TEST_F(KsckTest, TestTabletNotRunning) {
+  CreateOneSmallReplicatedTableWithTabletNotRunning();
+
+  Status s = RunKsck();
+  EXPECT_EQ("Corruption: 1 table(s) are bad", s.ToString());
+  ASSERT_STR_CONTAINS(
+      err_stream_.str(),
+      "WARNING: Detected problems with Tablet tablet-id-0 of table 'test'\n"
+      "------------------------------------------------------------\n"
+      "WARNING: Bad state on TS ts-id-0 (<mock>): FAILED\n"
+      "  Last status: \n"
+      "  Data state:  TABLET_DATA_UNKNOWN\n"
+      "WARNING: Bad state on TS ts-id-1 (<mock>): FAILED\n"
+      "  Last status: \n"
+      "  Data state:  TABLET_DATA_UNKNOWN\n"
+      "WARNING: Bad state on TS ts-id-2 (<mock>): FAILED\n"
+      "  Last status: \n"
+      "  Data state:  TABLET_DATA_UNKNOWN\n"
+      "ERROR: Tablet tablet-id-0 of table 'test' does not have a majority of "
+      "replicas in RUNNING state\n");
 }
 
 } // namespace tools

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/513d6e9f/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 481141c..cb95f3a 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -22,6 +22,7 @@
 #include <mutex>
 #include <unordered_set>
 
+#include "kudu/consensus/quorum_util.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
@@ -52,22 +53,26 @@ DEFINE_uint64(checksum_snapshot_timestamp, ChecksumOptions::kCurrentTimestamp,
               "timestamp to use for snapshot checksum scans, defaults to 0, which "
               "uses the current timestamp of a tablet server involved in the scan");
 
+// The stream to write output to. If this is NULL, defaults to cerr.
+// This is used by tests to capture output.
+ostream* g_err_stream = NULL;
+
 // Print an informational message to cerr.
+static ostream& Out() {
+  return (g_err_stream ? *g_err_stream : cerr);
+}
 static ostream& Info() {
-  cerr << "INFO: ";
-  return cerr;
+  return Out() << "INFO: ";
 }
 
 // Print a warning message to cerr.
 static ostream& Warn() {
-  cerr << "WARNING: ";
-  return cerr;
+  return Out() << "WARNING: ";
 }
 
 // Print an error message to cerr.
 static ostream& Error() {
-  cerr << "ERROR: ";
-  return cerr;
+  return Out() << "ERROR: ";
 }
 
 ChecksumOptions::ChecksumOptions()
@@ -86,6 +91,14 @@ ChecksumOptions::ChecksumOptions(MonoDelta timeout, int scan_concurrency,
 
 const uint64_t ChecksumOptions::kCurrentTimestamp = 0;
 
+tablet::TabletStatePB KsckTabletServer::ReplicaState(const std::string& tablet_id) const {
+  CHECK_EQ(state_, kFetched);
+  if (!ContainsKey(tablet_status_map_, tablet_id)) {
+    return tablet::UNKNOWN;
+  }
+  return tablet_status_map_.at(tablet_id).state();
+}
+
 KsckCluster::~KsckCluster() {
 }
 
@@ -126,7 +139,7 @@ Status Ksck::FetchTableAndTabletInfo() {
   return cluster_->FetchTableAndTabletInfo();
 }
 
-Status Ksck::CheckTabletServersRunning() {
+Status Ksck::FetchInfoFromTabletServers() {
   VLOG(1) << "Getting the Tablet Servers list";
   int servers_count = cluster_->tablet_servers().size();
   VLOG(1) << Substitute("List of $0 Tablet Servers retrieved", servers_count);
@@ -136,7 +149,7 @@ Status Ksck::CheckTabletServersRunning() {
   }
 
   int bad_servers = 0;
-  VLOG(1) << "Connecting to all the Tablet Servers";
+  VLOG(1) << "Fetching info from all the Tablet Servers";
   for (const KsckMaster::TSMap::value_type& entry : cluster_->tablet_servers()) {
     Status s = ConnectToTabletServer(entry.second);
     if (!s.ok()) {
@@ -144,10 +157,10 @@ Status Ksck::CheckTabletServersRunning() {
     }
   }
   if (bad_servers == 0) {
-    Info() << Substitute("Connected to all $0 Tablet Servers", servers_count) << endl;
+    Info() << Substitute("Fetched info from all $0 Tablet Servers", servers_count) << endl;
     return Status::OK();
   } else {
-    Warn() << Substitute("Connected to $0 Tablet Servers, $1 weren't reachable",
+    Warn() << Substitute("Fetched info from $0 Tablet Servers, $1 weren't reachable",
                          servers_count - bad_servers, bad_servers) << endl;
     return Status::NetworkError("Not all Tablet Servers are reachable");
   }
@@ -155,12 +168,12 @@ Status Ksck::CheckTabletServersRunning() {
 
 Status Ksck::ConnectToTabletServer(const shared_ptr<KsckTabletServer>& ts) {
   VLOG(1) << "Going to connect to Tablet Server: " << ts->uuid();
-  Status s = ts->Connect();
+  Status s = ts->FetchInfo();
   if (s.ok()) {
     VLOG(1) << "Connected to Tablet Server: " << ts->uuid();
   } else {
-    Warn() << Substitute("Unable to connect to Tablet Server $0 ($1) because $2",
-                         ts->uuid(), ts->address(), s.ToString()) << endl;
+    Warn() << Substitute("Unable to connect to Tablet Server $0: $1",
+                         ts->ToString(), s.ToString()) << endl;
   }
   return s;
 }
@@ -168,7 +181,7 @@ Status Ksck::ConnectToTabletServer(const shared_ptr<KsckTabletServer>& ts) {
 Status Ksck::CheckTablesConsistency() {
   VLOG(1) << "Getting the tables list";
   int tables_count = cluster_->tables().size();
-  VLOG(1) << Substitute("List of $0 tables retrieved", tables_count);
+  VLOG(1) << Substitute("List of $0 table(s) retrieved", tables_count);
 
   if (tables_count == 0) {
     Info() << "The cluster doesn't have any tables" << endl;
@@ -183,12 +196,12 @@ Status Ksck::CheckTablesConsistency() {
     }
   }
   if (bad_tables_count == 0) {
-    Info() << Substitute("The metadata for $0 tables is HEALTHY", tables_count) << endl;
+    Info() << Substitute("The metadata for $0 table(s) is HEALTHY", tables_count) << endl;
     return Status::OK();
   } else {
-    Warn() << Substitute("$0 out of $1 tables are not in a healthy state",
+    Warn() << Substitute("$0 out of $1 table(s) are not in a healthy state",
                          bad_tables_count, tables_count) << endl;
-    return Status::Corruption(Substitute("$0 tables are bad", bad_tables_count));
+    return Status::Corruption(Substitute("$0 table(s) are bad", bad_tables_count));
   }
 }
 
@@ -335,8 +348,18 @@ Status Ksck::ChecksumData(const vector<string>& tables,
   }
 
   if (options.use_snapshot && options.snapshot_timestamp == ChecksumOptions::kCurrentTimestamp) {
-    // Set the snapshot timestamp to the current timestamp of an arbitrary tablet server.
-    tablet_server_queues.begin()->first->CurrentTimestamp(&options.snapshot_timestamp);
+    // Set the snapshot timestamp to the current timestamp of the first healthy tablet server
+    // we can find.
+    for (const auto& ts : tablet_server_queues) {
+      if (ts.first->is_healthy()) {
+        options.snapshot_timestamp = ts.first->current_timestamp();
+        break;
+      }
+    }
+    if (options.snapshot_timestamp == ChecksumOptions::kCurrentTimestamp) {
+      return Status::ServiceUnavailable(
+          "No tablet servers were available to fetch the current timestamp");
+    }
     Info() << "Using snapshot timestamp: " << options.snapshot_timestamp << endl;
   }
 
@@ -462,18 +485,58 @@ bool Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet, int table_num_repl
   string tablet_str = Substitute("Tablet $0 of table '$1'",
                                  tablet->id(), tablet->table()->name());
   vector<shared_ptr<KsckTabletReplica> > replicas = tablet->replicas();
-  bool good_tablet = true;
-  if (replicas.size() != table_num_replicas) {
-    Warn() << Substitute("$0 has $1 instead of $2 replicas",
-                         tablet_str, replicas.size(), table_num_replicas) << endl;
-    // We only fail the "goodness" check if the tablet is under-replicated.
-    if (replicas.size() < table_num_replicas) {
-      good_tablet = false;
-    }
+  vector<string> warnings, errors;
+  if (check_replica_count_ && replicas.size() != table_num_replicas) {
+    warnings.push_back(Substitute("$0 has $1 instead of $2 replicas",
+                                  tablet_str, replicas.size(), table_num_replicas));
   }
   int leaders_count = 0;
   int followers_count = 0;
+  int alive_count = 0;
+  int running_count = 0;
   for (const shared_ptr<KsckTabletReplica> replica : replicas) {
+    VLOG(1) << Substitute("A replica of tablet $0 is on live tablet server $1",
+                          tablet->id(), replica->ts_uuid());
+    // Check for agreement on tablet assignment and state between the master
+    // and the tablet server.
+    auto ts = FindPtrOrNull(cluster_->tablet_servers(), replica->ts_uuid());
+    if (ts && ts->is_healthy()) {
+      alive_count++;
+      auto state = ts->ReplicaState(tablet->id());
+      if (state != tablet::UNKNOWN) {
+        VLOG(1) << Substitute("Tablet server $0 agrees that it hosts a replica of $1",
+                              ts->ToString(), tablet_str);
+      }
+
+      switch (state) {
+        case tablet::RUNNING:
+          VLOG(1) << Substitute("Tablet replica for $0 on TS $1 is RUNNING",
+                                tablet_str, ts->ToString());
+          running_count++;
+          break;
+
+        case tablet::UNKNOWN:
+          warnings.push_back(Substitute("Missing a tablet replica on tablet server $0",
+                                        ts->ToString()));
+          break;
+
+        default: {
+          const auto& status_pb = ts->tablet_status_map().at(tablet->id());
+          warnings.push_back(
+              Substitute("Bad state on TS $0: $1\n"
+                         "  Last status: $2\n"
+                         "  Data state:  $3",
+                         ts->ToString(), tablet::TabletStatePB_Name(state),
+                         status_pb.last_status(),
+                         tablet::TabletDataState_Name(status_pb.tablet_data_state())));
+          break;
+        }
+      }
+    } else {
+      // no TS or unhealthy TS
+      warnings.push_back(Substitute("Should have a replica on TS $0, but TS is unavailable",
+                                    ts ? ts->ToString() : replica->ts_uuid()));
+    }
     if (replica->is_leader()) {
       VLOG(1) << Substitute("Replica at $0 is a LEADER", replica->ts_uuid());
       leaders_count++;
@@ -483,17 +546,33 @@ bool Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet, int table_num_repl
     }
   }
   if (leaders_count == 0) {
-    Warn() << Substitute("$0 doesn't have a leader", tablet_str) << endl;
-    good_tablet = false;
+    errors.push_back("No leader detected");
   }
   VLOG(1) << Substitute("$0 has $1 leader and $2 followers",
                         tablet_str, leaders_count, followers_count);
-  return good_tablet;
-}
+  int majority_size = consensus::MajoritySize(table_num_replicas);
+  if (alive_count < majority_size) {
+    errors.push_back(Substitute("$0 does not have a majority of replicas on live tablet servers",
+                                tablet_str));
+  } else if (running_count < majority_size) {
+    errors.push_back(Substitute("$0 does not have a majority of replicas in RUNNING state",
+                                tablet_str));
+  }
+
+  bool has_issues = !warnings.empty() || !errors.empty();
+  if (has_issues) {
+    Warn() << "Detected problems with " << tablet_str << endl
+           << "------------------------------------------------------------" << endl;
+    for (const auto& s : warnings) {
+      Warn() << s << endl;
+    }
+    for (const auto& s : errors) {
+      Error() << s << endl;
+    }
+    Out() << endl;
+  }
 
-Status Ksck::CheckAssignments() {
-  // TODO
-  return Status::NotSupported("CheckAssignments hasn't been implemented");
+  return !has_issues;
 }
 
 } // namespace tools

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/513d6e9f/src/kudu/tools/ksck.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index 92e1951..502f68e 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -20,6 +20,7 @@
 #ifndef KUDU_TOOLS_KSCK_H
 #define KUDU_TOOLS_KSCK_H
 
+#include <gtest/gtest_prod.h>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -27,6 +28,7 @@
 #include <vector>
 
 #include "kudu/common/schema.h"
+#include "kudu/tablet/tablet.pb.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/status.h"
@@ -70,6 +72,7 @@ class KsckTabletReplica {
   KsckTabletReplica(const std::string ts_uuid, const bool is_leader, const bool is_follower)
       : is_leader_(is_leader),
         is_follower_(is_follower),
+        is_running_(false),
         ts_uuid_(ts_uuid) {
   }
 
@@ -88,6 +91,7 @@ class KsckTabletReplica {
  private:
   const bool is_leader_;
   const bool is_follower_;
+  bool is_running_;
   const std::string ts_uuid_;
   DISALLOW_COPY_AND_ASSIGN(KsckTabletReplica);
 };
@@ -168,13 +172,14 @@ typedef Callback<void(const Status& status, uint64_t checksum)> ReportResultCall
 // Class that must be extended to represent a tablet server.
 class KsckTabletServer {
  public:
+  // Map from tablet id to the status of that tablet's replica on this server.
+  typedef std::unordered_map<std::string, tablet::TabletStatusPB > TabletStatusMap;
+
   explicit KsckTabletServer(std::string uuid) : uuid_(std::move(uuid)) {}
   virtual ~KsckTabletServer() { }
 
-  // Connects to the configured Tablet Server.
-  virtual Status Connect() const = 0;
-
-  virtual Status CurrentTimestamp(uint64_t* timestamp) const = 0;
+  // Connects to the configured tablet server and populates the fields of this class.
+  virtual Status FetchInfo() = 0;
 
   // Executes a checksum scan on the associated tablet, and runs the callback
   // with the result. The callback must be threadsafe and non-blocking.
@@ -188,10 +193,46 @@ class KsckTabletServer {
     return uuid_;
   }
 
+  std::string ToString() const {
+    return strings::Substitute("$0 ($1)", uuid(), address());
+  }
+
   virtual const std::string& address() const = 0;
 
+  bool is_healthy() const {
+    CHECK_NE(state_, kUninitialized);
+    return state_ == kFetched;
+  }
+
+  // Gets the mapping of tablet id to tablet replica status for this tablet server.
+  const TabletStatusMap& tablet_status_map() const {
+    CHECK_EQ(state_, kFetched);
+    return tablet_status_map_;
+  }
+
+  tablet::TabletStatePB ReplicaState(const std::string& tablet_id) const;
+
+  uint64_t current_timestamp() const {
+    CHECK_EQ(state_, kFetched);
+    return timestamp_;
+  }
+
+ protected:
+  friend class KsckTest;
+  FRIEND_TEST(KsckTest, TestMismatchedAssignments);
+
+  enum State {
+    kUninitialized,
+    kFetchFailed,
+    kFetched
+  };
+  State state_ = kUninitialized;
+  TabletStatusMap tablet_status_map_;
+  uint64_t timestamp_;
+
  private:
   const std::string uuid_;
+
   DISALLOW_COPY_AND_ASSIGN(KsckTabletServer);
 };
 
@@ -240,8 +281,7 @@ class KsckCluster {
     return master_;
   }
 
-  const std::unordered_map<std::string,
-                           std::shared_ptr<KsckTabletServer> >& tablet_servers() {
+  const KsckMaster::TSMap& tablet_servers() {
     return tablet_servers_;
   }
 
@@ -260,7 +300,7 @@ class KsckCluster {
   Status RetrieveTabletsList(const std::shared_ptr<KsckTable>& table);
 
   const std::shared_ptr<KsckMaster> master_;
-  std::unordered_map<std::string, std::shared_ptr<KsckTabletServer> > tablet_servers_;
+  KsckMaster::TSMap tablet_servers_;
   std::vector<std::shared_ptr<KsckTable> > tables_;
   DISALLOW_COPY_AND_ASSIGN(KsckCluster);
 };
@@ -272,15 +312,22 @@ class Ksck {
       : cluster_(std::move(cluster)) {}
   ~Ksck() {}
 
-  // Verifies that it can connect to the Master.
+  // Set whether ksck should verify that each of the tablet's raft configurations
+  // has the same number of replicas that is specified by the tablet metadata.
+  // (default: true)
+  void set_check_replica_count(bool check) {
+    check_replica_count_ = check;
+  }
+
+  // Verifies that it can connect to the master.
   Status CheckMasterRunning();
 
-  // Populates all the cluster table and tablet info from the Master.
+  // Populates all the cluster table and tablet info from the master.
   Status FetchTableAndTabletInfo();
 
-  // Verifies that it can connect to all the Tablet Servers reported by the master.
-  // Must first call FetchTableAndTabletInfo().
-  Status CheckTabletServersRunning();
+  // Connects to all tablet servers, checks that they are alive, and fetches
+  // their current status and tablet information.
+  Status FetchInfoFromTabletServers();
 
   // Establishes a connection with the specified Tablet Server.
   // Must first call FetchTableAndTabletInfo().
@@ -288,7 +335,8 @@ class Ksck {
 
   // Verifies that all the tables have contiguous tablets and that each tablet has enough replicas
   // and a leader.
-  // Must first call FetchTableAndTabletInfo().
+  // Must first call FetchTableAndTabletInfo() and, if doing checks against tablet
+  // servers (the default), must first call FetchInfoFromTabletServers().
   Status CheckTablesConsistency();
 
   // Verifies data checksums on all tablets by doing a scan of the database on each replica.
@@ -301,11 +349,6 @@ class Ksck {
                       const std::vector<std::string>& tablets,
                       const ChecksumOptions& options);
 
-  // Verifies that the assignments reported by the master are the same reported by the
-  // Tablet Servers.
-  // Must first call FetchTableAndTabletInfo().
-  Status CheckAssignments();
-
  private:
   bool VerifyTable(const std::shared_ptr<KsckTable>& table);
   bool VerifyTableWithTimeout(const std::shared_ptr<KsckTable>& table,
@@ -314,6 +357,8 @@ class Ksck {
   bool VerifyTablet(const std::shared_ptr<KsckTablet>& tablet, int table_num_replicas);
 
   const std::shared_ptr<KsckCluster> cluster_;
+
+  bool check_replica_count_ = true;
   DISALLOW_COPY_AND_ASSIGN(Ksck);
 };
 } // namespace tools

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/513d6e9f/src/kudu/tools/ksck_remote-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote-test.cc b/src/kudu/tools/ksck_remote-test.cc
index 7305005..c10fabb 100644
--- a/src/kudu/tools/ksck_remote-test.cc
+++ b/src/kudu/tools/ksck_remote-test.cc
@@ -184,10 +184,9 @@ TEST_F(RemoteKsckTest, TestMasterOk) {
 }
 
 TEST_F(RemoteKsckTest, TestTabletServersOk) {
-  LOG(INFO) << "Fetching table and tablet info...";
+  ASSERT_OK(ksck_->CheckMasterRunning());
   ASSERT_OK(ksck_->FetchTableAndTabletInfo());
-  LOG(INFO) << "Checking tablet servers are running...";
-  ASSERT_OK(ksck_->CheckTabletServersRunning());
+  ASSERT_OK(ksck_->FetchInfoFromTabletServers());
 }
 
 TEST_F(RemoteKsckTest, TestTableConsistency) {
@@ -195,7 +194,9 @@ TEST_F(RemoteKsckTest, TestTableConsistency) {
   deadline.AddDelta(MonoDelta::FromSeconds(30));
   Status s;
   while (MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
+    ASSERT_OK(ksck_->CheckMasterRunning());
     ASSERT_OK(ksck_->FetchTableAndTabletInfo());
+    ASSERT_OK(ksck_->FetchInfoFromTabletServers());
     s = ksck_->CheckTablesConsistency();
     if (s.ok()) {
       break;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/513d6e9f/src/kudu/tools/ksck_remote.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index fcfae31..325865a 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -45,22 +45,44 @@ MonoDelta GetDefaultTimeout() {
   return MonoDelta::FromMilliseconds(FLAGS_timeout_ms);
 }
 
-Status RemoteKsckTabletServer::Connect() const {
-  tserver::PingRequestPB req;
-  tserver::PingResponsePB resp;
-  RpcController rpc;
-  rpc.set_timeout(GetDefaultTimeout());
-  return ts_proxy_->Ping(req, &resp, &rpc);
-}
+Status RemoteKsckTabletServer::FetchInfo() {
+  state_ = kFetchFailed;
+
+  {
+    tserver::PingRequestPB req;
+    tserver::PingResponsePB resp;
+    RpcController rpc;
+    rpc.set_timeout(GetDefaultTimeout());
+    RETURN_NOT_OK_PREPEND(ts_proxy_->Ping(req, &resp, &rpc),
+                          "could not send Ping RPC to server");
+  }
 
-Status RemoteKsckTabletServer::CurrentTimestamp(uint64_t* timestamp) const {
-  server::ServerClockRequestPB req;
-  server::ServerClockResponsePB resp;
-  RpcController rpc;
-  rpc.set_timeout(GetDefaultTimeout());
-  RETURN_NOT_OK(generic_proxy_->ServerClock(req, &resp, &rpc));
-  CHECK(resp.has_timestamp());
-  *timestamp = resp.timestamp();
+  {
+    tserver::ListTabletsRequestPB req;
+    tserver::ListTabletsResponsePB resp;
+    RpcController rpc;
+    rpc.set_timeout(GetDefaultTimeout());
+    req.set_need_schema_info(false);
+    RETURN_NOT_OK_PREPEND(ts_proxy_->ListTablets(req, &resp, &rpc),
+                          "could not list tablets");
+    tablet_status_map_.clear();
+    for (auto& status : *resp.mutable_status_and_schema()) {
+      tablet_status_map_[status.tablet_status().tablet_id()].Swap(status.mutable_tablet_status());
+    }
+  }
+
+  {
+    server::ServerClockRequestPB req;
+    server::ServerClockResponsePB resp;
+    RpcController rpc;
+    rpc.set_timeout(GetDefaultTimeout());
+    RETURN_NOT_OK_PREPEND(generic_proxy_->ServerClock(req, &resp, &rpc),
+                          "could not fetch timestamp");
+    CHECK(resp.has_timestamp());
+    timestamp_ = resp.timestamp();
+  }
+
+  state_ = kFetched;
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/513d6e9f/src/kudu/tools/ksck_remote.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.h b/src/kudu/tools/ksck_remote.h
index 5835b25..5ca2a3b 100644
--- a/src/kudu/tools/ksck_remote.h
+++ b/src/kudu/tools/ksck_remote.h
@@ -51,9 +51,7 @@ class RemoteKsckTabletServer : public KsckTabletServer {
         ts_proxy_(new tserver::TabletServerServiceProxy(messenger, address)) {
   }
 
-  virtual Status Connect() const OVERRIDE;
-
-  virtual Status CurrentTimestamp(uint64_t* timestamp) const OVERRIDE;
+  virtual Status FetchInfo() OVERRIDE;
 
   virtual void RunTabletChecksumScanAsync(
       const std::string& tablet_id,
@@ -71,6 +69,7 @@ class RemoteKsckTabletServer : public KsckTabletServer {
   const std::shared_ptr<rpc::Messenger> messenger_;
   const std::shared_ptr<server::GenericServiceProxy> generic_proxy_;
   const std::shared_ptr<tserver::TabletServerServiceProxy> ts_proxy_;
+
 };
 
 // This implementation connects to a Master via RPC.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/513d6e9f/src/kudu/tools/kudu-ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-ksck.cc b/src/kudu/tools/kudu-ksck.cc
index dd98049..e704f2e 100644
--- a/src/kudu/tools/kudu-ksck.cc
+++ b/src/kudu/tools/kudu-ksck.cc
@@ -100,8 +100,8 @@ static void RunKsck(vector<string>* error_messages) {
                       "Error fetching the cluster metadata from the Master server");
   if (!error_messages->empty()) return;
 
-  PUSH_PREPEND_NOT_OK(ksck->CheckTabletServersRunning(), error_messages,
-                      "Tablet server aliveness check error");
+  PUSH_PREPEND_NOT_OK(ksck->FetchInfoFromTabletServers(), error_messages,
+                      "Error fetching info from tablet servers");
 
   // TODO: Add support for tables / tablets filter in the consistency check.
   PUSH_PREPEND_NOT_OK(ksck->CheckTablesConsistency(), error_messages,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/513d6e9f/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 85e24a2..5a9c04b 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1106,9 +1106,12 @@ void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req,
   for (const scoped_refptr<TabletPeer>& peer : peers) {
     StatusAndSchemaPB* status = peer_status->Add();
     peer->GetTabletStatusPB(status->mutable_tablet_status());
-    CHECK_OK(SchemaToPB(peer->status_listener()->schema(),
-                        status->mutable_schema()));
-    peer->tablet_metadata()->partition_schema().ToPB(status->mutable_partition_schema());
+
+    if (req->need_schema_info()) {
+      CHECK_OK(SchemaToPB(peer->status_listener()->schema(),
+                          status->mutable_schema()));
+      peer->tablet_metadata()->partition_schema().ToPB(status->mutable_partition_schema());
+    }
   }
   context->RespondSuccess();
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/513d6e9f/src/kudu/tserver/tserver.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 9df226a..e44c1bc 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -156,6 +156,10 @@ message WriteResponsePB {
 
 // A list tablets request
 message ListTabletsRequestPB {
+  // Whether the server should include schema information in the response.
+  // These fields can be relatively large, so not including them can make this call
+  // less heavy-weight.
+  optional bool need_schema_info = 1 [default = true];
 }
 
 // A list tablets response
@@ -164,7 +168,10 @@ message ListTabletsResponsePB {
 
   message StatusAndSchemaPB {
     required tablet.TabletStatusPB tablet_status = 1;
-    required SchemaPB schema = 2;
+
+    // 'schema' and 'partition_schema' will only be included if the original request
+    // set 'need_schema_info'.
+    optional SchemaPB schema = 2;
     optional PartitionSchemaPB partition_schema = 3;
   }