Posted to commits@kudu.apache.org by aw...@apache.org on 2018/10/09 00:36:33 UTC

[1/2] kudu git commit: [tools] KUDU-2179: Have ksck not use a single snapshot for all tablets

Repository: kudu
Updated Branches:
  refs/heads/master 8d6cfe10d -> 4de4347a3


[tools] KUDU-2179: Have ksck not use a single snapshot for all tablets

ksck checksum scans allow the user to checksum with snapshot scans, so
that a checksum can be done even as tablets are mutated. It also allows
users to omit a snapshot timestamp. Previously, in this case, the
snapshot timestamp would be retrieved from some healthy tablet server at
the beginning of the checksum process, and used for every replica. This
didn't work well for checksumming large tables, because eventually the
snapshot timestamp fell before the ancient history mark, and subsequent
checksum scans would not be accepted by the tablet servers.
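The failure mode above can be made concrete with a simplified model.
This sketch assumes timestamps are whole seconds (Kudu really uses
hybrid timestamps) and that a tablet server rejects any snapshot older
than now minus the tablet history max age. The names
`AncientHistoryMark` and `SnapshotStillValid` are hypothetical and are
not Kudu APIs.

```cpp
#include <cassert>
#include <cstdint>

// Simplified model (assumption): timestamps are whole seconds, and a
// tablet server only accepts snapshot scans at or after its ancient
// history mark, i.e. now - history_max_age_sec. Kudu's real timestamps
// are hybrid timestamps; this sketch ignores that detail.
int64_t AncientHistoryMark(int64_t now_sec, int64_t history_max_age_sec) {
  assert(history_max_age_sec >= 0);
  return now_sec - history_max_age_sec;
}

// Returns true if a snapshot scan at 'snapshot_sec' would still be
// accepted by a server whose clock reads 'now_sec'.
bool SnapshotStillValid(int64_t snapshot_sec, int64_t now_sec,
                        int64_t history_max_age_sec) {
  return snapshot_sec >= AncientHistoryMark(now_sec, history_max_age_sec);
}
```

With the old default history max age of 900 seconds, a single snapshot
timestamp chosen at t=0 is still accepted for a tablet checksummed at
t=600, but rejected for a tablet whose checksum only starts at t=1200.
That is why a long-running checksum needs fresh per-tablet timestamps.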

This changes how checksum scans work to address this problem:
1. A background process periodically updates timestamps from tablet
   servers.
2. The checksum process is reorganized so the replicas of one tablet
   are checksummed together.
3. When a tablet is about to be checksummed, and the checksum scan is a
   snapshot scan with no user-provided timestamp, the tablet is assigned
   an up-to-date timestamp from one of the tablet servers that hosts a
   replica. Every replica is then checksummed using this snapshot
   timestamp.
4. The original default timeout of 3600 seconds for a checksum scan was
   too low, but it didn't really matter because the default tablet
   history max age was 900 seconds. Now that checksum scans can continue
   for many hours, the default timeout is raised to 86400 seconds (1
   day), and a new idle timeout is added: if the checksum process does
   not checksum any additional rows within the idle timeout (default 10
   minutes), it times out.
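The interaction of the overall timeout and the idle timeout in item 4
can be sketched as follows. This is a hypothetical, deterministic model
loosely based on `KsckChecksumManager::WaitFor` in the diff: instead of
sleeping, it replays progress polls. `ProgressSample`,
`EvaluateProgress` and the `RUNNING` outcome are names invented for
this sketch, and it checks completion before idleness, which differs
slightly from the ordering in the patch.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Results of waiting on a checksum, modeled loosely on the FINISHED /
// TIMED_OUT / IDLE_TIMED_OUT outcomes in this patch. RUNNING exists
// only in this sketch, for when the samples run out.
enum class Outcome { RUNNING, FINISHED, TIMED_OUT, IDLE_TIMED_OUT };

struct ProgressSample {
  int64_t elapsed_sec;  // Time since the checksum started.
  int64_t rows_summed;  // Cumulative rows checksummed so far.
  bool done;            // True once every replica has reported a result.
};

// Replays polls (in increasing time order) against both deadlines.
Outcome EvaluateProgress(const std::vector<ProgressSample>& samples,
                         int64_t timeout_sec, int64_t idle_timeout_sec) {
  assert(timeout_sec > 0 && idle_timeout_sec >= 0);
  int64_t rows_prev = 0;
  int64_t idle_deadline = idle_timeout_sec;
  for (const auto& s : samples) {
    // Overall deadline: applies no matter how much progress is made.
    if (s.elapsed_sec >= timeout_sec) return Outcome::TIMED_OUT;
    if (s.done) return Outcome::FINISHED;
    if (s.rows_summed == rows_prev) {
      // No new rows since the last poll: fail once the idle deadline passes.
      if (s.elapsed_sec > idle_deadline) return Outcome::IDLE_TIMED_OUT;
    } else {
      // Progress was made, so push the idle deadline forward.
      idle_deadline = s.elapsed_sec + idle_timeout_sec;
    }
    rows_prev = s.rows_summed;
  }
  return Outcome::RUNNING;
}
```

The design point is that the idle deadline slides forward on every
poll that shows new rows, so a huge but steadily progressing checksum
is bounded only by the one-day overall timeout, while a stuck one fails
after about ten minutes.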

Note that there is a new scheduling problem given #2: each tablet server
has a fixed number of slots for checksum scans, but every tablet server
hosting a replica must have a slot available before any replica's
checksum can start, so deciding in which order to checksum tablets and
how to find which are available to schedule is important. Given that the
bulk of the time in checksums is occupied waiting for tablet servers to
read lots of data off disk, materialize it as rows, and checksum it,
it's worth spending a lot of effort to make sure the cluster is fully
utilized given the scan concurrency constraints. So, the tool uses a
brute force approach and simply checks all tablets to see which can be
checksummed, any time a replica checksum finishes and frees a slot.
Tablets are considered in tablet id order. Since tablet ids are UUIDs,
there should be no correlation between a tablet's id and how its
replicas are distributed across tablet servers.
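The all-or-nothing slot reservation described above can be sketched as
a small single-threaded model. `SlotScheduler` and its methods are
hypothetical names for illustration. In the diff, the corresponding
logic lives in `KsckChecksumManager::StartTabletChecksums` and
`ReserveSlotsToChecksumUnlocked`, which also handle locking and
asynchronous scans that this sketch omits. It assumes each tablet's
replicas are on distinct tablet servers.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model: each tablet server has a fixed number of checksum
// slots, and a tablet may start only if every server hosting one of its
// replicas has a free slot.
class SlotScheduler {
 public:
  SlotScheduler(const std::vector<std::string>& tservers, int slots_per_server)
      : slots_per_server_(slots_per_server) {
    for (const auto& ts : tservers) open_slots_[ts] = slots_per_server;
  }

  void AddTablet(const std::string& tablet_id,
                 std::vector<std::string> replica_tservers) {
    pending_[tablet_id] = std::move(replica_tservers);
  }

  // Brute force: scans every pending tablet in tablet id order (std::map
  // is sorted by key) and starts each one whose servers all have a free
  // slot. A blocked tablet does not prevent later tablets from starting.
  std::vector<std::string> StartTablets() {
    std::vector<std::string> started;
    for (auto it = pending_.begin(); it != pending_.end();) {
      if (TryReserve(it->second)) {
        started.push_back(it->first);
        it = pending_.erase(it);
      } else {
        ++it;
      }
    }
    return started;
  }

  // Called when one replica checksum finishes: frees that server's slot.
  void ReleaseSlot(const std::string& tserver) {
    int& slots = open_slots_.at(tserver);
    assert(slots < slots_per_server_);
    ++slots;
  }

  int open_slots(const std::string& tserver) const { return open_slots_.at(tserver); }
  std::size_t num_pending() const { return pending_.size(); }

 private:
  // Reserve a slot on every server, or on none of them.
  bool TryReserve(const std::vector<std::string>& tservers) {
    for (const auto& ts : tservers) {
      if (open_slots_.at(ts) == 0) return false;
    }
    for (const auto& ts : tservers) --open_slots_.at(ts);
    return true;
  }

  int slots_per_server_;
  std::map<std::string, int> open_slots_;
  std::map<std::string, std::vector<std::string>> pending_;
};
```

For example, with one slot per server and tablets t1 on {a, b}, t2 on
{b, c} and t3 on {c}, the first pass starts t1 and t3 while t2 waits;
t2 can only start once both b and c have released a slot. Rescanning
all pending tablets on every release is the cost the commit message
accepts in exchange for keeping the cluster fully utilized.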

There are several tests added:
1. For the KUDU-2179 fix itself.
2. For the idle timeout.
3. For when a checksum finds mismatches. Yes, we didn't have a test for
   this before. After adding this test I saw that the output was a little
   confusing because it reported the number of replicas with mismatches
   rather than the number of tablets, so I altered the output to fix
   that.
4. A couple of tests exercising situations when all tablet servers are
   unavailable and when all peers of a tablet are unavailable.

I also checksummed a very large cluster with 500TB of data or so, across
about 37000 replicas. The checksum scan completed successfully after
more than 12 hours.

Change-Id: Iff0905c2099e6f56ed1cb651611918acbaf75476
Reviewed-on: http://gerrit.cloudera.org:8080/11554
Tested-by: Kudu Jenkins
Reviewed-by: Attila Bukor <ab...@apache.org>
Reviewed-by: Andrew Wong <aw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0d4740b7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0d4740b7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0d4740b7

Branch: refs/heads/master
Commit: 0d4740b7692c0cb5c39cca73276b395921bd3ae4
Parents: 8d6cfe1
Author: Will Berkeley <wd...@gmail.org>
Authored: Tue Sep 25 10:26:15 2018 -0700
Committer: Will Berkeley <wd...@gmail.com>
Committed: Mon Oct 8 22:23:04 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/ksck-test.cc        | 106 +++++-
 src/kudu/tools/ksck.h              |  21 +-
 src/kudu/tools/ksck_checksum.cc    | 548 ++++++++++++++++++++++++--------
 src/kudu/tools/ksck_checksum.h     | 253 ++++++++++-----
 src/kudu/tools/ksck_remote-test.cc |  89 +++++-
 src/kudu/tools/ksck_remote.cc      |  77 +++--
 src/kudu/tools/ksck_remote.h       |  39 ++-
 7 files changed, 863 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0d4740b7/src/kudu/tools/ksck-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index a07554e..558eff1 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -18,6 +18,7 @@
 #include "kudu/tools/ksck.h"
 
 #include <algorithm>
+#include <atomic>
 #include <cstdint>
 #include <map>
 #include <memory>
@@ -38,7 +39,6 @@
 #include "kudu/common/schema.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/gutil/map-util.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/server/server_base.pb.h"
 #include "kudu/tablet/metadata.pb.h"
@@ -52,6 +52,8 @@
 #include "kudu/util/test_util.h"
 
 DECLARE_bool(checksum_scan);
+DECLARE_int32(checksum_idle_timeout_sec);
+DECLARE_int32(max_progress_report_wait_ms);
 DECLARE_string(color);
 DECLARE_string(ksck_format);
 DECLARE_uint32(truncate_server_csv_length);
@@ -142,22 +144,35 @@ class MockKsckTabletServer : public KsckTabletServer {
     return Status::OK();
   }
 
-  virtual void RunTabletChecksumScanAsync(
-      const std::string& /*tablet_id*/,
+  void FetchCurrentTimestampAsync() override {}
+
+  Status FetchCurrentTimestamp() override {
+    return Status::OK();
+  }
+
+  void RunTabletChecksumScanAsync(
+      const std::string& tablet_id,
       const Schema& /*schema*/,
       const KsckChecksumOptions& /*options*/,
-      KsckChecksumProgressCallbacks* callbacks) OVERRIDE {
-    callbacks->Progress(10, 20);
-    callbacks->Finished(Status::OK(), 0);
+      shared_ptr<KsckChecksumManager> manager) override {
+    manager->ReportProgress(checksum_progress_, 2 * checksum_progress_);
+    if (checksum_progress_ > 0) {
+      manager->ReportResult(tablet_id, uuid_, Status::OK(), checksum_);
+    }
   }
 
-  virtual std::string address() const OVERRIDE {
+  std::string address() const override {
     return address_;
   }
 
   // Public because the unit tests mutate these variables directly.
   Status fetch_info_status_;
   KsckServerHealth fetch_info_health_;
+  // The fake checksum for replicas on this mock server.
+  uint64_t checksum_ = 0;
+  // The fake progress amount for this mock server, used to mock checksum
+  // progress for this server.
+  int64_t checksum_progress_ = 10;
   using KsckTabletServer::flags_;
   using KsckTabletServer::version_;
 
@@ -1504,5 +1519,82 @@ TEST_F(KsckTest, TestChecksumScanJson) {
   ASSERT_OK(r.Init());
 }
 
+TEST_F(KsckTest, TestChecksumScanMismatch) {
+  CreateOneSmallReplicatedTable();
+  FLAGS_checksum_scan = true;
+
+  // Set one tablet server to return a non-zero checksum for its replicas.
+  // This will not match the checksums of replicas from other servers because
+  // they are zero by default.
+  auto ts = static_pointer_cast<MockKsckTabletServer>(
+      cluster_->tablet_servers_.begin()->second);
+  ts->checksum_ = 1;
+
+  ASSERT_TRUE(RunKsck().IsRuntimeError());
+  ASSERT_STR_CONTAINS(err_stream_.str(),
+                      "Corruption: checksum scan error: 3 tablet(s) had "
+                      "checksum mismatches");
+}
+
+TEST_F(KsckTest, TestChecksumScanIdleTimeout) {
+  CreateOneTableOneTablet();
+  FLAGS_checksum_scan = true;
+
+  // Set an impossibly low idle timeout and tweak one of the servers to always
+  // report no progress on the checksum.
+  FLAGS_checksum_idle_timeout_sec = 0;
+  auto ts = static_pointer_cast<MockKsckTabletServer>(
+      cluster_->tablet_servers_.begin()->second);
+  ts->checksum_progress_ = 0;
+
+  // Make the progress report happen frequently so this test is fast.
+  FLAGS_max_progress_report_wait_ms = 10;
+  ASSERT_TRUE(RunKsck().IsRuntimeError());
+  ASSERT_STR_CONTAINS(err_stream_.str(),
+                      "Timed out: checksum scan error: Checksum scan did not "
+                      "make progress within the idle timeout of 0.000s");
+}
+
+TEST_F(KsckTest, TestChecksumWithAllUnhealthyTabletServers) {
+  CreateOneTableOneTablet();
+  FLAGS_checksum_scan = true;
+
+  // Make all tablet servers unhealthy.
+  for (const auto& entry : cluster_->tablet_servers_) {
+    auto ts = static_pointer_cast<MockKsckTabletServer>(entry.second);
+    ts->fetch_info_status_ = Status::NetworkError("gremlins");
+    ts->fetch_info_health_ = KsckServerHealth::UNAVAILABLE;
+  }
+
+  // The checksum should short-circuit and fail because no tablet servers are
+  // available.
+  ASSERT_TRUE(RunKsck().IsRuntimeError());
+  ASSERT_STR_CONTAINS(err_stream_.str(), "no tablet servers are available");
+}
+
+TEST_F(KsckTest, TestChecksumWithAllPeersUnhealthy) {
+  CreateOneTableOneTablet();
+  FLAGS_checksum_scan = true;
+
+  // Make all tablet servers unhealthy except an extra one with no replica of
+  // the tablet.
+  for (const auto& entry : cluster_->tablet_servers_) {
+    auto ts = static_pointer_cast<MockKsckTabletServer>(entry.second);
+    ts->fetch_info_status_ = Status::NetworkError("gremlins");
+    ts->fetch_info_health_ = KsckServerHealth::UNAVAILABLE;
+  }
+  const char* const new_uuid = "new";
+  EmplaceOrDie(&cluster_->tablet_servers_,
+               new_uuid,
+               std::make_shared<MockKsckTabletServer>(new_uuid));
+
+  // The checksum should fail for the tablet because none of its replicas are
+  // available to provide a timestamp.
+  ASSERT_TRUE(RunKsck().IsRuntimeError());
+  ASSERT_STR_CONTAINS(
+      err_stream_.str(),
+      "T tablet-id-1 P ts-id-0 (<mock>): Error: Aborted: "
+      "no healthy peer was available to provide a timestamp");
+}
 } // namespace tools
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/0d4740b7/src/kudu/tools/ksck.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index c654be0..9a21736 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -20,6 +20,7 @@
 #ifndef KUDU_TOOLS_KSCK_H
 #define KUDU_TOOLS_KSCK_H
 
+#include <atomic>
 #include <cstdint>
 #include <iosfwd>
 #include <map>
@@ -37,6 +38,7 @@
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/server/server_base.pb.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet.pb.h"  // IWYU pragma: keep
@@ -49,7 +51,7 @@ class MonoDelta;
 
 namespace tools {
 
-class KsckChecksumProgressCallbacks;
+class KsckChecksumManager;
 class KsckTable;
 struct KsckChecksumOptions;
 
@@ -293,13 +295,16 @@ class KsckTabletServer {
   // "Unusual" flags ares ones tagged hidden, experimental, or unsafe.
   virtual Status FetchUnusualFlags() = 0;
 
-  // Executes a checksum scan on the associated tablet, and runs the callback
-  // with the result. The callback must be threadsafe and non-blocking.
+  // Fetches and updates the current timestamp from the tablet server.
+  virtual void FetchCurrentTimestampAsync() = 0;
+  virtual Status FetchCurrentTimestamp() = 0;
+
+  // Executes a checksum scan on a tablet and reports the result to 'manager'.
   virtual void RunTabletChecksumScanAsync(
                   const std::string& tablet_id,
                   const Schema& schema,
                   const KsckChecksumOptions& options,
-                  KsckChecksumProgressCallbacks* callbacks) = 0;
+                  std::shared_ptr<KsckChecksumManager> manager) = 0;
 
   virtual const std::string& uuid() const {
     return uuid_;
@@ -370,7 +375,7 @@ class KsckTabletServer {
 
   // May be none if flag fetch fails.
   boost::optional<server::GetFlagsResponsePB> flags_;
-  uint64_t timestamp_;
+  std::atomic<uint64_t> timestamp_;
   const std::string uuid_;
 
  private:
@@ -424,6 +429,12 @@ class KsckCluster {
     return tables_;
   }
 
+  // Returns a reference to the messenger used by this instance.
+  // Returns nullptr if no messenger is used.
+  virtual std::shared_ptr<rpc::Messenger> messenger() const {
+    return nullptr;
+  }
+
  protected:
   KsckCluster() = default;
   MasterList masters_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/0d4740b7/src/kudu/tools/ksck_checksum.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_checksum.cc b/src/kudu/tools/ksck_checksum.cc
index 70d7c47..f41efc7 100644
--- a/src/kudu/tools/ksck_checksum.cc
+++ b/src/kudu/tools/ksck_checksum.cc
@@ -19,10 +19,13 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <functional>
 #include <iostream>
 #include <map>
+#include <set>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -30,13 +33,19 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
-#include "kudu/common/schema.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/periodic.h"
 #include "kudu/tools/ksck.h"
 #include "kudu/tools/tool_action_common.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
 
 using std::endl;
 using std::ostream;
@@ -46,26 +55,84 @@ using std::unordered_map;
 using std::vector;
 using strings::Substitute;
 
-DEFINE_int32(checksum_timeout_sec, 3600,
+DEFINE_int32(checksum_idle_timeout_sec, 60 * 10,
+             "Maximum total seconds to wait without making any progress in a "
+             "checksum scan before timing out due to idleness.");
+DEFINE_int32(checksum_timeout_sec, 24 * 60 * 60,
              "Maximum total seconds to wait for a checksum scan to complete "
              "before timing out.");
 DEFINE_int32(checksum_scan_concurrency, 4,
              "Number of concurrent checksum scans to execute per tablet server.");
+DEFINE_int32(max_progress_report_wait_ms, 5000,
+             "Maximum number of milliseconds to wait between progress reports. "
+             "Used to speed up tests.");
+TAG_FLAG(max_progress_report_wait_ms, hidden);
+DEFINE_int32(timestamp_update_period_ms, 60000,
+             "Number of milliseconds to wait between updating the current "
+             "timestamps used for checksum scans. This would only need to be "
+             "changed if checksumming replicas on servers with a very low "
+             "value of -tablet_history_max_age_sec.");
+TAG_FLAG(timestamp_update_period_ms, advanced);
+DEFINE_int32(wait_before_setting_snapshot_timestamp_ms, 0,
+             "Number of milliseconds to wait before assigning a timestamp and "
+             "starting a checksum scan. For tests only.");
+TAG_FLAG(wait_before_setting_snapshot_timestamp_ms, hidden);
+TAG_FLAG(wait_before_setting_snapshot_timestamp_ms, unsafe);
 DEFINE_bool(checksum_snapshot, true, "Should the checksum scanner use a snapshot scan?");
 DEFINE_uint64(checksum_snapshot_timestamp,
               kudu::tools::KsckChecksumOptions::kCurrentTimestamp,
-              "Timestamp to use for snapshot checksum scans. Defaults to 0, which "
-              "uses the current timestamp of a tablet server involved in the scan.");
+              "Timestamp to use for snapshot checksum scans. Defaults to 0, "
+              "which will cause each tablet to use a recent timestamp chosen "
+              "when all the checksums for its replicas begin.");
 
 namespace kudu {
 namespace tools {
 
+namespace {
+const string LogPrefix(const string& tablet_id,
+                       const string& replica_uuid = "") {
+  if (replica_uuid.empty()) {
+    return Substitute("T $0: ", tablet_id);
+  }
+  return Substitute("T $0 P $1: ", tablet_id, replica_uuid);
+}
+
+// Count the replicas in 'tablet_infos' and check that every replica belongs
+// to a tablet server listed in 'tservers'.
+Status CountReplicasAndCheckTabletServersAreConsistent(
+    const TabletInfoMap& tablet_infos,
+    const TabletServerList& tservers,
+    int* num_replicas) {
+  CHECK(num_replicas);
+  *num_replicas = 0;
+  std::set<string> tserver_uuid_set;
+  for (const auto& tserver : tservers) {
+    InsertIfNotPresent(&tserver_uuid_set, tserver->uuid());
+  }
+  for (const auto& entry : tablet_infos) {
+    const auto& tablet = entry.second.tablet;
+    for (const auto& replica : tablet->replicas()) {
+      (*num_replicas)++;
+      if (!ContainsKey(tserver_uuid_set, replica->ts_uuid())) {
+        return Status::InvalidArgument(Substitute(
+            "tablet server $0 hosting a replica of tablet $1 not found in "
+            "list of tablet servers",
+            replica->ts_uuid(),
+            tablet->id()));
+      }
+    }
+  }
+  return Status::OK();
+}
+} // anonymous namespace
+
 KsckChecksumOptions::KsckChecksumOptions()
     : KsckChecksumOptions({}, {}) {}
 
 KsckChecksumOptions::KsckChecksumOptions(vector<string> table_filters,
                                          vector<string> tablet_id_filters)
     : KsckChecksumOptions(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec),
+                          MonoDelta::FromSeconds(FLAGS_checksum_idle_timeout_sec),
                           FLAGS_checksum_scan_concurrency,
                           FLAGS_checksum_snapshot,
                           FLAGS_checksum_snapshot_timestamp,
@@ -73,10 +140,12 @@ KsckChecksumOptions::KsckChecksumOptions(vector<string> table_filters,
                           std::move(tablet_id_filters)) {}
 
 KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout,
+                                         MonoDelta idle_timeout,
                                          int scan_concurrency,
                                          bool use_snapshot,
                                          uint64_t snapshot_timestamp)
     : KsckChecksumOptions(timeout,
+                          idle_timeout,
                           scan_concurrency,
                           use_snapshot,
                           snapshot_timestamp,
@@ -84,60 +153,173 @@ KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout,
                           {}) {}
 
 KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout,
+                                         MonoDelta idle_timeout,
                                          int scan_concurrency,
                                          bool use_snapshot,
                                          uint64_t snapshot_timestamp,
                                          vector<string> table_filters,
                                          vector<string> tablet_id_filters)
     : timeout(timeout),
+      idle_timeout(idle_timeout),
       scan_concurrency(scan_concurrency),
       use_snapshot(use_snapshot),
       snapshot_timestamp(snapshot_timestamp),
       table_filters(std::move(table_filters)),
       tablet_id_filters(std::move(tablet_id_filters)) {}
 
-ChecksumResultReporter::ChecksumResultReporter(int num_tablet_replicas)
-    : expected_count_(num_tablet_replicas),
-      responses_(num_tablet_replicas),
+void KsckChecksumManager::InitializeTsSlotsMap() {
+  for (const auto& tserver : tservers_) {
+    EmplaceIfNotPresent(&ts_slots_open_map_,
+                        tserver->uuid(),
+                        opts_.scan_concurrency);
+  }
+}
+
+void KsckChecksumManager::ReleaseTsSlotsUnlocked(const vector<string>& ts_uuids) {
+  for (const auto& uuid : ts_uuids) {
+    auto& slots_open = FindOrDie(ts_slots_open_map_, uuid);
+    DCHECK_GE(slots_open, 0);
+    DCHECK_LT(slots_open, opts_.scan_concurrency);
+    slots_open++;
+  }
+}
+
+bool KsckChecksumManager::HasOpenTsSlotsUnlocked() const {
+  for (const auto& entry : ts_slots_open_map_) {
+    DCHECK_GE(entry.second, 0);
+    DCHECK_LE(entry.second, opts_.scan_concurrency);
+    if (entry.second > 0) {
+      return true;
+    }
+  }
+  return false;
+}
+
+string KsckChecksumManager::OpenTsSlotSummaryString() const {
+  std::lock_guard<simple_spinlock> lock(lock_);
+  string summary = "Summary of Open TS Slots";
+  for (const auto& entry : ts_slots_open_map_) {
+    summary.append(Substitute("\n$0 : $1 / $2",
+                              entry.first,
+                              entry.second,
+                              opts_.scan_concurrency));
+  }
+  return summary;
+}
+
+Status KsckChecksumManager::New(KsckChecksumOptions opts,
+                                TabletInfoMap tablet_infos,
+                                TabletServerList tservers,
+                                shared_ptr<rpc::Messenger> messenger,
+                                shared_ptr<KsckChecksumManager>* manager) {
+  CHECK(manager);
+  int num_replicas;
+  RETURN_NOT_OK(CountReplicasAndCheckTabletServersAreConsistent(
+      tablet_infos,
+      tservers,
+      &num_replicas));
+  auto manager_tmp = KsckChecksumManager::make_shared(num_replicas,
+                                                      std::move(opts),
+                                                      std::move(tablet_infos),
+                                                      std::move(tservers),
+                                                      std::move(messenger));
+  RETURN_NOT_OK(manager_tmp->Init());
+  *manager = std::move(manager_tmp);
+  return Status::OK();
+}
+
+
+KsckChecksumManager::KsckChecksumManager(
+    int num_replicas,
+    KsckChecksumOptions opts,
+    TabletInfoMap tablet_infos,
+    TabletServerList tservers,
+    shared_ptr<rpc::Messenger> messenger)
+    : opts_(std::move(opts)),
+      tablet_infos_(std::move(tablet_infos)),
+      tservers_(std::move(tservers)),
+      expected_replica_count_(num_replicas),
+      responses_(num_replicas),
+      messenger_(std::move(messenger)),
       rows_summed_(0),
-      disk_bytes_summed_(0) {}
+      disk_bytes_summed_(0) {
+  InitializeTsSlotsMap();
+}
+
+Status KsckChecksumManager::Init() {
+  ThreadPoolBuilder builder("find_tablets_to_checksum");
+  builder.set_max_threads(1);
+  return builder.Build(&find_tablets_to_checksum_pool_);
+}
+
+void KsckChecksumManager::Shutdown() {
+  find_tablets_to_checksum_pool_->Shutdown();
+  timestamp_update_timer_->Stop();
+}
 
-void ChecksumResultReporter::ReportProgress(int64_t delta_rows, int64_t delta_bytes) {
-  rows_summed_.IncrementBy(delta_rows);
-  disk_bytes_summed_.IncrementBy(delta_bytes);
+void KsckChecksumManager::ReportProgress(int64_t delta_rows, int64_t delta_bytes) {
+  DCHECK_GE(delta_rows, 0);
+  DCHECK_GE(delta_bytes, 0);
+  rows_summed_ += delta_rows;
+  disk_bytes_summed_ += delta_bytes;
 }
 
-// Write an entry to the result map indicating a response from the remote.
-void ChecksumResultReporter::ReportResult(const string& tablet_id,
-                                          const string& replica_uuid,
-                                          const Status& status,
-                                          uint64_t checksum) {
-  std::lock_guard<simple_spinlock> guard(lock_);
-  unordered_map<string, ResultPair>& replica_results =
-      LookupOrInsert(&checksums_, tablet_id, unordered_map<string, ResultPair>());
-  InsertOrDie(&replica_results, replica_uuid, ResultPair(status, checksum));
+void KsckChecksumManager::ReportResult(const string& tablet_id,
+                                       const string& replica_uuid,
+                                       const Status& status,
+                                       uint64_t checksum) {
+  VLOG(1) << LogPrefix(tablet_id, replica_uuid)
+          << "Checksum finished. Status: " << status.ToString();
+
+  {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    auto& tablet_result = LookupOrEmplace(&checksums_,
+                                          tablet_id,
+                                          TabletChecksumResult());
+    EmplaceOrDie(&tablet_result, replica_uuid, std::make_pair(status, checksum));
+    ReleaseTsSlotsUnlocked({ replica_uuid });
+  }
+
   responses_.CountDown();
+  WARN_NOT_OK(find_tablets_to_checksum_pool_->SubmitFunc(
+      std::bind(&KsckChecksumManager::StartTabletChecksums, this)),
+      "failed to submit task to start additional tablet checksums");
 }
 
-// Blocks until either the number of results plus errors reported equals
-// num_tablet_replicas (from the constructor), or until the timeout expires,
-// whichever comes first. Progress messages are printed to 'out'.
-// Returns false if the timeout expired before all responses came in.
-// Otherwise, returns true.
-// Print progress updates to 'out' if it is non-null.
-bool ChecksumResultReporter::WaitFor(const MonoDelta& timeout, std::ostream* out) const {
+KsckChecksumManager::Outcome KsckChecksumManager::WaitFor(std::ostream* out) {
+  SCOPED_CLEANUP({ Shutdown(); });
+
   MonoTime start = MonoTime::Now();
-  MonoTime deadline = start + timeout;
+  const MonoTime deadline = start + opts_.timeout;
+
+  int64_t rows_summed_prev = 0;
+  MonoTime progress_deadline = start + opts_.idle_timeout;
 
   bool done = false;
   while (!done) {
     MonoTime now = MonoTime::Now();
-    auto rem_ms = (deadline - now).ToMilliseconds();
-    if (rem_ms <= 0) return false;
+    int rem_ms = (deadline - now).ToMilliseconds();
+    if (rem_ms <= 0) {
+      return Outcome::TIMED_OUT;
+    }
+
+    done = responses_.WaitFor(MonoDelta::FromMilliseconds(
+        std::min(rem_ms, FLAGS_max_progress_report_wait_ms)));
+
+    // Check the rows summed vs the previous value to see if any progress has
+    // been made. Also load the disk bytes summed, so there's less chance the
+    // two are out-of-sync when printed later.
+    int64_t rows_summed = rows_summed_;
+    int64_t disk_bytes_summed = disk_bytes_summed_;
+    if (rows_summed == rows_summed_prev) {
+      if (now > progress_deadline) {
+        return Outcome::IDLE_TIMED_OUT;
+      }
+    } else {
+      progress_deadline = now + opts_.idle_timeout;
+    }
+    rows_summed_prev = rows_summed;
 
-    constexpr int64_t max_wait_ms = 5000;
-    done = responses_.WaitFor(
-        MonoDelta::FromMilliseconds(std::min(rem_ms, max_wait_ms)));
     if (out) {
       string status = done ? "finished in" : "running for";
       int run_time_sec = (MonoTime::Now() - start).ToSeconds();
@@ -146,55 +328,151 @@ bool ChecksumResultReporter::WaitFor(const MonoDelta& timeout, std::ostream* out
                            status,
                            run_time_sec,
                            responses_.count(),
-                           expected_count_,
-                           HumanReadableNumBytes::ToString(disk_bytes_summed_.Load()),
-                           HumanReadableInt::ToString(rows_summed_.Load()))
+                           expected_replica_count_,
+                           HumanReadableNumBytes::ToString(disk_bytes_summed),
+                           HumanReadableInt::ToString(rows_summed))
              << endl;
     }
+    VLOG(1) << OpenTsSlotSummaryString() << endl;
+  }
+  return Outcome::FINISHED;
+}
+
+bool KsckChecksumManager::ReserveSlotsToChecksumUnlocked(
+    const shared_ptr<KsckTablet>& tablet) {
+  DCHECK(lock_.is_locked());
+  vector<int*> slot_counts_to_decrement;
+  for (const auto& replica : tablet->replicas()) {
+    auto* slots_open = FindOrNull(ts_slots_open_map_, replica->ts_uuid());
+    DCHECK(slots_open);
+    DCHECK_GE(*slots_open, 0);
+    DCHECK_LE(*slots_open, opts_.scan_concurrency);
+    if (*slots_open == 0) {
+      return false;
+    }
+    slot_counts_to_decrement.push_back(slots_open);
+  }
+  for (auto* slots_open : slot_counts_to_decrement) {
+    (*slots_open)--;
   }
   return true;
 }
 
-TabletServerChecksumCallbacks::TabletServerChecksumCallbacks(
-    scoped_refptr<ChecksumResultReporter> reporter,
-    shared_ptr<KsckTabletServer> tablet_server,
-    SharedTabletQueue queue,
-    string tablet_id,
-    KsckChecksumOptions options)
-    : reporter_(std::move(reporter)),
-      tablet_server_(std::move(tablet_server)),
-      queue_(std::move(queue)),
-      options_(options),
-      tablet_id_(std::move(tablet_id)) {}
-
-void TabletServerChecksumCallbacks::Progress(int64_t rows_summed, int64_t disk_bytes_summed) {
-  reporter_->ReportProgress(rows_summed, disk_bytes_summed);
+Status KsckChecksumManager::RunChecksumsAsync() {
+  if (!messenger_) {
+    rpc::MessengerBuilder builder("timestamp update");
+    RETURN_NOT_OK(builder.Build(&messenger_));
+  }
+  timestamp_update_timer_ = rpc::PeriodicTimer::Create(
+      messenger_,
+      [&]() {
+        VLOG(1) << "Updating timestamps";
+        for (auto& ts : tservers_) {
+          ts->FetchCurrentTimestampAsync();
+        }
+      },
+      MonoDelta::FromMilliseconds(FLAGS_timestamp_update_period_ms));
+  timestamp_update_timer_->Start();
+  StartTabletChecksums();
+  return Status::OK();
 }
 
-void TabletServerChecksumCallbacks::Finished(const Status& status, uint64_t checksum) {
-  reporter_->ReportResult(tablet_id_, tablet_server_->uuid(), status, checksum);
+void KsckChecksumManager::BeginTabletChecksum(const TabletChecksumInfo& tablet_info) {
+  VLOG(1) << LogPrefix(tablet_info.tablet->id()) << "Starting checksum";
+  std::unordered_set<string> replica_uuids;
+  for (const auto& replica : tablet_info.tablet->replicas()) {
+    InsertOrDie(&replica_uuids, replica->ts_uuid());
+  }
+
+  TabletServerList tablet_servers;
+  for (const auto& ts : tservers_) {
+    if (ContainsKey(replica_uuids, ts->uuid())) {
+      tablet_servers.push_back(ts);
+    }
+  }
+
+  MAYBE_INJECT_FIXED_LATENCY(FLAGS_wait_before_setting_snapshot_timestamp_ms);
+
+  // Use the current timestamp of a peer if the user did not specify a timestamp.
+  uint64_t timestamp_for_tablet = opts_.snapshot_timestamp;
+  if (opts_.use_snapshot &&
+      opts_.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
+    for (const auto& ts : tablet_servers) {
+      if (ts->is_healthy()) {
+        timestamp_for_tablet = ts->current_timestamp();
+        break;
+      }
+    }
+    // If we couldn't get a timestamp from any peer because they are unhealthy,
+    // short circuit the checksum for the tablet with an error.
+    if (timestamp_for_tablet == KsckChecksumOptions::kCurrentTimestamp) {
+      for (const auto& ts : tablet_servers) {
+        ReportResult(
+            tablet_info.tablet->id(),
+            ts->uuid(),
+            Status::Aborted("no healthy peer was available to provide a timestamp"),
+            0);
+      }
+      return;
+    }
+  }
+
+  VLOG(1) << LogPrefix(tablet_info.tablet->id()) << "Using timestamp "
+          << timestamp_for_tablet;
+
+  for (const auto& ts: tablet_servers) {
+    // Copy options and set timestamp for each replica checksum.
+    KsckChecksumOptions options = opts_;
+    options.snapshot_timestamp = timestamp_for_tablet;
+    ts->RunTabletChecksumScanAsync(tablet_info.tablet->id(),
+                                   tablet_info.schema,
+                                   options,
+                                   shared_from_this());
+  }
+}
 
-  std::pair<Schema, string> table_tablet;
-  if (queue_->BlockingGet(&table_tablet)) {
-    const Schema& table_schema = table_tablet.first;
-    tablet_id_ = table_tablet.second;
-    tablet_server_->RunTabletChecksumScanAsync(tablet_id_, table_schema, options_, this);
-  } else {
-    delete this;
+void KsckChecksumManager::StartTabletChecksums() {
+  vector<TabletChecksumInfo> requests_to_process;
+  {
+    // To find all tablets that we can start checksums on, we check every
+    // one. This means checking 'ts_slots_open_map_' once for every replica,
+    // so it's pretty expensive. But, compared to checksumming multi-gigabyte
+    // replicas, and in particular the benefit of greater parallelism in
+    // checksumming such replicas, it seems like it's worth it.
+    std::lock_guard<simple_spinlock> guard(lock_);
+    // Short-circuit if there are no slots available.
+    if (!HasOpenTsSlotsUnlocked()) {
+      VLOG(1) << "No slots open. Short-circuiting search.";
+      return;
+    }
+    for (const auto& entry : tablet_infos_) {
+      const auto& request = entry.second;
+      if (ReserveSlotsToChecksumUnlocked(request.tablet)) {
+        requests_to_process.push_back(request);
+      }
+    }
+    for (const auto& request : requests_to_process) {
+      tablet_infos_.erase(request.tablet->id());
+    }
+    VLOG(1) << Substitute("Starting checksums on $0 tablet(s)",
+                          requests_to_process.size());
+  }
+  for (const auto& request : requests_to_process) {
+    BeginTabletChecksum(request);
   }
 }
 
 KsckChecksummer::KsckChecksummer(KsckCluster* cluster)
     : cluster_(CHECK_NOTNULL(cluster)) {}
 
-Status KsckChecksummer::BuildTabletTableMap(
+Status KsckChecksummer::BuildTabletInfoMap(
     const KsckChecksumOptions& opts,
-    KsckChecksummer::TabletTableMap* tablet_table_map,
+    TabletInfoMap* tablet_infos,
     int* num_replicas) const {
-  CHECK(tablet_table_map);
+  CHECK(tablet_infos);
   CHECK(num_replicas);
 
-  TabletTableMap tablet_table_map_tmp;
+  TabletInfoMap tablet_infos_tmp;
   int num_tables = 0;
   int num_tablets = 0;
   int num_replicas_tmp = 0;
@@ -206,7 +484,9 @@ Status KsckChecksummer::BuildTabletTableMap(
     for (const shared_ptr<KsckTablet>& tablet : table->tablets()) {
       VLOG(1) << "Tablet: " << tablet->id();
       if (!MatchesAnyPattern(opts.tablet_id_filters, tablet->id())) continue;
-      InsertOrDie(&tablet_table_map_tmp, tablet, table);
+      EmplaceOrDie(&tablet_infos_tmp,
+                   tablet->id(),
+                   TabletChecksumInfo(tablet, table->schema()));
       num_replicas_tmp += tablet->replicas().size();
     }
   }
@@ -235,13 +515,13 @@ Status KsckChecksummer::BuildTabletTableMap(
     return Status::NotFound(msg);
   }
 
-  *tablet_table_map = std::move(tablet_table_map_tmp);
+  *tablet_infos = std::move(tablet_infos_tmp);
   *num_replicas = num_replicas_tmp;
   return Status::OK();
 }
 
 Status KsckChecksummer::CollateChecksumResults(
-    const ChecksumResultReporter::TabletResultMap& checksums,
+    const TabletChecksumResultsMap& checksums,
     KsckTableChecksumMap* table_checksum_map,
     int* num_results) const {
   CHECK(table_checksum_map);
@@ -267,7 +547,7 @@ Status KsckChecksummer::CollateChecksumResults(
           replica_checksum.ts_uuid = ts->uuid();
           replica_checksum.ts_address = ts->address();
 
-          const ChecksumResultReporter::ResultPair& result = r.second;
+          const ReplicaChecksumResult& result = r.second;
           const Status& status = result.first;
           replica_checksum.checksum = result.second;
           replica_checksum.status = status;
@@ -276,7 +556,8 @@ Status KsckChecksummer::CollateChecksumResults(
           } else if (!seen_first_replica) {
             seen_first_replica = true;
             first_checksum = replica_checksum.checksum;
-          } else if (replica_checksum.checksum != first_checksum) {
+          } else if (replica_checksum.checksum != first_checksum &&
+                     !tablet_checksum.mismatch) {
             num_mismatches++;
             tablet_checksum.mismatch = true;
           }
@@ -290,11 +571,14 @@ Status KsckChecksummer::CollateChecksumResults(
                      std::move(tablet_checksum));
       }
     }
+    if (table_checksum.empty()) {
+      continue;
+    }
     EmplaceOrDie(table_checksum_map, table->name(), std::move(table_checksum));
   }
 
   if (num_mismatches != 0) {
-    return Status::Corruption(Substitute("$0 checksum mismatches were detected.",
+    return Status::Corruption(Substitute("$0 tablet(s) had checksum mismatches",
                                          num_mismatches));
   }
   if (num_errors != 0) {
@@ -308,98 +592,88 @@ Status KsckChecksummer::ChecksumData(const KsckChecksumOptions& opts,
                                      ostream* out_for_progress_updates) {
   CHECK(checksum_results);
 
-  // Make a copy of 'opts' because we may need to assign a snapshot timestamp
-  // if one was not provided.
-  KsckChecksumOptions options = opts;
-
   // Clear the contents of 'checksum_results' because we always overwrite it
   // with whatever results are obtained (and with nothing if there's no results).
   checksum_results->snapshot_timestamp = boost::none;
   checksum_results->tables.clear();
 
-  TabletTableMap tablet_table_map;
+  TabletInfoMap tablet_infos;
   int num_replicas;
-  RETURN_NOT_OK(BuildTabletTableMap(options, &tablet_table_map, &num_replicas));
-
-  // Map of tablet servers to tablet queue.
-  typedef unordered_map<shared_ptr<KsckTabletServer>, SharedTabletQueue> TabletServerQueueMap;
-
-  TabletServerQueueMap tablet_server_queues;
-  scoped_refptr<ChecksumResultReporter> reporter(
-      new ChecksumResultReporter(num_replicas));
-
-  // Create a queue of checksum callbacks grouped by the tablet server.
-  for (const auto& entry : tablet_table_map) {
-    const shared_ptr<KsckTablet>& tablet = entry.first;
-    const shared_ptr<KsckTable>& table = entry.second;
-    for (const shared_ptr<KsckTabletReplica>& replica : tablet->replicas()) {
-      const shared_ptr<KsckTabletServer>& ts =
-          FindOrDie(cluster_->tablet_servers(), replica->ts_uuid());
-
-      const SharedTabletQueue& queue =
-          LookupOrInsertNewSharedPtr(&tablet_server_queues, ts, num_replicas);
-      CHECK_EQ(QUEUE_SUCCESS, queue->Put(make_pair(table->schema(), tablet->id())));
-    }
+  RETURN_NOT_OK(BuildTabletInfoMap(opts, &tablet_infos, &num_replicas));
+
+  TabletServerList tablet_servers;
+  for (const auto& entry : cluster_->tablet_servers()) {
+    tablet_servers.push_back(entry.second);
   }
 
   // Set the snapshot timestamp. If the sentinel value 'kCurrentTimestamp' was
   // provided, the snapshot timestamp is set to the current timestamp of some
-  // healthy tablet server.
-  if (options.use_snapshot &&
-      options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
-    for (const auto& ts : tablet_server_queues) {
-      if (ts.first->is_healthy()) {
-        options.snapshot_timestamp = ts.first->current_timestamp();
-        break;
+  // healthy tablet server, and it may be updated for each tablet, as it is
+  // checksummed.
+  if (opts.use_snapshot) {
+    if (opts.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
+      // The timestamps are actually set for each tablet when the tablet is
+      // checksummed, but let's do a sanity check that some tablet server is
+      // available to provide timestamps.
+      bool exists_healthy_ts = false;
+      for (const auto& ts : tablet_servers) {
+        if (ts->is_healthy()) {
+          exists_healthy_ts = true;
+          break;
+        }
       }
-    }
-    if (options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
-      return Status::ServiceUnavailable(
-          "No tablet servers were available to fetch the current timestamp");
-    }
-    checksum_results->snapshot_timestamp = options.snapshot_timestamp;
-  }
-
-  // Kick off checksum scans in parallel. For each tablet server, we start
-  // 'options.scan_concurrency' scans. Each callback then initiates one
-  // additional scan when it returns if the queue for that TS is not empty.
-  for (const auto& entry : tablet_server_queues) {
-    const shared_ptr<KsckTabletServer>& tablet_server = entry.first;
-    const SharedTabletQueue& queue = entry.second;
-    queue->Shutdown(); // Ensures that BlockingGet() will not block.
-    for (int i = 0; i < options.scan_concurrency; i++) {
-      std::pair<Schema, std::string> table_tablet;
-      if (queue->BlockingGet(&table_tablet)) {
-        const Schema& table_schema = table_tablet.first;
-        const std::string& tablet_id = table_tablet.second;
-        auto* cbs = new TabletServerChecksumCallbacks(
-            reporter, tablet_server, queue, tablet_id, options);
-        // 'cbs' deletes itself when complete.
-        tablet_server->RunTabletChecksumScanAsync(tablet_id, table_schema, options, cbs);
+      if (!exists_healthy_ts) {
+        return Status::ServiceUnavailable(
+            "no tablet servers are available");
       }
+    } else {
+      // The results only include the snapshot timestamp when it applies to
+      // every tablet.
+      checksum_results->snapshot_timestamp = opts.snapshot_timestamp;
     }
   }
 
-  bool timed_out = !reporter->WaitFor(options.timeout, out_for_progress_updates);
+  shared_ptr<KsckChecksumManager> manager;
+  RETURN_NOT_OK(KsckChecksumManager::New(opts,
+                                         tablet_infos,
+                                         tablet_servers,
+                                         cluster_->messenger(),
+                                         &manager));
+  RETURN_NOT_OK(manager->RunChecksumsAsync());
+
+  auto final_status = manager->WaitFor(out_for_progress_updates);
 
   // Even if we timed out, collate the checksum results that we did get.
   KsckTableChecksumMap checksum_table_map;
   int num_results;
-  const Status s = CollateChecksumResults(reporter->checksums(),
+  const Status s = CollateChecksumResults(manager->checksums(),
                                           &checksum_table_map,
                                           &num_results);
   checksum_results->tables = std::move(checksum_table_map);
 
-  if (timed_out) {
-    return Status::TimedOut(Substitute("Checksum scan did not complete within the timeout of $0: "
-                                       "Received results for $1 out of $2 expected replicas",
-                                       options.timeout.ToString(), num_results,
-                                       num_replicas));
+  switch (final_status) {
+    case KsckChecksumManager::Outcome::TIMED_OUT:
+      return Status::TimedOut(Substitute("Checksum scan did not complete "
+                                         "within the timeout of $0: Received "
+                                         "results for $1 out of $2 expected "
+                                         "replicas",
+                                         opts.timeout.ToString(),
+                                         num_results,
+                                         num_replicas));
+    case KsckChecksumManager::Outcome::IDLE_TIMED_OUT:
+      return Status::TimedOut(Substitute("Checksum scan did not make progress "
+                                         "within the idle timeout of $0: Received "
+                                         "results for $1 out of $2 expected "
+                                         "replicas",
+                                         opts.idle_timeout.ToString(),
+                                         num_results,
+                                         num_replicas));
+    case KsckChecksumManager::Outcome::FINISHED:
+      CHECK_EQ(num_results, num_replicas)
+        << Substitute("Unexpected error: only got $0 out of $1 replica results",
+                      num_results, num_replicas);
+      return s;
   }
-  CHECK_EQ(num_results, num_replicas)
-      << Substitute("Unexpected error: only got $0 out of $1 replica results",
-                    num_results, num_replicas);
-  return s;
 }
 
 } // namespace tools
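The slot accounting behind StartTabletChecksums() and ReserveSlotsToChecksumUnlocked() can be modeled in a few lines. This is a simplified, standalone sketch under stated assumptions, not the patch's code. SlotScheduler, TryReserve(), PendingTablets, and StartTabletsThatFit() are hypothetical names. Each tablet server is assumed to start with 'scan_concurrency' open slots. ReserveSlotsToChecksumUnlocked() is only declared in this diff, so its behavior is assumed to be all-or-nothing, as its header comment describes.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified model of per-tablet-server checksum scan slots.
// Names are illustrative only; this is not Kudu's implementation.
class SlotScheduler {
 public:
  // Every tablet server starts with 'slots_per_ts' open slots, standing in
  // for the per-server 'scan_concurrency' limit.
  SlotScheduler(const std::vector<std::string>& ts_uuids, int slots_per_ts)
      : slots_per_ts_(slots_per_ts) {
    for (const auto& uuid : ts_uuids) open_slots_[uuid] = slots_per_ts;
  }

  // All-or-nothing: take one slot on every server hosting a replica of the
  // tablet, or take none and return false. Assumes replicas are on distinct
  // servers.
  bool TryReserve(const std::vector<std::string>& replica_ts_uuids) {
    for (const auto& uuid : replica_ts_uuids) {
      if (open_slots_.at(uuid) <= 0) return false;
    }
    for (const auto& uuid : replica_ts_uuids) --open_slots_.at(uuid);
    return true;
  }

  // Returns a slot when one replica's checksum scan finishes.
  void Release(const std::string& ts_uuid) {
    assert(open_slots_.at(ts_uuid) < slots_per_ts_);
    ++open_slots_.at(ts_uuid);
  }

  bool HasOpenSlots() const {
    for (const auto& entry : open_slots_) {
      if (entry.second > 0) return true;
    }
    return false;
  }

  int open_slots(const std::string& ts_uuid) const {
    return open_slots_.at(ts_uuid);
  }

 private:
  const int slots_per_ts_;
  std::map<std::string, int> open_slots_;
};

// Pending tablets: tablet id -> UUIDs of the servers hosting its replicas.
using PendingTablets = std::map<std::string, std::vector<std::string>>;

// One brute-force pass in the spirit of StartTabletChecksums(): try every
// pending tablet, start those whose replicas all fit, and remove them from
// 'pending'. Returns the ids of the started tablets.
std::vector<std::string> StartTabletsThatFit(SlotScheduler* sched,
                                             PendingTablets* pending) {
  std::vector<std::string> started;
  if (!sched->HasOpenSlots()) return started;  // cheap short circuit
  for (const auto& entry : *pending) {
    if (sched->TryReserve(entry.second)) started.push_back(entry.first);
  }
  for (const auto& id : started) pending->erase(id);
  return started;
}
```

In this model, a tablet whose replicas cannot all get a slot holds nothing while it waits. For example, a tablet on ts-c and ts-b that is blocked on ts-b does not pin ts-c, so a tablet hosted only on ts-c can still start in the same pass.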

http://git-wip-us.apache.org/repos/asf/kudu/blob/0d4740b7/src/kudu/tools/ksck_checksum.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_checksum.h b/src/kudu/tools/ksck_checksum.h
index 8fbb2f1..a09dba7 100644
--- a/src/kudu/tools/ksck_checksum.h
+++ b/src/kudu/tools/ksck_checksum.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <atomic>
 #include <cstdint>
 #include <iosfwd>
 #include <memory>
@@ -26,31 +27,35 @@
 #include <utility>
 #include <vector>
 
+#include "kudu/common/schema.h"
+#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/ref_counted.h"
 #include "kudu/tools/ksck_results.h"
-#include "kudu/util/atomic.h"
-#include "kudu/util/blocking_queue.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/make_shared.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
 
 namespace kudu {
 
-class Schema;
+namespace rpc {
+class PeriodicTimer;
+class Messenger;
+} // namespace rpc
 
 namespace tools {
 
 class KsckCluster;
-class KsckTable;
 class KsckTablet;
 class KsckTabletServer;
 
 // Options for checksum scans.
 struct KsckChecksumOptions {
-  // A checksum with this special timestamp will use a timestamp selected by
-  // one of tablet servers performing the snapshot scan.
+  // A checksum with this snapshot timestamp will choose a timestamp for each
+  // tablet, from one of the tablet servers hosting a replica, at the
+  // time that tablet's checksums are started.
   static constexpr uint64_t kCurrentTimestamp = 0;
 
   KsckChecksumOptions();
@@ -59,11 +64,13 @@ struct KsckChecksumOptions {
                       std::vector<std::string> tablet_id_filters);
 
   KsckChecksumOptions(MonoDelta timeout,
+                      MonoDelta idle_timeout,
                       int scan_concurrency,
                       bool use_snapshot,
                       uint64_t snapshot_timestamp);
 
   KsckChecksumOptions(MonoDelta timeout,
+                      MonoDelta idle_timeout,
                       int scan_concurrency,
                       bool use_snapshot,
                       uint64_t snapshot_timestamp,
@@ -73,6 +80,10 @@ struct KsckChecksumOptions {
   // The maximum total time to wait for results to come back from all replicas.
   MonoDelta timeout;
 
+  // The maximum amount of time to wait for progress to be made. Progress
+  // means at least one additional row is checksummed.
+  MonoDelta idle_timeout;
+
   // The maximum number of concurrent checksum scans to run per tablet server.
   int scan_concurrency;
 
@@ -88,102 +99,175 @@ struct KsckChecksumOptions {
   std::vector<std::string> tablet_id_filters;
 };
 
-// Interface for reporting progress on checksumming a single
-// replica.
-class KsckChecksumProgressCallbacks {
- public:
-  virtual ~KsckChecksumProgressCallbacks() {}
+typedef std::pair<Status, uint64_t> ReplicaChecksumResult;
+typedef std::unordered_map<std::string, ReplicaChecksumResult> TabletChecksumResult;
+typedef std::unordered_map<std::string, TabletChecksumResult> TabletChecksumResultsMap;
+
+// A convenience struct containing info needed to checksum a particular tablet.
+struct TabletChecksumInfo {
+  TabletChecksumInfo(std::shared_ptr<KsckTablet> tablet, Schema schema)
+    : tablet(std::move(tablet)),
+      schema(std::move(schema)) {}
 
-  // Report incremental progress from the server side.
-  // 'delta_disk_bytes_summed' only counts data read from DiskRowSets on the
-  // server side and does not count MRS bytes, etc.
-  virtual void Progress(int64_t delta_rows_summed, int64_t delta_disk_bytes_summed) = 0;
+  // The tablet to be checksummed.
+  std::shared_ptr<KsckTablet> tablet;
 
-  // The scan of the current tablet is complete.
-  virtual void Finished(const Status& status, uint64_t checksum) = 0;
+  // The schema of the tablet's table.
+  Schema schema;
 };
 
-// Class to act as a collector of scan results.
-// Provides thread-safe accessors to update and read a hash table of results.
-class ChecksumResultReporter : public RefCountedThreadSafe<ChecksumResultReporter> {
- public:
-  typedef std::pair<Status, uint64_t> ResultPair;
-  typedef std::unordered_map<std::string, ResultPair> ReplicaResultMap;
-  typedef std::unordered_map<std::string, ReplicaResultMap> TabletResultMap;
+typedef std::unordered_map<std::string, TabletChecksumInfo> TabletInfoMap;
+
+// Map (tablet server UUID -> number of open slots available for checksum scans).
+typedef std::unordered_map<std::string, int> TabletServerChecksumScanSlotsMap;
 
-  // Initialize reporter with the number of replicas being queried.
-  explicit ChecksumResultReporter(int num_tablet_replicas);
+typedef std::vector<std::shared_ptr<KsckTabletServer>> TabletServerList;
 
+// Class to coordinate a checksum process. Checksums are started on all replicas
+// of a tablet at once while respecting per-tablet-server checksum scan
+// concurrency limits.
+class KsckChecksumManager : public std::enable_shared_from_this<KsckChecksumManager>,
+                            public enable_make_shared<KsckChecksumManager> {
+ public:
+  // Return in 'manager' a new KsckChecksumManager created from the given
+  // parameters. All replicas of the tablets in 'tablet_infos' must be on tablet
+  // servers in 'tservers'. Because its ownership is shared with callbacks that
+  // are part of the checksum process, a KsckChecksumManager should always be
+  // wrapped in a shared_ptr.
+  // If 'messenger' is non-null, it will be used by the instance; otherwise, a
+  // new messenger will be constructed.
+  static Status New(KsckChecksumOptions opts,
+                    TabletInfoMap tablet_infos,
+                    TabletServerList tservers,
+                    std::shared_ptr<rpc::Messenger> messenger,
+                    std::shared_ptr<KsckChecksumManager>* manager);
+
+  // Reports an increase in the number of rows and bytes from disk processed
+  // by checksums to this KsckChecksumManager. This information is used in
+  // progress messages.
   void ReportProgress(int64_t delta_rows, int64_t delta_bytes);
 
-  // Write an entry to the result map indicating a response from the remote.
+  // Reports the result of checksumming a single replica of a tablet to this
+  // KsckChecksumManager.
   void ReportResult(const std::string& tablet_id,
                     const std::string& replica_uuid,
                     const Status& status,
                     uint64_t checksum);
 
-  // Blocks until either the number of results plus errors reported equals
-  // num_tablet_replicas (from the constructor), or until the timeout expires,
-  // whichever comes first. Progress messages are printed to 'out'.
-  // Returns false if the timeout expired before all responses came in.
-  // Otherwise, returns true.
-  bool WaitFor(const MonoDelta& timeout, std::ostream* out) const;
-
-  // Returns true iff all replicas have reported in.
-  bool AllReported() const { return responses_.count() == 0; }
-
-  // Get reported results.
-  TabletResultMap checksums() const {
+  // The possible final outcomes of a checksum process.
+  enum class Outcome {
+    // All replicas finished their checksums (either successfully or not).
+    FINISHED,
+    // The checksum process timed out.
+    TIMED_OUT,
+    // The checksum process went too long without making progress.
+    IDLE_TIMED_OUT,
+  };
+
+  // Blocks until the number of replica results and errors reported equals
+  // the number of replicas that need to be processed, until this instance's
+  // timeout expires, or until the checksum process makes no progress for longer
+  // than this instance's idle timeout, whichever comes first. Progress messages
+  // are printed to 'out' if it is non-null.
+  Outcome WaitFor(std::ostream* out);
+
+  // Run the checksum process asynchronously.
+  // The caller should wait for results with WaitFor().
+  Status RunChecksumsAsync();
+
+  // Get a snapshot of results reported so far.
+  TabletChecksumResultsMap checksums() const {
     std::lock_guard<simple_spinlock> guard(lock_);
     return checksums_;
   }
 
+ protected:
+  KsckChecksumManager(int num_replicas,
+                      KsckChecksumOptions opts,
+                      TabletInfoMap tablet_infos,
+                      TabletServerList tservers,
+                      std::shared_ptr<rpc::Messenger> messenger);
+
  private:
-  friend class RefCountedThreadSafe<ChecksumResultReporter>;
-  ~ChecksumResultReporter() {}
+  // Perform post-construction initialization that may fail.
+  Status Init();
 
-  const int expected_count_;
-  CountDownLatch responses_;
+  // Shutdown this manager.
+  void Shutdown();
+
+  // Start as many tablet checksums as possible, given the per-tablet-server
+  // concurrency limits on checksum scans.
+  // Since this uses a brute force method, it is fairly expensive, and therefore
+  // we run it on a threadpool instead of in the callback, which is run from a
+  // reactor thread.
+  void StartTabletChecksums();
+
+  // Are there enough checksum scan slots available on the tablet servers
+  // hosting replicas of 'tablet' to start a checksum scan on all of them?
+  // If so, return true and reserve the slots. Else, return false.
+  bool ReserveSlotsToChecksumUnlocked(const std::shared_ptr<KsckTablet>& tablet);
+
+  // Begin the checksum on the tablet named in 'tablet_info'.
+  void BeginTabletChecksum(const TabletChecksumInfo& tablet_info);
+
+  // Initialize 'ts_slots_open_map_'.
+  void InitializeTsSlotsMap();
+
+  // Release a checksum scan slot on each tserver in 'ts_uuids'.
+  void ReleaseTsSlotsUnlocked(const std::vector<std::string>& ts_uuids);
+
+  // Are there any open slots at all?
+  bool HasOpenTsSlotsUnlocked() const;
+
+  // Returns a summary of checksum scan slot usage across tablet servers.
+  // This is useful as debug info to check how saturated the tablet servers are
+  // with checksum scans.
+  std::string OpenTsSlotSummaryString() const;
+
+  // The options for the checksum process.
+  const KsckChecksumOptions opts_;
+
+  // A map of information about tablets to be checksummed. As tablet checksums
+  // are started, entries are removed from this map.
+  TabletInfoMap tablet_infos_;
+
+  // Tracks the open slots for each tablet server that hosts a replica of the
+  // tablets in 'tablet_infos_'.
+  TabletServerChecksumScanSlotsMap ts_slots_open_map_;
 
-  mutable simple_spinlock lock_; // Protects 'checksums_'.
   // checksums_ is an unordered_map of { tablet_id : { replica_uuid : checksum } }.
-  TabletResultMap checksums_;
+  TabletChecksumResultsMap checksums_;
 
-  AtomicInt<int64_t> rows_summed_;
-  AtomicInt<int64_t> disk_bytes_summed_;
-};
+  // Protects 'tablet_infos_', 'ts_slots_open_map_', and 'checksums_'.
+  mutable simple_spinlock lock_;
 
-// Queue of tablet replicas for an individual tablet server.
-typedef std::shared_ptr<BlockingQueue<std::pair<Schema, std::string>>> SharedTabletQueue;
+  // The list of tablet servers that checksum scans will be run on. Every
+  // replica of every tablet to be checksummed must be located on one of these
+  // tablet servers.
+  const TabletServerList tservers_;
 
-// A set of callbacks which records the result of a tablet replica's checksum,
-// and then checks if the tablet server has any more tablets to checksum. If so,
-// a new async checksum scan is started.
-class TabletServerChecksumCallbacks : public KsckChecksumProgressCallbacks {
- public:
-  TabletServerChecksumCallbacks(
-      scoped_refptr<ChecksumResultReporter> reporter,
-      std::shared_ptr<KsckTabletServer> tablet_server,
-      SharedTabletQueue queue,
-      std::string tablet_id,
-      KsckChecksumOptions options);
+  const int expected_replica_count_;
+  CountDownLatch responses_;
 
-  void Progress(int64_t rows_summed, int64_t disk_bytes_summed) override;
+  // Used for the 'timestamp_update_timer_' periodic timer.
+  std::shared_ptr<rpc::Messenger> messenger_;
 
-  void Finished(const Status& status, uint64_t checksum) override;
+  // A timer used to periodically refresh the timestamps of the tablet servers
+  // in 'tservers_', so that snapshot timestamps don't fall behind the
+  // ancient history mark.
+  std::shared_ptr<rpc::PeriodicTimer> timestamp_update_timer_;
 
- private:
-  ~TabletServerChecksumCallbacks() = default;
+  // A threadpool for running tasks that find additional tablets that can
+  // be checksummed based on available slots on tablet servers.
+  gscoped_ptr<ThreadPool> find_tablets_to_checksum_pool_;
 
-  const scoped_refptr<ChecksumResultReporter> reporter_;
-  const std::shared_ptr<KsckTabletServer> tablet_server_;
-  const SharedTabletQueue queue_;
-  const KsckChecksumOptions options_;
+  std::atomic<int64_t> rows_summed_;
+  std::atomic<int64_t> disk_bytes_summed_;
 
-  std::string tablet_id_;
+  DISALLOW_COPY_AND_ASSIGN(KsckChecksumManager);
 };
 
-// A class for performing checksum scans against a Kudu cluster.
+// A class for performing checksums on a Kudu cluster.
 class KsckChecksummer {
  public:
    // 'cluster' must remain valid as long as this instance is alive.
@@ -199,24 +283,21 @@ class KsckChecksummer {
                       std::ostream* out_for_progress_updates);
 
  private:
-  typedef std::unordered_map<std::shared_ptr<KsckTablet>,
-                             std::shared_ptr<KsckTable>> TabletTableMap;
-
-  // Builds a mapping from tablet-to-be-checksummed to its table, which is
-  // used to create checksum callbacks. This mapping is returned in
-  // 'tablet_table_map' and the total number of replicas to be checksummed is
-  // returned in 'num_replicas'.
-  Status BuildTabletTableMap(const KsckChecksumOptions& opts,
-                             TabletTableMap* tablet_table_map,
-                             int* num_replicas) const;
-
-  // Collates the results of checksums into 'table_checksum_map', with the
-  // total number of results returned as 'num_results'.
+  // Builds a map of tablets to be checksummed, given 'opts' and the cluster
+  // passed to the constructor. The resulting tablets are populated in
+  // 'tablet_infos' and the total number of replicas is set in 'num_replicas'.
+  Status BuildTabletInfoMap(const KsckChecksumOptions& opts,
+                            TabletInfoMap* tablet_infos,
+                            int* num_replicas) const;
+
+  // Collates the results of checksums that are reported in 'checksums' into
+  // 'table_checksum_map', with the total number of results returned as
+  // 'num_results'.
   // NOTE: Even if this function returns a bad Status, 'table_checksum_map'
   // and 'num_results' will still be populated using whatever results are
   // available.
   Status CollateChecksumResults(
-      const ChecksumResultReporter::TabletResultMap& checksums,
+      const TabletChecksumResultsMap& checksums,
       KsckTableChecksumMap* table_checksum_map,
       int* num_results) const;
 

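The three Outcome values returned by KsckChecksumManager::WaitFor() can be illustrated with a pure function that decides which deadline is hit first. This is a hypothetical model, not the manager's implementation. ClassifyOutcome() and FirstIdleExpiry() are invented names, and times are plain seconds. "Progress" means at least one more row was checksummed, per the comment on 'idle_timeout'. The real manager may detect idleness differently (for example, by polling).

```cpp
#include <cassert>
#include <vector>

// Hypothetical, simplified model (not Kudu code) of how a WaitFor()-style
// wait ends. Times are seconds since the checksum process started.
enum class Outcome { FINISHED, TIMED_OUT, IDLE_TIMED_OUT };

// Earliest time at which 'idle_timeout' elapses with no progress.
// 'progress_times' must be sorted; each entry is a moment when at least one
// more row was checksummed. The process is treated as starting at t = 0.
double FirstIdleExpiry(double idle_timeout,
                       const std::vector<double>& progress_times) {
  double last_progress = 0.0;
  for (double t : progress_times) {
    if (t - last_progress > idle_timeout) break;  // expired before 't'
    last_progress = t;
  }
  return last_progress + idle_timeout;
}

// The wait ends at whichever comes first: the last replica reporting
// ('finish_time'), the overall 'timeout', or the first idle expiry.
// Ties between the two timeouts are reported as TIMED_OUT.
Outcome ClassifyOutcome(double timeout, double idle_timeout,
                        const std::vector<double>& progress_times,
                        double finish_time) {
  assert(timeout >= 0 && idle_timeout >= 0);
  const double idle_expiry = FirstIdleExpiry(idle_timeout, progress_times);
  if (finish_time <= timeout && finish_time <= idle_expiry) {
    return Outcome::FINISHED;
  }
  return idle_expiry < timeout ? Outcome::IDLE_TIMED_OUT : Outcome::TIMED_OUT;
}
```

With a total timeout of zero and a 5-second idle timeout, this model yields TIMED_OUT. That agrees with TestChecksumTimeout, which expects a TimedOut status in that configuration. A stalled scan under a large total timeout, like the new 86400-second default, is instead cut off by the idle timeout.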
http://git-wip-us.apache.org/repos/asf/kudu/blob/0d4740b7/src/kudu/tools/ksck_remote-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote-test.cc b/src/kudu/tools/ksck_remote-test.cc
index f41298c..d19aba7 100644
--- a/src/kudu/tools/ksck_remote-test.cc
+++ b/src/kudu/tools/ksck_remote-test.cc
@@ -60,6 +60,9 @@
 
 DECLARE_int32(heartbeat_interval_ms);
 DECLARE_int32(safe_time_max_lag_ms);
+DECLARE_int32(tablet_history_max_age_sec);
+DECLARE_int32(timestamp_update_period_ms);
+DECLARE_int32(wait_before_setting_snapshot_timestamp_ms);
 
 DEFINE_int32(experimental_flag_for_ksck_test, 0,
              "A flag marked experimental so it can be used to test ksck's "
@@ -319,7 +322,9 @@ TEST_F(RemoteKsckTest, TestChecksum) {
     ASSERT_OK(ksck_->FetchInfoFromTabletServers());
 
     err_stream_.str("");
-    s = ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(1), 16, false, 0));
+    s = ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(1),
+                                                MonoDelta::FromSeconds(1),
+                                                16, false, 0));
     if (s.ok()) {
       // Check the status message at the end of the checksum.
       // We expect '0B from disk' because we didn't write enough data to trigger a flush
@@ -343,7 +348,9 @@ TEST_F(RemoteKsckTest, TestChecksumTimeout) {
   ASSERT_OK(ksck_->FetchTableAndTabletInfo());
   ASSERT_OK(ksck_->FetchInfoFromTabletServers());
   // Use an impossibly low timeout value of zero!
-  Status s = ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromNanoseconds(0), 16, false, 0));
+  Status s = ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromNanoseconds(0),
+                                                     MonoDelta::FromSeconds(5),
+                                                     16, false, 0));
   ASSERT_TRUE(s.IsTimedOut()) << "Expected TimedOut Status, got: " << s.ToString();
 }
 
@@ -366,7 +373,9 @@ TEST_F(RemoteKsckTest, TestChecksumSnapshot) {
   ASSERT_OK(ksck_->CheckClusterRunning());
   ASSERT_OK(ksck_->FetchTableAndTabletInfo());
   ASSERT_OK(ksck_->FetchInfoFromTabletServers());
-  ASSERT_OK(ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(10), 16, true, ts)));
+  ASSERT_OK(ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(10),
+                                                    MonoDelta::FromSeconds(10),
+                                                    16, true, ts)));
   continue_writing.Store(false);
   ASSERT_OK(promise.Get());
   writer_thread->Join();
@@ -392,13 +401,85 @@ TEST_F(RemoteKsckTest, TestChecksumSnapshotCurrentTimestamp) {
   ASSERT_OK(ksck_->CheckClusterRunning());
   ASSERT_OK(ksck_->FetchTableAndTabletInfo());
   ASSERT_OK(ksck_->FetchInfoFromTabletServers());
-  ASSERT_OK(ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(10), 16, true,
+  ASSERT_OK(ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(10),
+                                                    MonoDelta::FromSeconds(10),
+                                                    16, true,
                                                     KsckChecksumOptions::kCurrentTimestamp)));
   continue_writing.Store(false);
   ASSERT_OK(promise.Get());
   writer_thread->Join();
 }
 
+// Regression test for KUDU-2179: If the checksum process takes long enough that
+// the snapshot timestamp falls behind the ancient history mark, new replica
+// checksums will fail.
+TEST_F(RemoteKsckTest, TestChecksumSnapshotLastingLongerThanAHM) {
+  // This test is really slow because -tablet_history_max_age_sec's lowest
+  // acceptable value is 1.
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  // This test relies on somewhat precise timing: the timestamp update must
+  // happen during the wait to start the checksum, for each tablet. It's likely
+  // this sometimes won't happen in builds that are slower, so we'll just
+  // disable the test for those builds.
+  #if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
+    LOG(WARNING) << "test is skipped in TSAN and ASAN builds";
+    return;
+  #endif
+
+  // Write something so we have rows to checksum, and because we need a valid
+  // timestamp from the client to use for a checksum scan.
+  constexpr uint64_t num_writes = 100;
+  LOG(INFO) << "Generating row writes...";
+  ASSERT_OK(GenerateRowWrites(num_writes));
+
+  // Update timestamps frequently.
+  FLAGS_timestamp_update_period_ms = 200;
+  // Keep history for 1 second. This means snapshot scans with a timestamp older
+  // than 1 second will be rejected.
+  FLAGS_tablet_history_max_age_sec = 1;
+  // Wait for the AHM to pass before assigning a timestamp.
+  FLAGS_wait_before_setting_snapshot_timestamp_ms = 1100;
+
+  ASSERT_OK(ksck_->CheckMasterHealth());
+  ASSERT_OK(ksck_->CheckMasterUnusualFlags());
+  ASSERT_OK(ksck_->CheckMasterConsensus());
+  ASSERT_OK(ksck_->CheckClusterRunning());
+  ASSERT_OK(ksck_->FetchTableAndTabletInfo());
+  ASSERT_OK(ksck_->FetchInfoFromTabletServers());
+
+  // Run a checksum scan at the latest timestamp known to the client. This
+  // should fail, since we will wait until after the AHM has passed to start
+  // any scans.
+  constexpr int timeout_sec = 30;
+  constexpr int scan_concurrency = 16;
+  constexpr bool use_snapshot = true;
+  uint64_t ts = client_->GetLatestObservedTimestamp();
+  Status s = ksck_->ChecksumData(KsckChecksumOptions(
+        MonoDelta::FromSeconds(timeout_sec),
+        MonoDelta::FromSeconds(timeout_sec),
+        scan_concurrency,
+        use_snapshot,
+        ts));
+  ASSERT_TRUE(s.IsAborted()) << s.ToString();
+  ASSERT_OK(ksck_->PrintResults());
+  ASSERT_STR_CONTAINS(err_stream_.str(),
+                      "Invalid argument: Snapshot timestamp is earlier than "
+                      "the ancient history mark.");
+
+  // Now let's try again using the special current timestamp, which will run
+  // checksums using timestamps updated from the servers, and should succeed.
+  ASSERT_OK(ksck_->ChecksumData(KsckChecksumOptions(
+      MonoDelta::FromSeconds(timeout_sec),
+      MonoDelta::FromSeconds(timeout_sec),
+      scan_concurrency,
+      use_snapshot,
+      KsckChecksumOptions::kCurrentTimestamp)));
+}
+
 TEST_F(RemoteKsckTest, TestLeaderMasterDown) {
   // Make sure ksck's client is created with the current leader master and that
   // all masters are healthy.
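The regression test TestChecksumSnapshotLastingLongerThanAHM depends on specific timing. History is kept for 1 second, the tool waits 1100 ms before assigning a snapshot timestamp, and timestamps are refreshed every 200 ms. The sketch below models that arithmetic. It is a hypothetical simplification, not Kudu code: the function names are illustrative, the ancient history mark is treated as "now minus the history max age", and the refresher is assumed to fire every 200 ms starting at t=0.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model (not a Kudu API): a tablet server accepts a snapshot scan
// only if the snapshot timestamp is no older than its ancient history mark
// (AHM), approximated here as now - history_max_age.
bool SnapshotAccepted(int64_t snapshot_ms, int64_t now_ms,
                      int64_t history_max_age_ms) {
  const int64_t ahm_ms = now_ms - history_max_age_ms;
  return snapshot_ms >= ahm_ms;
}

// Hypothetical model of the background refresher: returns the most recent
// refresh time at or before now_ms, assuming a refresh every period_ms from t=0.
int64_t LatestRefresh(int64_t now_ms, int64_t period_ms) {
  assert(period_ms > 0 && now_ms >= 0);
  return (now_ms / period_ms) * period_ms;
}

// Timing used by the test: 1 s of history, a 1100 ms wait before the snapshot
// timestamp is assigned, and a 200 ms refresh period.
constexpr int64_t kHistoryMaxAgeMs = 1000;
constexpr int64_t kWaitMs = 1100;
constexpr int64_t kUpdatePeriodMs = 200;

// Old behavior: a single timestamp taken at t=0 is reused when the scan starts.
bool FixedTimestampAccepted() {
  return SnapshotAccepted(/*snapshot_ms=*/0, /*now_ms=*/kWaitMs,
                          kHistoryMaxAgeMs);
}

// New behavior: the tablet is assigned the most recently refreshed timestamp.
bool RefreshedTimestampAccepted() {
  return SnapshotAccepted(LatestRefresh(kWaitMs, kUpdatePeriodMs), kWaitMs,
                          kHistoryMaxAgeMs);
}
```

Under these assumptions, a timestamp taken at t=0 is 1100 ms old when the scan starts, which is past the 1000 ms window. The refreshed timestamp (taken at t=1000) is only 100 ms old, so it is accepted. This is why the first ChecksumData call in the test is expected to abort and the kCurrentTimestamp call to succeed.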

http://git-wip-us.apache.org/repos/asf/kudu/blob/0d4740b7/src/kudu/tools/ksck_remote.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index c44922b..8dd83af 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/tools/ksck_remote.h"
 
+#include <atomic>
 #include <cstdint>
 #include <map>
 #include <ostream>
@@ -218,22 +219,43 @@ Status RemoteKsckTabletServer::FetchInfo(KsckServerHealth* health) {
     }
   }
 
-  {
-    server::ServerClockRequestPB req;
-    server::ServerClockResponsePB resp;
-    RpcController rpc;
-    rpc.set_timeout(GetDefaultTimeout());
-    RETURN_NOT_OK_PREPEND(generic_proxy_->ServerClock(req, &resp, &rpc),
-                          "could not fetch timestamp");
-    CHECK(resp.has_timestamp());
-    timestamp_ = resp.timestamp();
-  }
+  RETURN_NOT_OK(FetchCurrentTimestamp());
 
   state_ = KsckFetchState::FETCHED;
   *health = KsckServerHealth::HEALTHY;
   return Status::OK();
 }
 
+void RemoteKsckTabletServer::ServerClockResponseCallback::Run() {
+  if (rpc.status().ok()) {
+    ts->timestamp_ = resp.timestamp();
+  } else {
+    LOG(WARNING) << "Failed to retrieve timestamp from " << ts->uuid()
+                 << ": " << rpc.status().ToString();
+  }
+  delete this;
+}
+
+void RemoteKsckTabletServer::FetchCurrentTimestampAsync() {
+  // 'cb' deletes itself when complete.
+  auto* cb = new ServerClockResponseCallback(shared_from_this());
+  cb->rpc.set_timeout(GetDefaultTimeout());
+  generic_proxy_->ServerClockAsync(cb->req,
+                                   &cb->resp,
+                                   &cb->rpc,
+                                   boost::bind(&ServerClockResponseCallback::Run, cb));
+}
+
+Status RemoteKsckTabletServer::FetchCurrentTimestamp() {
+  server::ServerClockRequestPB req;
+  server::ServerClockResponsePB resp;
+  RpcController rpc;
+  rpc.set_timeout(GetDefaultTimeout());
+  RETURN_NOT_OK(generic_proxy_->ServerClock(req, &resp, &rpc));
+  timestamp_ = resp.timestamp();
+  return Status::OK();
+}
+
 Status RemoteKsckTabletServer::FetchConsensusState(KsckServerHealth* health) {
   DCHECK(health);
   *health = KsckServerHealth::UNAVAILABLE;
@@ -295,14 +317,17 @@ class ChecksumCallbackHandler {
 // After the ChecksumStepper reports its results to the reporter, it deletes itself.
 class ChecksumStepper {
  public:
-  ChecksumStepper(string tablet_id, const Schema& schema, string server_uuid,
-                  KsckChecksumOptions options, KsckChecksumProgressCallbacks* callbacks,
+  ChecksumStepper(string tablet_id,
+                  Schema schema,
+                  string server_uuid,
+                  KsckChecksumOptions options,
+                  shared_ptr<KsckChecksumManager> manager,
                   shared_ptr<tserver::TabletServerServiceProxy> proxy)
-      : schema_(schema),
+      : schema_(std::move(schema)),
         tablet_id_(std::move(tablet_id)),
         server_uuid_(std::move(server_uuid)),
-        options_(options),
-        callbacks_(callbacks),
+        options_(std::move(options)),
+        manager_(std::move(manager)),
         proxy_(std::move(proxy)),
         call_seq_id_(0),
         checksum_(0) {
@@ -313,7 +338,7 @@ class ChecksumStepper {
     Status s = SchemaToColumnPBs(schema_, &cols_,
                                  SCHEMA_PB_WITHOUT_IDS | SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES);
     if (!s.ok()) {
-      callbacks_->Finished(s, 0);
+      manager_->ReportResult(tablet_id_, server_uuid_, s, 0);
     } else {
       SendRequest(kNewRequest);
     }
@@ -326,20 +351,20 @@ class ChecksumStepper {
       s = StatusFromPB(resp_.error().status());
     }
     if (!s.ok()) {
-      callbacks_->Finished(s, 0);
+      manager_->ReportResult(tablet_id_, server_uuid_, s, 0);
       return; // Deletes 'this'.
     }
     if (resp_.has_resource_metrics() || resp_.has_rows_checksummed()) {
       int64_t bytes = resp_.resource_metrics().cfile_cache_miss_bytes() +
           resp_.resource_metrics().cfile_cache_hit_bytes();
-      callbacks_->Progress(resp_.rows_checksummed(), bytes);
+      manager_->ReportProgress(resp_.rows_checksummed(), bytes);
     }
     DCHECK(resp_.has_checksum());
     checksum_ = resp_.checksum();
 
     // Report back with results.
     if (!resp_.has_more_results()) {
-      callbacks_->Finished(s, checksum_);
+      manager_->ReportResult(tablet_id_, server_uuid_, s, checksum_);
       return; // Deletes 'this'.
     }
 
@@ -398,7 +423,7 @@ class ChecksumStepper {
   const string tablet_id_;
   const string server_uuid_;
   const KsckChecksumOptions options_;
-  KsckChecksumProgressCallbacks* const callbacks_;
+  shared_ptr<KsckChecksumManager> const manager_;
   const shared_ptr<tserver::TabletServerServiceProxy> proxy_;
 
   uint32_t call_seq_id_;
@@ -418,9 +443,9 @@ void RemoteKsckTabletServer::RunTabletChecksumScanAsync(
         const string& tablet_id,
         const Schema& schema,
         const KsckChecksumOptions& options,
-        KsckChecksumProgressCallbacks* callbacks) {
+        shared_ptr<KsckChecksumManager> manager) {
   gscoped_ptr<ChecksumStepper> stepper(
-      new ChecksumStepper(tablet_id, schema, uuid(), options, callbacks, ts_proxy_));
+      new ChecksumStepper(tablet_id, schema, uuid(), options, manager, ts_proxy_));
   stepper->Start();
   ignore_result(stepper.release()); // Deletes self on callback.
 }
@@ -455,10 +480,10 @@ Status RemoteKsckCluster::RetrieveTabletServers() {
 
   TSMap tablet_servers;
   for (const auto* s : servers) {
-    shared_ptr<RemoteKsckTabletServer> ts(
-        new RemoteKsckTabletServer(s->uuid(),
-                                   HostPort(s->hostname(), s->port()),
-                                   messenger_));
+    auto ts = std::make_shared<RemoteKsckTabletServer>(
+        s->uuid(),
+        HostPort(s->hostname(), s->port()),
+        messenger_);
     RETURN_NOT_OK(ts->Init());
     InsertOrDie(&tablet_servers, ts->uuid(), ts);
   }
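FetchCurrentTimestampAsync in ksck_remote.cc heap-allocates a ServerClockResponseCallback. The callback holds a shared_ptr to the tablet server, obtained through shared_from_this(), and runs "delete this" once the reply has been handled. The sketch below isolates that ownership pattern. It replaces the RPC layer with a std::thread, and the names ServerSketch and Callback, along with the 12345 "reply", are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <thread>
#include <utility>

class ServerSketch : public std::enable_shared_from_this<ServerSketch> {
 public:
  // Starts a simulated asynchronous clock fetch. The returned thread stands in
  // for the RPC reactor; callers join it to wait for the callback to finish.
  std::thread FetchTimestampAsync();

  uint64_t timestamp() const { return timestamp_.load(); }

 private:
  struct Callback {
    explicit Callback(std::shared_ptr<ServerSketch> s) : server(std::move(s)) {}

    void Run() {
      server->timestamp_.store(resp_timestamp);
      delete this;  // The callback owns itself and frees itself after the reply.
    }

    std::shared_ptr<ServerSketch> server;  // Keeps the server alive until Run() ends.
    uint64_t resp_timestamp = 0;

   private:
    ~Callback() = default;  // Private destructor: cannot live on the stack.
  };

  // Atomic because the callback writes it from another thread.
  std::atomic<uint64_t> timestamp_{0};
};

std::thread ServerSketch::FetchTimestampAsync() {
  // shared_from_this() requires *this to already be owned by a std::shared_ptr.
  auto* cb = new Callback(shared_from_this());
  return std::thread([cb]() {
    cb->resp_timestamp = 12345;  // Hypothetical server reply.
    cb->Run();
  });
}
```

Because the callback owns a reference to the server, the server cannot be destroyed while a fetch is still in flight. The same requirement explains why RetrieveTabletServers now creates each RemoteKsckTabletServer with std::make_shared: shared_from_this() is valid only on an object that is already managed by a shared_ptr.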

http://git-wip-us.apache.org/repos/asf/kudu/blob/0d4740b7/src/kudu/tools/ksck_remote.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.h b/src/kudu/tools/ksck_remote.h
index 54d0f1d..2dac723 100644
--- a/src/kudu/tools/ksck_remote.h
+++ b/src/kudu/tools/ksck_remote.h
@@ -24,6 +24,8 @@
 #include <vector>
 
 #include "kudu/client/shared_ptr.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/server/server_base.pb.h"
 #include "kudu/tools/ksck.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/status.h"
@@ -54,7 +56,8 @@ class TabletServerServiceProxy;
 
 namespace tools {
 
-class KsckChecksumProgressCallbacks;
+class KsckChecksumManager;
+
 enum class KsckServerHealth;
 struct KsckChecksumOptions;
 
@@ -85,13 +88,14 @@ class RemoteKsckMaster : public KsckMaster {
 };
 
 // This implementation connects to a tablet server via RPC.
-class RemoteKsckTabletServer : public KsckTabletServer {
+class RemoteKsckTabletServer : public KsckTabletServer,
+                               public std::enable_shared_from_this<RemoteKsckTabletServer> {
  public:
   explicit RemoteKsckTabletServer(const std::string& id,
-                                  const HostPort host_port,
+                                  HostPort host_port,
                                   std::shared_ptr<rpc::Messenger> messenger)
       : KsckTabletServer(id),
-        host_port_(host_port),
+        host_port_(std::move(host_port)),
         messenger_(std::move(messenger)) {
   }
 
@@ -105,17 +109,38 @@ class RemoteKsckTabletServer : public KsckTabletServer {
 
   Status FetchUnusualFlags() override;
 
+  void FetchCurrentTimestampAsync() override;
+  Status FetchCurrentTimestamp() override;
+
   void RunTabletChecksumScanAsync(
       const std::string& tablet_id,
       const Schema& schema,
       const KsckChecksumOptions& options,
-      KsckChecksumProgressCallbacks* callbacks) override;
+      std::shared_ptr<KsckChecksumManager> manager) override;
 
   virtual std::string address() const override {
     return host_port_.ToString();
   }
 
  private:
+  // A callback to update the timestamp from the remote server.
+  struct ServerClockResponseCallback {
+   public:
+    explicit ServerClockResponseCallback(std::shared_ptr<RemoteKsckTabletServer> ts)
+        : ts(std::move(ts)) {}
+
+    void Run();
+
+    std::shared_ptr<RemoteKsckTabletServer> ts;
+    server::ServerClockRequestPB req;
+    server::ServerClockResponsePB resp;
+    rpc::RpcController rpc;
+
+   private:
+    // Prevent instances of this class from being allocated on the stack.
+    ~ServerClockResponseCallback() = default;
+  };
+
   const HostPort host_port_;
   const std::shared_ptr<rpc::Messenger> messenger_;
   std::shared_ptr<server::GenericServiceProxy> generic_proxy_;
@@ -137,6 +162,10 @@ class RemoteKsckCluster : public KsckCluster {
 
   virtual Status RetrieveTabletsList(const std::shared_ptr<KsckTable>& table) override;
 
+  std::shared_ptr<rpc::Messenger> messenger() const override {
+    return messenger_;
+  }
+
  private:
   RemoteKsckCluster(std::vector<std::string> master_addresses,
                     std::shared_ptr<rpc::Messenger> messenger)
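In this change, ChecksumStepper no longer reports to a raw KsckChecksumProgressCallbacks pointer. Instead it holds a shared_ptr<KsckChecksumManager> and calls ReportResult(tablet_id, server_uuid, status, checksum) and ReportProgress(rows, bytes). The manager's implementation is not part of this diff. The sketch below is only a guess at how such an aggregator might collect per-replica results so that replicas of one tablet can be compared. The class name, the bool that stands in for Status, and ReplicasAgree are all hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical aggregator: method names mirror the calls in the diff, but a
// bool replaces kudu::Status and the agreement check is illustrative only.
class ChecksumManagerSketch {
 public:
  void ReportProgress(int64_t rows, int64_t bytes) {
    std::lock_guard<std::mutex> l(mu_);
    rows_ += rows;
    bytes_ += bytes;
  }

  void ReportResult(const std::string& tablet_id, const std::string& server_uuid,
                    bool ok, uint64_t checksum) {
    std::lock_guard<std::mutex> l(mu_);
    results_[tablet_id][server_uuid] = std::make_pair(ok, checksum);
  }

  // True if every reported replica of 'tablet_id' succeeded with one checksum.
  bool ReplicasAgree(const std::string& tablet_id) const {
    std::lock_guard<std::mutex> l(mu_);
    auto it = results_.find(tablet_id);
    if (it == results_.end() || it->second.empty()) return false;
    const uint64_t first = it->second.begin()->second.second;
    for (const auto& replica : it->second) {
      if (!replica.second.first || replica.second.second != first) return false;
    }
    return true;
  }

  int64_t rows() const { std::lock_guard<std::mutex> l(mu_); return rows_; }
  int64_t bytes() const { std::lock_guard<std::mutex> l(mu_); return bytes_; }

 private:
  mutable std::mutex mu_;  // Steppers may report from different reactor threads.
  int64_t rows_ = 0;
  int64_t bytes_ = 0;
  // tablet_id -> server_uuid -> (ok, checksum)
  std::map<std::string, std::map<std::string, std::pair<bool, uint64_t>>> results_;
};
```

If each in-flight stepper shares ownership of the manager through a shared_ptr, the manager stays alive until the last pending scan reports, even if the code that started the checksum has already returned. A raw pointer offered no such guarantee.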


[2/2] kudu git commit: KUDU-2598 skip test_scanner_to_pandas_index appropriately

Posted by aw...@apache.org.
KUDU-2598 skip test_scanner_to_pandas_index appropriately

The test should be skipped if either pandas or decimal support is
unavailable. The previous condition joined the two checks with 'and', so
the test was skipped only when both were missing.

Change-Id: Ie9e3f0c2f6884bd5388c9b918703e5b4b2e543d7
Reviewed-on: http://gerrit.cloudera.org:8080/11620
Reviewed-by: Hao Hao <ha...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4de4347a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4de4347a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4de4347a

Branch: refs/heads/master
Commit: 4de4347a3fd28a8bad3d481fdfa90acecd584520
Parents: 0d4740b
Author: Andrew Wong <aw...@cloudera.com>
Authored: Mon Oct 8 16:30:38 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Tue Oct 9 00:35:30 2018 +0000

----------------------------------------------------------------------
 python/kudu/tests/test_scanner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4de4347a/python/kudu/tests/test_scanner.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py
index 3c7fe57..4bedf96 100644
--- a/python/kudu/tests/test_scanner.py
+++ b/python/kudu/tests/test_scanner.py
@@ -382,7 +382,7 @@ class TestScanner(TestScanBase):
         self.assertEqual(df.index.name, 'key')
         self.assertEqual(list(df.index), [1, 2])
 
-    @pytest.mark.skipif((not(kudu.CLIENT_SUPPORTS_PANDAS) and
+    @pytest.mark.skipif((not(kudu.CLIENT_SUPPORTS_PANDAS) or
                         (not(kudu.CLIENT_SUPPORTS_DECIMAL))),
                         reason="Pandas and Decimal support required to run this test.")
     def test_scanner_to_pandas_index(self):
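The one-word fix in test_scanner_to_pandas_index is an application of De Morgan's law. The test should run only when both features are present, so it should be skipped when not (pandas and decimal), which equals (not pandas) or (not decimal). The following truth-table sketch models both skip conditions as boolean functions. It is written in C++ to match the other examples here; the function names are hypothetical.

```cpp
#include <cassert>
#include <initializer_list>

// Old condition: skip only when BOTH features are missing, so the test still
// ran (and could fail) when just one of them was available.
bool OldShouldSkip(bool has_pandas, bool has_decimal) {
  return !has_pandas && !has_decimal;
}

// New condition: skip when EITHER feature is missing.
bool NewShouldSkip(bool has_pandas, bool has_decimal) {
  return !has_pandas || !has_decimal;
}

// By De Morgan's law, the new condition equals !(has_pandas && has_decimal):
// the test runs only when both features are available.
void CheckDeMorgan() {
  for (bool p : {false, true}) {
    for (bool d : {false, true}) {
      assert(NewShouldSkip(p, d) == !(p && d));
    }
  }
}
```

The two conditions differ only when exactly one feature is available. In those cases the old condition let the test run without the support it needs.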