You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2018/10/02 18:52:26 UTC
[2/3] kudu git commit: [tools] ksck checksums: Factor out of main ksck code
[tools] ksck checksums: Factor out of main ksck code
To prepare to address KUDU-2179 and to improve the organization of the
code, this patch factors checksum-related classes and code out of
ksck.{cc,h} into a new pair of files ksck_checksum.{cc,h}. Following
the pattern of other ksck-related classes, it also renames some classes
so they start with "Ksck".
There are no functional changes in this patch.
Change-Id: I4bb1f51af22ab0c6c20b9426dbb62ea48413ed5b
Reviewed-on: http://gerrit.cloudera.org:8080/11488
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7f5f57f5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7f5f57f5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7f5f57f5
Branch: refs/heads/master
Commit: 7f5f57f5d73c3b583478210d9b704d7b3793ffe6
Parents: 9f9070a
Author: Will Berkeley <wd...@gmail.org>
Authored: Fri Sep 7 16:05:55 2018 -0700
Committer: Will Berkeley <wd...@gmail.com>
Committed: Tue Oct 2 14:38:54 2018 +0000
----------------------------------------------------------------------
src/kudu/integration-tests/cluster_verifier.cc | 2 +-
src/kudu/integration-tests/cluster_verifier.h | 8 +-
src/kudu/tools/CMakeLists.txt | 1 +
src/kudu/tools/ksck-test.cc | 5 +-
src/kudu/tools/ksck.cc | 182 ++------------------
src/kudu/tools/ksck.h | 54 +-----
src/kudu/tools/ksck_checksum.cc | 157 +++++++++++++++++
src/kudu/tools/ksck_checksum.h | 163 ++++++++++++++++++
src/kudu/tools/ksck_remote-test.cc | 11 +-
src/kudu/tools/ksck_remote.cc | 11 +-
src/kudu/tools/ksck_remote.h | 6 +-
11 files changed, 363 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/integration-tests/cluster_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_verifier.cc b/src/kudu/integration-tests/cluster_verifier.cc
index 8f54554..6a19581 100644
--- a/src/kudu/integration-tests/cluster_verifier.cc
+++ b/src/kudu/integration-tests/cluster_verifier.cc
@@ -31,6 +31,7 @@
#include "kudu/integration-tests/log_verifier.h"
#include "kudu/mini-cluster/mini_cluster.h"
#include "kudu/tools/ksck.h"
+#include "kudu/tools/ksck_checksum.h"
#include "kudu/tools/ksck_remote.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
@@ -51,7 +52,6 @@ using tools::RemoteKsckCluster;
ClusterVerifier::ClusterVerifier(MiniCluster* cluster)
: cluster_(cluster),
- checksum_options_(tools::ChecksumOptions()),
operations_timeout_(MonoDelta::FromSeconds(60)) {
checksum_options_.use_snapshot = false;
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/integration-tests/cluster_verifier.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_verifier.h b/src/kudu/integration-tests/cluster_verifier.h
index 533a707..f95e742 100644
--- a/src/kudu/integration-tests/cluster_verifier.h
+++ b/src/kudu/integration-tests/cluster_verifier.h
@@ -19,9 +19,9 @@
#include <string>
#include "kudu/gutil/macros.h"
-#include "kudu/tools/ksck.h"
-#include "kudu/util/status.h"
+#include "kudu/tools/ksck_checksum.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
namespace kudu {
@@ -87,9 +87,9 @@ class ClusterVerifier {
int expected_row_count);
- cluster::MiniCluster* cluster_;
+ tools::KsckChecksumOptions checksum_options_;
- tools::ChecksumOptions checksum_options_;
+ cluster::MiniCluster* cluster_;
MonoDelta operations_timeout_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index 97310f9..c9c7db3 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -66,6 +66,7 @@ target_link_libraries(kudu_tools_util
add_library(ksck
ksck.cc
+ ksck_checksum.cc
ksck_remote.cc
ksck_results.cc
)
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index 273be4b..a07554e 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -43,6 +43,7 @@
#include "kudu/server/server_base.pb.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet.pb.h"
+#include "kudu/tools/ksck_checksum.h"
#include "kudu/tools/ksck_results.h"
#include "kudu/util/jsonreader.h"
#include "kudu/util/scoped_cleanup.h"
@@ -144,8 +145,8 @@ class MockKsckTabletServer : public KsckTabletServer {
virtual void RunTabletChecksumScanAsync(
const std::string& /*tablet_id*/,
const Schema& /*schema*/,
- const ChecksumOptions& /*options*/,
- ChecksumProgressCallbacks* callbacks) OVERRIDE {
+ const KsckChecksumOptions& /*options*/,
+ KsckChecksumProgressCallbacks* callbacks) OVERRIDE {
callbacks->Progress(10, 20);
callbacks->Finished(Status::OK(), 0);
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index c426d38..a5ed001 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -36,15 +36,14 @@
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tools/color.h"
+#include "kudu/tools/ksck_checksum.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/util/atomic.h"
#include "kudu/util/blocking_queue.h"
-#include "kudu/util/countdown_latch.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/threadpool.h"
@@ -58,17 +57,6 @@
DEFINE_bool(checksum_scan, false,
"Perform a checksum scan on data in the cluster.");
-DEFINE_int32(checksum_timeout_sec, 3600,
- "Maximum total seconds to wait for a checksum scan to complete "
- "before timing out.");
-DEFINE_int32(checksum_scan_concurrency, 4,
- "Number of concurrent checksum scans to execute per tablet server.");
-DEFINE_bool(checksum_snapshot, true, "Should the checksum scanner use a snapshot scan");
-DEFINE_uint64(checksum_snapshot_timestamp,
- kudu::tools::ChecksumOptions::kCurrentTimestamp,
- "timestamp to use for snapshot checksum scans, defaults to 0, which "
- "uses the current timestamp of a tablet server involved in the scan");
-
DEFINE_int32(fetch_replica_info_concurrency, 20,
"Number of concurrent tablet servers to fetch replica info from.");
@@ -140,22 +128,6 @@ bool IsNonJSONFormat() {
} // anonymous namespace
-ChecksumOptions::ChecksumOptions()
- : timeout(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec)),
- scan_concurrency(FLAGS_checksum_scan_concurrency),
- use_snapshot(FLAGS_checksum_snapshot),
- snapshot_timestamp(FLAGS_checksum_snapshot_timestamp) {
-}
-
-ChecksumOptions::ChecksumOptions(MonoDelta timeout, int scan_concurrency,
- bool use_snapshot, uint64_t snapshot_timestamp)
- : timeout(timeout),
- scan_concurrency(scan_concurrency),
- use_snapshot(use_snapshot),
- snapshot_timestamp(snapshot_timestamp) {}
-
-const uint64_t ChecksumOptions::kCurrentTimestamp = 0;
-
tablet::TabletStatePB KsckTabletServer::ReplicaState(const std::string& tablet_id) const {
CHECK_EQ(state_, KsckFetchState::FETCHED);
if (!ContainsKey(tablet_status_map_, tablet_id)) {
@@ -449,7 +421,7 @@ Status Ksck::Run() {
"table consistency check error");
if (FLAGS_checksum_scan) {
- PUSH_PREPEND_NOT_OK(ChecksumData(ChecksumOptions()),
+ PUSH_PREPEND_NOT_OK(ChecksumData(KsckChecksumOptions()),
results_.error_messages, "checksum scan error");
}
@@ -558,146 +530,9 @@ Status Ksck::CheckTablesConsistency() {
return Status::OK();
}
-// Class to act as a collector of scan results.
-// Provides thread-safe accessors to update and read a hash table of results.
-class ChecksumResultReporter : public RefCountedThreadSafe<ChecksumResultReporter> {
- public:
- typedef std::pair<Status, uint64_t> ResultPair;
- typedef std::unordered_map<std::string, ResultPair> ReplicaResultMap;
- typedef std::unordered_map<std::string, ReplicaResultMap> TabletResultMap;
-
- // Initialize reporter with the number of replicas being queried.
- explicit ChecksumResultReporter(int num_tablet_replicas)
- : expected_count_(num_tablet_replicas),
- responses_(num_tablet_replicas),
- rows_summed_(0),
- disk_bytes_summed_(0) {
- }
-
- void ReportProgress(int64_t delta_rows, int64_t delta_bytes) {
- rows_summed_.IncrementBy(delta_rows);
- disk_bytes_summed_.IncrementBy(delta_bytes);
- }
-
- // Write an entry to the result map indicating a response from the remote.
- void ReportResult(const std::string& tablet_id,
- const std::string& replica_uuid,
- const Status& status,
- uint64_t checksum) {
- std::lock_guard<simple_spinlock> guard(lock_);
- unordered_map<string, ResultPair>& replica_results =
- LookupOrInsert(&checksums_, tablet_id, unordered_map<string, ResultPair>());
- InsertOrDie(&replica_results, replica_uuid, ResultPair(status, checksum));
- responses_.CountDown();
- }
-
- // Blocks until either the number of results plus errors reported equals
- // num_tablet_replicas (from the constructor), or until the timeout expires,
- // whichever comes first. Progress messages are printed to 'out'.
- // Returns false if the timeout expired before all responses came in.
- // Otherwise, returns true.
- bool WaitFor(const MonoDelta& timeout, std::ostream* out) const {
- MonoTime start = MonoTime::Now();
- MonoTime deadline = start + timeout;
-
- bool done = false;
- while (!done) {
- MonoTime now = MonoTime::Now();
- int rem_ms = (deadline - now).ToMilliseconds();
- if (rem_ms <= 0) return false;
-
- done = responses_.WaitFor(MonoDelta::FromMilliseconds(std::min(rem_ms, 5000)));
- string status = done ? "finished in " : "running for ";
- if (IsNonJSONFormat()) {
- int run_time_sec = (MonoTime::Now() - start).ToSeconds();
- (*out) << "Checksum " << status << run_time_sec << "s: "
- << responses_.count() << "/" << expected_count_ << " replicas remaining ("
- << HumanReadableNumBytes::ToString(disk_bytes_summed_.Load()) << " from disk, "
- << HumanReadableInt::ToString(rows_summed_.Load()) << " rows summed)"
- << endl;
- }
- }
- return true;
- }
-
- // Returns true iff all replicas have reported in.
- bool AllReported() const { return responses_.count() == 0; }
-
- // Get reported results.
- TabletResultMap checksums() const {
- std::lock_guard<simple_spinlock> guard(lock_);
- return checksums_;
- }
-
- private:
- friend class RefCountedThreadSafe<ChecksumResultReporter>;
- ~ChecksumResultReporter() {}
-
- // Report either a success or error response.
- void HandleResponse(const std::string& tablet_id, const std::string& replica_uuid,
- const Status& status, uint64_t checksum);
-
- const int expected_count_;
- CountDownLatch responses_;
-
- mutable simple_spinlock lock_; // Protects 'checksums_'.
- // checksums_ is an unordered_map of { tablet_id : { replica_uuid : checksum } }.
- TabletResultMap checksums_;
-
- AtomicInt<int64_t> rows_summed_;
- AtomicInt<int64_t> disk_bytes_summed_;
-};
-
-// Queue of tablet replicas for an individual tablet server.
-typedef shared_ptr<BlockingQueue<std::pair<Schema, std::string>>> SharedTabletQueue;
-
-// A set of callbacks which records the result of a tablet replica's checksum,
-// and then checks if the tablet server has any more tablets to checksum. If so,
-// a new async checksum scan is started.
-class TabletServerChecksumCallbacks : public ChecksumProgressCallbacks {
- public:
- TabletServerChecksumCallbacks(
- scoped_refptr<ChecksumResultReporter> reporter,
- shared_ptr<KsckTabletServer> tablet_server,
- SharedTabletQueue queue,
- std::string tablet_id,
- ChecksumOptions options) :
- reporter_(std::move(reporter)),
- tablet_server_(std::move(tablet_server)),
- queue_(std::move(queue)),
- options_(options),
- tablet_id_(std::move(tablet_id)) {
- }
-
- void Progress(int64_t rows_summed, int64_t disk_bytes_summed) override {
- reporter_->ReportProgress(rows_summed, disk_bytes_summed);
- }
-
- void Finished(const Status& status, uint64_t checksum) override {
- reporter_->ReportResult(tablet_id_, tablet_server_->uuid(), status, checksum);
-
- std::pair<Schema, std::string> table_tablet;
- if (queue_->BlockingGet(&table_tablet)) {
- const Schema& table_schema = table_tablet.first;
- tablet_id_ = table_tablet.second;
- tablet_server_->RunTabletChecksumScanAsync(tablet_id_, table_schema, options_, this);
- } else {
- delete this;
- }
- }
-
- private:
- const scoped_refptr<ChecksumResultReporter> reporter_;
- const shared_ptr<KsckTabletServer> tablet_server_;
- const SharedTabletQueue queue_;
- const ChecksumOptions options_;
-
- std::string tablet_id_;
-};
-
-Status Ksck::ChecksumData(const ChecksumOptions& opts) {
+Status Ksck::ChecksumData(const KsckChecksumOptions& opts) {
// Copy options so that local modifications can be made and passed on.
- ChecksumOptions options = opts;
+ KsckChecksumOptions options = opts;
typedef unordered_map<shared_ptr<KsckTablet>, shared_ptr<KsckTable>> TabletTableMap;
TabletTableMap tablet_table_map;
@@ -762,7 +597,8 @@ Status Ksck::ChecksumData(const ChecksumOptions& opts) {
}
}
- if (options.use_snapshot && options.snapshot_timestamp == ChecksumOptions::kCurrentTimestamp) {
+ if (options.use_snapshot &&
+ options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
// Set the snapshot timestamp to the current timestamp of the first healthy tablet server
// we can find.
for (const auto& ts : tablet_server_queues) {
@@ -771,7 +607,7 @@ Status Ksck::ChecksumData(const ChecksumOptions& opts) {
break;
}
}
- if (options.snapshot_timestamp == ChecksumOptions::kCurrentTimestamp) {
+ if (options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
return Status::ServiceUnavailable(
"No tablet servers were available to fetch the current timestamp");
}
@@ -798,7 +634,9 @@ Status Ksck::ChecksumData(const ChecksumOptions& opts) {
}
}
- bool timed_out = !reporter->WaitFor(options.timeout, out_);
+ // Don't ruin JSON output by printing progress updates.
+ auto* out_for_progress_updates = IsNonJSONFormat() ? out_ : nullptr;
+ bool timed_out = !reporter->WaitFor(options.timeout, out_for_progress_updates);
// Even if we timed out, for printing collate the checksum results that we did get.
ChecksumResultReporter::TabletResultMap checksums = reporter->checksums();
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index 3857b4d..8e5e0b2 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -41,40 +41,17 @@
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet.pb.h" // IWYU pragma: keep
#include "kudu/tools/ksck_results.h"
-#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
namespace kudu {
-namespace tools {
-
-class KsckTable;
-
-// Options for checksum scans.
-struct ChecksumOptions {
- public:
-
- ChecksumOptions();
-
- ChecksumOptions(MonoDelta timeout,
- int scan_concurrency,
- bool use_snapshot,
- uint64_t snapshot_timestamp);
-
- // The maximum total time to wait for results to come back from all replicas.
- MonoDelta timeout;
-
- // The maximum number of concurrent checksum scans to run per tablet server.
- int scan_concurrency;
- // Whether to use a snapshot checksum scanner.
- bool use_snapshot;
+class MonoDelta;
- // The snapshot timestamp to use for snapshot checksum scans.
- uint64_t snapshot_timestamp;
+namespace tools {
- // A timestamp indicating that the current time should be used for a checksum snapshot.
- static const uint64_t kCurrentTimestamp;
-};
+class KsckChecksumProgressCallbacks;
+class KsckTable;
+struct KsckChecksumOptions;
// Representation of a tablet replica on a tablet server.
class KsckTabletReplica {
@@ -178,21 +155,6 @@ class KsckTable {
DISALLOW_COPY_AND_ASSIGN(KsckTable);
};
-// Interface for reporting progress on checksumming a single
-// replica.
-class ChecksumProgressCallbacks {
- public:
- virtual ~ChecksumProgressCallbacks() {}
-
- // Report incremental progress from the server side.
- // 'disk_bytes_summed' only counts data read from DiskRowSets on the server side
- // and does not count MRS bytes, etc.
- virtual void Progress(int64_t delta_rows_summed, int64_t delta_disk_bytes_summed) = 0;
-
- // The scan of the current tablet is complete.
- virtual void Finished(const Status& status, uint64_t checksum) = 0;
-};
-
// Enum representing the fetch status of a ksck master or tablet server.
enum class KsckFetchState {
// Information has not yet been fetched.
@@ -336,8 +298,8 @@ class KsckTabletServer {
virtual void RunTabletChecksumScanAsync(
const std::string& tablet_id,
const Schema& schema,
- const ChecksumOptions& options,
- ChecksumProgressCallbacks* callbacks) = 0;
+ const KsckChecksumOptions& options,
+ KsckChecksumProgressCallbacks* callbacks) = 0;
virtual const std::string& uuid() const {
return uuid_;
@@ -550,7 +512,7 @@ class Ksck {
// Verifies data checksums on all tablets by doing a scan of the database on each replica.
// Must first call FetchTableAndTabletInfo().
- Status ChecksumData(const ChecksumOptions& options);
+ Status ChecksumData(const KsckChecksumOptions& opts);
// Runs all the checks of ksck in the proper order, including checksum scans,
// if enabled. Returns OK if and only if all checks succeed.
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck_checksum.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_checksum.cc b/src/kudu/tools/ksck_checksum.cc
new file mode 100644
index 0000000..9d2b289
--- /dev/null
+++ b/src/kudu/tools/ksck_checksum.cc
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/ksck_checksum.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <iostream>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include <gflags/gflags.h>
+
+#include "kudu/common/schema.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tools/ksck.h"
+
+using std::endl;
+using std::shared_ptr;
+using std::string;
+using std::unordered_map;
+using strings::Substitute;
+
+// Command-line flags that supply the default field values used by
+// KsckChecksumOptions::KsckChecksumOptions().
+DEFINE_int32(checksum_timeout_sec, 3600,
+ "Maximum total seconds to wait for a checksum scan to complete "
+ "before timing out.");
+DEFINE_int32(checksum_scan_concurrency, 4,
+ "Number of concurrent checksum scans to execute per tablet server.");
+DEFINE_bool(checksum_snapshot, true, "Should the checksum scanner use a snapshot scan?");
+// The default, KsckChecksumOptions::kCurrentTimestamp (0), asks for the
+// current timestamp of a tablet server involved in the scan to be used.
+DEFINE_uint64(checksum_snapshot_timestamp,
+ kudu::tools::KsckChecksumOptions::kCurrentTimestamp,
+ "Timestamp to use for snapshot checksum scans. Defaults to 0, which "
+ "uses the current timestamp of a tablet server involved in the scan.");
+
+namespace kudu {
+namespace tools {
+
+// Namespace-scope definition of the static constexpr member declared in
+// ksck_checksum.h. Before C++17, static constexpr data members are not
+// implicitly inline, so any ODR-use of kCurrentTimestamp (for example, binding
+// it to a const reference) requires this definition in order to link.
+constexpr uint64_t KsckChecksumOptions::kCurrentTimestamp;
+
+// Builds options whose fields come from the --checksum_timeout_sec,
+// --checksum_scan_concurrency, --checksum_snapshot and
+// --checksum_snapshot_timestamp flags.
+KsckChecksumOptions::KsckChecksumOptions()
+    : timeout(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec)),
+      scan_concurrency(FLAGS_checksum_scan_concurrency),
+      use_snapshot(FLAGS_checksum_snapshot),
+      snapshot_timestamp(FLAGS_checksum_snapshot_timestamp) {}
+
+// Builds options from explicitly supplied values; the flags are not consulted.
+KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout, int scan_concurrency,
+                                         bool use_snapshot, uint64_t snapshot_timestamp)
+    : timeout(timeout),
+      scan_concurrency(scan_concurrency),
+      use_snapshot(use_snapshot),
+      snapshot_timestamp(snapshot_timestamp) {}
+
+// Starts out expecting 'num_tablet_replicas' responses, with both progress
+// counters at zero.
+ChecksumResultReporter::ChecksumResultReporter(int num_tablet_replicas)
+    : expected_count_(num_tablet_replicas),
+      responses_(num_tablet_replicas),
+      rows_summed_(0),
+      disk_bytes_summed_(0) {
+}
+
+// Adds one scan's incremental progress to the running totals. Both totals are
+// AtomicInt counters, so no lock is taken.
+void ChecksumResultReporter::ReportProgress(int64_t delta_rows,
+                                            int64_t delta_bytes) {
+  rows_summed_.IncrementBy(delta_rows);
+  disk_bytes_summed_.IncrementBy(delta_bytes);
+}
+
+// Records one replica's outcome as checksums_[tablet_id][replica_uuid] and
+// counts it as a received response. The map update happens under 'lock_'.
+// Reporting the same (tablet_id, replica_uuid) twice is fatal (InsertOrDie).
+void ChecksumResultReporter::ReportResult(const string& tablet_id,
+                                          const string& replica_uuid,
+                                          const Status& status,
+                                          uint64_t checksum) {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  ReplicaResultMap& per_replica =
+      LookupOrInsert(&checksums_, tablet_id, ReplicaResultMap());
+  InsertOrDie(&per_replica, replica_uuid, ResultPair(status, checksum));
+  responses_.CountDown();
+}
+
+// Waits until every replica counted in the constructor has reported a result
+// (success or error), or until 'timeout' elapses, whichever happens first.
+// Returns true if all results arrived and false if the timeout expired first.
+// If 'out' is non-null, a progress line is written to it after each wait of at
+// most five seconds, including a final line once all results are in. Pass
+// nullptr to suppress progress output.
+bool ChecksumResultReporter::WaitFor(const MonoDelta& timeout, std::ostream* out) const {
+  // Longest single wait on the latch, so that progress is reported regularly.
+  constexpr int64_t kMaxWaitMs = 5000;
+  const MonoTime start = MonoTime::Now();
+  const MonoTime deadline = start + timeout;
+
+  while (true) {
+    const auto remaining_ms = (deadline - MonoTime::Now()).ToMilliseconds();
+    if (remaining_ms <= 0) {
+      return false;
+    }
+
+    const bool all_reported = responses_.WaitFor(
+        MonoDelta::FromMilliseconds(std::min(remaining_ms, kMaxWaitMs)));
+
+    if (out != nullptr) {
+      const int elapsed_sec = (MonoTime::Now() - start).ToSeconds();
+      *out << Substitute("Checksum $0 $1s: $2/$3 replicas remaining "
+                         "($4 from disk, $5 rows summed)",
+                         all_reported ? "finished in" : "running for",
+                         elapsed_sec,
+                         responses_.count(),
+                         expected_count_,
+                         HumanReadableNumBytes::ToString(disk_bytes_summed_.Load()),
+                         HumanReadableInt::ToString(rows_summed_.Load()))
+           << endl;
+    }
+
+    if (all_reported) {
+      return true;
+    }
+  }
+}
+
+// Stores the shared reporter, the tablet server, its work queue and the scan
+// options. 'tablet_id' is the tablet whose result the first Finished() call
+// reports.
+TabletServerChecksumCallbacks::TabletServerChecksumCallbacks(
+    scoped_refptr<ChecksumResultReporter> reporter,
+    shared_ptr<KsckTabletServer> tablet_server,
+    SharedTabletQueue queue,
+    string tablet_id,
+    KsckChecksumOptions options)
+    : reporter_(std::move(reporter)),
+      tablet_server_(std::move(tablet_server)),
+      queue_(std::move(queue)),
+      options_(options),
+      tablet_id_(std::move(tablet_id)) {
+}
+
+// Forwards the current scan's incremental progress to the shared reporter.
+void TabletServerChecksumCallbacks::Progress(int64_t rows_summed,
+                                             int64_t disk_bytes_summed) {
+  reporter_->ReportProgress(rows_summed, disk_bytes_summed);
+}
+
+// Records the just-finished replica's result. It then either starts a scan of
+// the next queued tablet on the same server, reusing these callbacks, or frees
+// this object when BlockingGet() yields nothing more.
+void TabletServerChecksumCallbacks::Finished(const Status& status, uint64_t checksum) {
+  reporter_->ReportResult(tablet_id_, tablet_server_->uuid(), status, checksum);
+
+  std::pair<Schema, string> next;
+  if (!queue_->BlockingGet(&next)) {
+    // No further tablet to scan on this server; this object deletes itself.
+    delete this;
+    return;
+  }
+  tablet_id_ = next.second;
+  tablet_server_->RunTabletChecksumScanAsync(tablet_id_, next.first, options_, this);
+}
+
+} // namespace tools
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck_checksum.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_checksum.h b/src/kudu/tools/ksck_checksum.h
new file mode 100644
index 0000000..7285f92
--- /dev/null
+++ b/src/kudu/tools/ksck_checksum.h
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <iosfwd>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/blocking_queue.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Schema;
+
+namespace tools {
+
+class KsckTabletServer;
+
+// Options for checksum scans.
+struct KsckChecksumOptions {
+ // A checksum with this special timestamp will use a timestamp selected by
+ // one of the tablet servers performing the snapshot scan.
+ static constexpr uint64_t kCurrentTimestamp = 0;
+
+ // Initializes every field from the corresponding --checksum_* flag.
+ KsckChecksumOptions();
+
+ // Initializes the fields to the given values.
+ KsckChecksumOptions(MonoDelta timeout,
+ int scan_concurrency,
+ bool use_snapshot,
+ uint64_t snapshot_timestamp);
+
+ // The maximum total time to wait for results to come back from all replicas.
+ MonoDelta timeout;
+
+ // The maximum number of concurrent checksum scans to run per tablet server.
+ int scan_concurrency;
+
+ // Whether to use a snapshot checksum scanner.
+ bool use_snapshot;
+
+ // The snapshot timestamp to use for snapshot checksum scans, or
+ // kCurrentTimestamp to use a tablet server's current timestamp.
+ uint64_t snapshot_timestamp;
+};
+
+// Interface through which the checksum scan of a single replica reports its
+// incremental progress and its final result.
+class KsckChecksumProgressCallbacks {
+ public:
+  virtual ~KsckChecksumProgressCallbacks() = default;
+
+  // Reports incremental progress from the server side, as deltas.
+  // 'delta_disk_bytes_summed' only counts data read from DiskRowSets on the
+  // server side and does not count MRS bytes, etc.
+  virtual void Progress(int64_t delta_rows_summed, int64_t delta_disk_bytes_summed) = 0;
+
+  // Called when the scan of the current tablet is complete, with its final
+  // status and checksum.
+  virtual void Finished(const Status& status, uint64_t checksum) = 0;
+};
+
+// Collector of checksum scan results. Results are stored under a spinlock and
+// progress totals are atomics, so it can be updated and read from multiple
+// threads.
+class ChecksumResultReporter : public RefCountedThreadSafe<ChecksumResultReporter> {
+ public:
+  // (status, checksum) reported for a single replica.
+  using ResultPair = std::pair<Status, uint64_t>;
+  // replica_uuid -> result.
+  using ReplicaResultMap = std::unordered_map<std::string, ResultPair>;
+  // tablet_id -> (replica_uuid -> result).
+  using TabletResultMap = std::unordered_map<std::string, ReplicaResultMap>;
+
+  // Initializes the reporter to expect results from 'num_tablet_replicas'
+  // replicas.
+  explicit ChecksumResultReporter(int num_tablet_replicas);
+
+  // Adds incremental scan progress to the running totals.
+  void ReportProgress(int64_t delta_rows, int64_t delta_bytes);
+
+  // Records the result for replica 'replica_uuid' of tablet 'tablet_id' and
+  // counts it as one received response.
+  void ReportResult(const std::string& tablet_id,
+                    const std::string& replica_uuid,
+                    const Status& status,
+                    uint64_t checksum);
+
+  // Blocks until the number of results plus errors reported equals
+  // num_tablet_replicas (from the constructor), or until 'timeout' expires,
+  // whichever comes first. Returns true if all responses came in, and false if
+  // the timeout expired first. If 'out' is non-null, progress messages are
+  // printed to it while waiting; pass nullptr to suppress them.
+  bool WaitFor(const MonoDelta& timeout, std::ostream* out) const;
+
+  // Returns true iff all replicas have reported in.
+  bool AllReported() const { return responses_.count() == 0; }
+
+  // Returns a copy of the results reported so far.
+  TabletResultMap checksums() const {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return checksums_;
+  }
+
+ private:
+  friend class RefCountedThreadSafe<ChecksumResultReporter>;
+  ~ChecksumResultReporter() = default;
+
+  // Total number of replica results expected.
+  const int expected_count_;
+  // Counted down once per reported result.
+  CountDownLatch responses_;
+
+  mutable simple_spinlock lock_;  // Protects 'checksums_'.
+  // { tablet_id : { replica_uuid : (status, checksum) } }.
+  TabletResultMap checksums_;
+
+  // Running totals of rows and on-disk bytes summed across all scans.
+  AtomicInt<int64_t> rows_summed_;
+  AtomicInt<int64_t> disk_bytes_summed_;
+};
+
+// Queue, shared via shared_ptr, of (table schema, tablet id) pairs still to be
+// checksummed on an individual tablet server.
+using SharedTabletQueue = std::shared_ptr<BlockingQueue<std::pair<Schema, std::string>>>;
+
+// Checksum callbacks for the scans run on one tablet server. When a replica's
+// scan finishes, its result is recorded with the reporter. The next tablet is
+// then taken from the server's queue and scanned with these same callbacks.
+// Once the queue yields nothing more, the object deletes itself.
+class TabletServerChecksumCallbacks : public KsckChecksumProgressCallbacks {
+ public:
+  TabletServerChecksumCallbacks(
+      scoped_refptr<ChecksumResultReporter> reporter,
+      std::shared_ptr<KsckTabletServer> tablet_server,
+      SharedTabletQueue queue,
+      std::string tablet_id,
+      KsckChecksumOptions options);
+
+  // Forwards incremental progress to the reporter.
+  void Progress(int64_t rows_summed, int64_t disk_bytes_summed) override;
+
+  // Records the finished replica's result, then starts the next queued scan
+  // or deletes 'this' if nothing is left to scan.
+  void Finished(const Status& status, uint64_t checksum) override;
+
+ private:
+  const scoped_refptr<ChecksumResultReporter> reporter_;
+  const std::shared_ptr<KsckTabletServer> tablet_server_;
+  const SharedTabletQueue queue_;
+  const KsckChecksumOptions options_;
+
+  // The tablet whose scan is currently running on 'tablet_server_'.
+  std::string tablet_id_;
+};
+
+} // namespace tools
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck_remote-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote-test.cc b/src/kudu/tools/ksck_remote-test.cc
index 456544f..f41298c 100644
--- a/src/kudu/tools/ksck_remote-test.cc
+++ b/src/kudu/tools/ksck_remote-test.cc
@@ -43,6 +43,7 @@
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tools/data_gen_util.h"
#include "kudu/tools/ksck.h"
+#include "kudu/tools/ksck_checksum.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/util/atomic.h"
#include "kudu/util/countdown_latch.h"
@@ -318,7 +319,7 @@ TEST_F(RemoteKsckTest, TestChecksum) {
ASSERT_OK(ksck_->FetchInfoFromTabletServers());
err_stream_.str("");
- s = ksck_->ChecksumData(ChecksumOptions(MonoDelta::FromSeconds(1), 16, false, 0));
+ s = ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(1), 16, false, 0));
if (s.ok()) {
// Check the status message at the end of the checksum.
// We expect '0B from disk' because we didn't write enough data to trigger a flush
@@ -342,7 +343,7 @@ TEST_F(RemoteKsckTest, TestChecksumTimeout) {
ASSERT_OK(ksck_->FetchTableAndTabletInfo());
ASSERT_OK(ksck_->FetchInfoFromTabletServers());
// Use an impossibly low timeout value of zero!
- Status s = ksck_->ChecksumData(ChecksumOptions(MonoDelta::FromNanoseconds(0), 16, false, 0));
+ Status s = ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromNanoseconds(0), 16, false, 0));
ASSERT_TRUE(s.IsTimedOut()) << "Expected TimedOut Status, got: " << s.ToString();
}
@@ -365,7 +366,7 @@ TEST_F(RemoteKsckTest, TestChecksumSnapshot) {
ASSERT_OK(ksck_->CheckClusterRunning());
ASSERT_OK(ksck_->FetchTableAndTabletInfo());
ASSERT_OK(ksck_->FetchInfoFromTabletServers());
- ASSERT_OK(ksck_->ChecksumData(ChecksumOptions(MonoDelta::FromSeconds(10), 16, true, ts)));
+ ASSERT_OK(ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(10), 16, true, ts)));
continue_writing.Store(false);
ASSERT_OK(promise.Get());
writer_thread->Join();
@@ -391,8 +392,8 @@ TEST_F(RemoteKsckTest, TestChecksumSnapshotCurrentTimestamp) {
ASSERT_OK(ksck_->CheckClusterRunning());
ASSERT_OK(ksck_->FetchTableAndTabletInfo());
ASSERT_OK(ksck_->FetchInfoFromTabletServers());
- ASSERT_OK(ksck_->ChecksumData(ChecksumOptions(MonoDelta::FromSeconds(10), 16, true,
- ChecksumOptions::kCurrentTimestamp)));
+ ASSERT_OK(ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(10), 16, true,
+ KsckChecksumOptions::kCurrentTimestamp)));
continue_writing.Store(false);
ASSERT_OK(promise.Get());
writer_thread->Join();
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck_remote.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index 98b2b0a..c44922b 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -52,6 +52,7 @@
#include "kudu/server/server_base.proxy.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tools/ksck.h"
+#include "kudu/tools/ksck_checksum.h"
#include "kudu/tools/ksck_results.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/tserver.pb.h"
@@ -295,7 +296,7 @@ class ChecksumCallbackHandler {
class ChecksumStepper {
public:
ChecksumStepper(string tablet_id, const Schema& schema, string server_uuid,
- ChecksumOptions options, ChecksumProgressCallbacks* callbacks,
+ KsckChecksumOptions options, KsckChecksumProgressCallbacks* callbacks,
shared_ptr<tserver::TabletServerServiceProxy> proxy)
: schema_(schema),
tablet_id_(std::move(tablet_id)),
@@ -396,8 +397,8 @@ class ChecksumStepper {
const string tablet_id_;
const string server_uuid_;
- const ChecksumOptions options_;
- ChecksumProgressCallbacks* const callbacks_;
+ const KsckChecksumOptions options_;
+ KsckChecksumProgressCallbacks* const callbacks_;
const shared_ptr<tserver::TabletServerServiceProxy> proxy_;
uint32_t call_seq_id_;
@@ -416,8 +417,8 @@ void ChecksumCallbackHandler::Run() {
void RemoteKsckTabletServer::RunTabletChecksumScanAsync(
const string& tablet_id,
const Schema& schema,
- const ChecksumOptions& options,
- ChecksumProgressCallbacks* callbacks) {
+ const KsckChecksumOptions& options,
+ KsckChecksumProgressCallbacks* callbacks) {
gscoped_ptr<ChecksumStepper> stepper(
new ChecksumStepper(tablet_id, schema, uuid(), options, callbacks, ts_proxy_));
stepper->Start();
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f5f57f5/src/kudu/tools/ksck_remote.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_remote.h b/src/kudu/tools/ksck_remote.h
index 07bb0b7..54d0f1d 100644
--- a/src/kudu/tools/ksck_remote.h
+++ b/src/kudu/tools/ksck_remote.h
@@ -54,7 +54,9 @@ class TabletServerServiceProxy;
namespace tools {
+class KsckChecksumProgressCallbacks;
enum class KsckServerHealth;
+struct KsckChecksumOptions;
// This implementation connects to a master via RPC.
class RemoteKsckMaster : public KsckMaster {
@@ -106,8 +108,8 @@ class RemoteKsckTabletServer : public KsckTabletServer {
void RunTabletChecksumScanAsync(
const std::string& tablet_id,
const Schema& schema,
- const ChecksumOptions& options,
- ChecksumProgressCallbacks* callbacks) override;
+ const KsckChecksumOptions& options,
+ KsckChecksumProgressCallbacks* callbacks) override;
virtual std::string address() const override {
return host_port_.ToString();