You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2018/10/03 21:40:54 UTC

[1/3] kudu git commit: [tools] ksck checksums: Add KsckChecksummer class

Repository: kudu
Updated Branches:
  refs/heads/master ec654c49f -> da28d0aee


[tools] ksck checksums: Add KsckChecksummer class

This moves the remaining checksum logic out of Ksck and into a new
KsckChecksummer class, along with refactoring some parts of the logic
into separate functions. There are no functional changes.

The logic that was not refactored is directly related to KUDU-2179, and
will be addressed in a follow-up patch.

Change-Id: I2016936eaa26fd6b499783e7d5d8f404816b37fa
Reviewed-on: http://gerrit.cloudera.org:8080/11498
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Andrew Wong <aw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/63e4a0a7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/63e4a0a7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/63e4a0a7

Branch: refs/heads/master
Commit: 63e4a0a700eeb2677f232bc472b36769be3e0382
Parents: ec654c4
Author: Will Berkeley <wd...@gmail.org>
Authored: Fri Sep 21 16:45:35 2018 -0700
Committer: Will Berkeley <wd...@gmail.com>
Committed: Wed Oct 3 18:04:36 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/ksck.cc          | 191 ++-----------------------
 src/kudu/tools/ksck.h           |   6 +-
 src/kudu/tools/ksck_checksum.cc | 263 ++++++++++++++++++++++++++++++++++-
 src/kudu/tools/ksck_checksum.h  |  65 +++++++++
 4 files changed, 337 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/63e4a0a7/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index a5ed001..4597668 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -35,7 +35,6 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
-#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/tablet.pb.h"
@@ -43,9 +42,7 @@
 #include "kudu/tools/ksck_checksum.h"
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/util/atomic.h"
-#include "kudu/util/blocking_queue.h"
 #include "kudu/util/locks.h"
-#include "kudu/util/monotime.h"
 #include "kudu/util/threadpool.h"
 
 #define PUSH_PREPEND_NOT_OK(s, statuses, msg) do { \
@@ -421,8 +418,13 @@ Status Ksck::Run() {
                       "table consistency check error");
 
   if (FLAGS_checksum_scan) {
-    PUSH_PREPEND_NOT_OK(ChecksumData(KsckChecksumOptions()),
-                        results_.error_messages, "checksum scan error");
+    // Copy the filters because they are passed by-value.
+    auto table_filters_for_checksum_opts = table_filters_;
+    auto tablet_id_filters_for_checksum_opts = tablet_id_filters_;
+    PUSH_PREPEND_NOT_OK(
+        ChecksumData(KsckChecksumOptions(std::move(table_filters_for_checksum_opts),
+                                         std::move(tablet_id_filters_for_checksum_opts))),
+        results_.error_messages, "checksum scan error");
   }
 
   // Use a special-case error if there are auth errors. This makes it harder
@@ -531,180 +533,13 @@ Status Ksck::CheckTablesConsistency() {
 }
 
 Status Ksck::ChecksumData(const KsckChecksumOptions& opts) {
-  // Copy options so that local modifications can be made and passed on.
-  KsckChecksumOptions options = opts;
-
-  typedef unordered_map<shared_ptr<KsckTablet>, shared_ptr<KsckTable>> TabletTableMap;
-  TabletTableMap tablet_table_map;
-
-  int num_tables = 0;
-  int num_tablets = 0;
-  int num_tablet_replicas = 0;
-  for (const shared_ptr<KsckTable>& table : cluster_->tables()) {
-    VLOG(1) << "Table: " << table->name();
-    if (!MatchesAnyPattern(table_filters_, table->name())) continue;
-    num_tables += 1;
-    num_tablets += table->tablets().size();
-    for (const shared_ptr<KsckTablet>& tablet : table->tablets()) {
-      VLOG(1) << "Tablet: " << tablet->id();
-      if (!MatchesAnyPattern(tablet_id_filters_, tablet->id())) continue;
-      InsertOrDie(&tablet_table_map, tablet, table);
-      num_tablet_replicas += tablet->replicas().size();
-    }
-  }
-
-  if (num_tables == 0) {
-    string msg = "No table found.";
-    if (!table_filters_.empty()) {
-      msg += " Filter: table_filters=" + JoinStrings(table_filters_, ",");
-    }
-    return Status::NotFound(msg);
-  }
-
-  if (num_tablets > 0 && num_tablet_replicas == 0) {
-    // Warn if the table has tablets, but no replicas. The table may have no
-    // tablets if all range partitions have been dropped.
-    string msg = "No tablet replicas found.";
-    if (!table_filters_.empty() || !tablet_id_filters_.empty()) {
-      msg += " Filter: ";
-      if (!table_filters_.empty()) {
-        msg += "table_filters=" + JoinStrings(table_filters_, ",");
-      }
-      if (!tablet_id_filters_.empty()) {
-        msg += "tablet_id_filters=" + JoinStrings(tablet_id_filters_, ",");
-      }
-    }
-    return Status::NotFound(msg);
-  }
-
-  // Map of tablet servers to tablet queue.
-  typedef unordered_map<shared_ptr<KsckTabletServer>, SharedTabletQueue> TabletServerQueueMap;
-
-  TabletServerQueueMap tablet_server_queues;
-  scoped_refptr<ChecksumResultReporter> reporter(new ChecksumResultReporter(num_tablet_replicas));
-
-  // Create a queue of checksum callbacks grouped by the tablet server.
-  for (const TabletTableMap::value_type& entry : tablet_table_map) {
-    const shared_ptr<KsckTablet>& tablet = entry.first;
-    const shared_ptr<KsckTable>& table = entry.second;
-    for (const shared_ptr<KsckTabletReplica>& replica : tablet->replicas()) {
-      const shared_ptr<KsckTabletServer>& ts =
-          FindOrDie(cluster_->tablet_servers(), replica->ts_uuid());
-
-      const SharedTabletQueue& queue =
-          LookupOrInsertNewSharedPtr(&tablet_server_queues, ts, num_tablet_replicas);
-      CHECK_EQ(QUEUE_SUCCESS, queue->Put(make_pair(table->schema(), tablet->id())));
-    }
-  }
-
-  if (options.use_snapshot &&
-      options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
-    // Set the snapshot timestamp to the current timestamp of the first healthy tablet server
-    // we can find.
-    for (const auto& ts : tablet_server_queues) {
-      if (ts.first->is_healthy()) {
-        options.snapshot_timestamp = ts.first->current_timestamp();
-        break;
-      }
-    }
-    if (options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
-      return Status::ServiceUnavailable(
-          "No tablet servers were available to fetch the current timestamp");
-    }
-    results_.checksum_results.snapshot_timestamp = options.snapshot_timestamp;
-  }
-
-  // Kick off checksum scans in parallel. For each tablet server, we start
-  // scan_concurrency scans. Each callback then initiates one additional
-  // scan when it returns if the queue for that TS is not empty.
-  for (const TabletServerQueueMap::value_type& entry : tablet_server_queues) {
-    const shared_ptr<KsckTabletServer>& tablet_server = entry.first;
-    const SharedTabletQueue& queue = entry.second;
-    queue->Shutdown(); // Ensures that BlockingGet() will not block.
-    for (int i = 0; i < options.scan_concurrency; i++) {
-      std::pair<Schema, std::string> table_tablet;
-      if (queue->BlockingGet(&table_tablet)) {
-        const Schema& table_schema = table_tablet.first;
-        const std::string& tablet_id = table_tablet.second;
-        auto* cbs = new TabletServerChecksumCallbacks(
-            reporter, tablet_server, queue, tablet_id, options);
-        // 'cbs' deletes itself when complete.
-        tablet_server->RunTabletChecksumScanAsync(tablet_id, table_schema, options, cbs);
-      }
-    }
-  }
-
-  // Don't ruin JSON output by printing progress updates.
+  KsckChecksummer checksummer(cluster_.get());
+  auto* checksum_results = &results_.checksum_results;
+  // Don't ruin JSON output with ad hoc progress updates.
   auto* out_for_progress_updates = IsNonJSONFormat() ? out_ : nullptr;
-  bool timed_out = !reporter->WaitFor(options.timeout, out_for_progress_updates);
-
-  // Even if we timed out, for printing collate the checksum results that we did get.
-  ChecksumResultReporter::TabletResultMap checksums = reporter->checksums();
-
-  int num_errors = 0;
-  int num_mismatches = 0;
-  int num_results = 0;
-  KsckTableChecksumMap checksum_tables;
-  for (const shared_ptr<KsckTable>& table : cluster_->tables()) {
-    KsckTableChecksum table_checksum;
-    for (const shared_ptr<KsckTablet>& tablet : table->tablets()) {
-      if (ContainsKey(checksums, tablet->id())) {
-        KsckTabletChecksum tablet_checksum;
-        tablet_checksum.tablet_id = tablet->id();
-        bool seen_first_replica = false;
-        uint64_t first_checksum = 0;
-
-        for (const auto& r : FindOrDie(checksums, tablet->id())) {
-          KsckReplicaChecksum replica_checksum;
-          const string& replica_uuid = r.first;
-          shared_ptr<KsckTabletServer> ts = FindOrDie(cluster_->tablet_servers(), replica_uuid);
-          replica_checksum.ts_uuid = ts->uuid();
-          replica_checksum.ts_address = ts->address();
-
-          const ChecksumResultReporter::ResultPair& result = r.second;
-          const Status& status = result.first;
-          replica_checksum.checksum = result.second;
-          replica_checksum.status = status;
-          if (!status.ok()) {
-            num_errors++;
-          } else if (!seen_first_replica) {
-            seen_first_replica = true;
-            first_checksum = replica_checksum.checksum;
-          } else if (replica_checksum.checksum != first_checksum) {
-            num_mismatches++;
-            tablet_checksum.mismatch = true;
-          }
-          num_results++;
-          InsertOrDie(&tablet_checksum.replica_checksums,
-                      replica_checksum.ts_uuid,
-                      std::move(replica_checksum));
-        }
-        InsertOrDie(&table_checksum,
-                    tablet_checksum.tablet_id,
-                    std::move(tablet_checksum));
-      }
-    }
-    InsertOrDie(&checksum_tables, table->name(), std::move(table_checksum));
-  }
-  results_.checksum_results.tables.swap(checksum_tables);
-  if (timed_out) {
-    return Status::TimedOut(Substitute("Checksum scan did not complete within the timeout of $0: "
-                                       "Received results for $1 out of $2 expected replicas",
-                                       options.timeout.ToString(), num_results,
-                                       num_tablet_replicas));
-  }
-  CHECK_EQ(num_results, num_tablet_replicas)
-      << Substitute("Unexpected error: only got $0 out of $1 replica results",
-                    num_results, num_tablet_replicas);
-
-  if (num_mismatches != 0) {
-    return Status::Corruption(Substitute("$0 checksum mismatches were detected.", num_mismatches));
-  }
-  if (num_errors != 0) {
-    return Status::Aborted(Substitute("$0 errors were detected", num_errors));
-  }
-
-  return Status::OK();
+  return checksummer.ChecksumData(opts,
+                                  checksum_results,
+                                  out_for_progress_updates);
 }
 
 bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/63e4a0a7/src/kudu/tools/ksck.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index 8e5e0b2..c654be0 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -412,15 +412,15 @@ class KsckCluster {
   // The table's tablet list is modified only if this method returns OK.
   virtual Status RetrieveTabletsList(const std::shared_ptr<KsckTable>& table) = 0;
 
-  const MasterList& masters() {
+  const MasterList& masters() const {
     return masters_;
   }
 
-  const TSMap& tablet_servers() {
+  const TSMap& tablet_servers() const {
     return tablet_servers_;
   }
 
-  const std::vector<std::shared_ptr<KsckTable>>& tables() {
+  const std::vector<std::shared_ptr<KsckTable>>& tables() const {
     return tables_;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/63e4a0a7/src/kudu/tools/ksck_checksum.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_checksum.cc b/src/kudu/tools/ksck_checksum.cc
index 9d2b289..70d7c47 100644
--- a/src/kudu/tools/ksck_checksum.cc
+++ b/src/kudu/tools/ksck_checksum.cc
@@ -20,22 +20,30 @@
 #include <algorithm>
 #include <cstdint>
 #include <iostream>
+#include <map>
 #include <string>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
+#include <glog/logging.h>
 
 #include "kudu/common/schema.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/human_readable.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tools/ksck.h"
+#include "kudu/tools/tool_action_common.h"
 
 using std::endl;
+using std::ostream;
 using std::shared_ptr;
 using std::string;
 using std::unordered_map;
+using std::vector;
 using strings::Substitute;
 
 DEFINE_int32(checksum_timeout_sec, 3600,
@@ -53,17 +61,40 @@ namespace kudu {
 namespace tools {
 
 KsckChecksumOptions::KsckChecksumOptions()
-    : timeout(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec)),
-      scan_concurrency(FLAGS_checksum_scan_concurrency),
-      use_snapshot(FLAGS_checksum_snapshot),
-      snapshot_timestamp(FLAGS_checksum_snapshot_timestamp) {}
+    : KsckChecksumOptions({}, {}) {}
 
-KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout, int scan_concurrency,
-                                         bool use_snapshot, uint64_t snapshot_timestamp)
+KsckChecksumOptions::KsckChecksumOptions(vector<string> table_filters,
+                                         vector<string> tablet_id_filters)
+    : KsckChecksumOptions(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec),
+                          FLAGS_checksum_scan_concurrency,
+                          FLAGS_checksum_snapshot,
+                          FLAGS_checksum_snapshot_timestamp,
+                          std::move(table_filters),
+                          std::move(tablet_id_filters)) {}
+
+KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout,
+                                         int scan_concurrency,
+                                         bool use_snapshot,
+                                         uint64_t snapshot_timestamp)
+    : KsckChecksumOptions(timeout,
+                          scan_concurrency,
+                          use_snapshot,
+                          snapshot_timestamp,
+                          {},
+                          {}) {}
+
+KsckChecksumOptions::KsckChecksumOptions(MonoDelta timeout,
+                                         int scan_concurrency,
+                                         bool use_snapshot,
+                                         uint64_t snapshot_timestamp,
+                                         vector<string> table_filters,
+                                         vector<string> tablet_id_filters)
     : timeout(timeout),
       scan_concurrency(scan_concurrency),
       use_snapshot(use_snapshot),
-      snapshot_timestamp(snapshot_timestamp) {}
+      snapshot_timestamp(snapshot_timestamp),
+      table_filters(std::move(table_filters)),
+      tablet_id_filters(std::move(tablet_id_filters)) {}
 
 ChecksumResultReporter::ChecksumResultReporter(int num_tablet_replicas)
     : expected_count_(num_tablet_replicas),
@@ -153,5 +184,223 @@ void TabletServerChecksumCallbacks::Finished(const Status& status, uint64_t chec
   }
 }
 
+KsckChecksummer::KsckChecksummer(KsckCluster* cluster)
+    : cluster_(CHECK_NOTNULL(cluster)) {}
+
+Status KsckChecksummer::BuildTabletTableMap(
+    const KsckChecksumOptions& opts,
+    KsckChecksummer::TabletTableMap* tablet_table_map,
+    int* num_replicas) const {
+  CHECK(tablet_table_map);
+  CHECK(num_replicas);
+
+  TabletTableMap tablet_table_map_tmp;
+  int num_tables = 0;
+  int num_tablets = 0;
+  int num_replicas_tmp = 0;
+  for (const shared_ptr<KsckTable>& table : cluster_->tables()) {
+    VLOG(1) << "Table: " << table->name();
+    if (!MatchesAnyPattern(opts.table_filters, table->name())) continue;
+    num_tables += 1;
+    num_tablets += table->tablets().size();
+    for (const shared_ptr<KsckTablet>& tablet : table->tablets()) {
+      VLOG(1) << "Tablet: " << tablet->id();
+      if (!MatchesAnyPattern(opts.tablet_id_filters, tablet->id())) continue;
+      InsertOrDie(&tablet_table_map_tmp, tablet, table);
+      num_replicas_tmp += tablet->replicas().size();
+    }
+  }
+
+  if (num_tables == 0) {
+    string msg = "No table found.";
+    if (!opts.table_filters.empty()) {
+      msg += " Filter: table_filters=" + JoinStrings(opts.table_filters, ",");
+    }
+    return Status::NotFound(msg);
+  }
+
+  if (num_tablets > 0 && num_replicas_tmp == 0) {
+    // Warn if the table has tablets, but no replicas. The table may have no
+    // tablets if all range partitions have been dropped.
+    string msg = "No tablet replicas found.";
+    if (!opts.table_filters.empty() || !opts.tablet_id_filters.empty()) {
+      msg += " Filter:";
+      if (!opts.table_filters.empty()) {
+        msg += " table_filters=" + JoinStrings(opts.table_filters, ",");
+      }
+      if (!opts.tablet_id_filters.empty()) {
+        msg += " tablet_id_filters=" + JoinStrings(opts.tablet_id_filters, ",");
+      }
+    }
+    return Status::NotFound(msg);
+  }
+
+  *tablet_table_map = std::move(tablet_table_map_tmp);
+  *num_replicas = num_replicas_tmp;
+  return Status::OK();
+}
+
+Status KsckChecksummer::CollateChecksumResults(
+    const ChecksumResultReporter::TabletResultMap& checksums,
+    KsckTableChecksumMap* table_checksum_map,
+    int* num_results) const {
+  CHECK(table_checksum_map);
+  CHECK(num_results);
+
+  table_checksum_map->clear();
+  *num_results = 0;
+  int num_errors = 0;
+  int num_mismatches = 0;
+  for (const auto& table : cluster_->tables()) {
+    KsckTableChecksum table_checksum;
+    for (const auto& tablet : table->tablets()) {
+      if (ContainsKey(checksums, tablet->id())) {
+        KsckTabletChecksum tablet_checksum;
+        tablet_checksum.tablet_id = tablet->id();
+        bool seen_first_replica = false;
+        uint64_t first_checksum = 0;
+
+        for (const auto& r : FindOrDie(checksums, tablet->id())) {
+          KsckReplicaChecksum replica_checksum;
+          const auto& replica_uuid = r.first;
+          const auto& ts = FindOrDie(cluster_->tablet_servers(), replica_uuid);
+          replica_checksum.ts_uuid = ts->uuid();
+          replica_checksum.ts_address = ts->address();
+
+          const ChecksumResultReporter::ResultPair& result = r.second;
+          const Status& status = result.first;
+          replica_checksum.checksum = result.second;
+          replica_checksum.status = status;
+          if (!status.ok()) {
+            num_errors++;
+          } else if (!seen_first_replica) {
+            seen_first_replica = true;
+            first_checksum = replica_checksum.checksum;
+          } else if (replica_checksum.checksum != first_checksum) {
+            num_mismatches++;
+            tablet_checksum.mismatch = true;
+          }
+          (*num_results)++;
+          EmplaceOrDie(&tablet_checksum.replica_checksums,
+                       replica_checksum.ts_uuid,
+                       std::move(replica_checksum));
+        }
+        EmplaceOrDie(&table_checksum,
+                     tablet_checksum.tablet_id,
+                     std::move(tablet_checksum));
+      }
+    }
+    EmplaceOrDie(table_checksum_map, table->name(), std::move(table_checksum));
+  }
+
+  if (num_mismatches != 0) {
+    return Status::Corruption(Substitute("$0 checksum mismatches were detected.",
+                                         num_mismatches));
+  }
+  if (num_errors != 0) {
+    return Status::Aborted(Substitute("$0 errors were detected", num_errors));
+  }
+  return Status::OK();
+}
+
+Status KsckChecksummer::ChecksumData(const KsckChecksumOptions& opts,
+                                     KsckChecksumResults* checksum_results,
+                                     ostream* out_for_progress_updates) {
+  CHECK(checksum_results);
+
+  // Make a copy of 'opts' because we may need to assign a snapshot timestamp
+  // if one was not provided.
+  KsckChecksumOptions options = opts;
+
+  // Clear the contents of 'checksum_results' because we always overwrite it
+  // with whatever results are obtained (and with nothing if there's no results).
+  checksum_results->snapshot_timestamp = boost::none;
+  checksum_results->tables.clear();
+
+  TabletTableMap tablet_table_map;
+  int num_replicas;
+  RETURN_NOT_OK(BuildTabletTableMap(options, &tablet_table_map, &num_replicas));
+
+  // Map of tablet servers to tablet queue.
+  typedef unordered_map<shared_ptr<KsckTabletServer>, SharedTabletQueue> TabletServerQueueMap;
+
+  TabletServerQueueMap tablet_server_queues;
+  scoped_refptr<ChecksumResultReporter> reporter(
+      new ChecksumResultReporter(num_replicas));
+
+  // Create a queue of checksum callbacks grouped by the tablet server.
+  for (const auto& entry : tablet_table_map) {
+    const shared_ptr<KsckTablet>& tablet = entry.first;
+    const shared_ptr<KsckTable>& table = entry.second;
+    for (const shared_ptr<KsckTabletReplica>& replica : tablet->replicas()) {
+      const shared_ptr<KsckTabletServer>& ts =
+          FindOrDie(cluster_->tablet_servers(), replica->ts_uuid());
+
+      const SharedTabletQueue& queue =
+          LookupOrInsertNewSharedPtr(&tablet_server_queues, ts, num_replicas);
+      CHECK_EQ(QUEUE_SUCCESS, queue->Put(make_pair(table->schema(), tablet->id())));
+    }
+  }
+
+  // Set the snapshot timestamp. If the sentinel value 'kCurrentTimestamp' was
+  // provided, the snapshot timestamp is set to the current timestamp of some
+  // healthy tablet server.
+  if (options.use_snapshot &&
+      options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
+    for (const auto& ts : tablet_server_queues) {
+      if (ts.first->is_healthy()) {
+        options.snapshot_timestamp = ts.first->current_timestamp();
+        break;
+      }
+    }
+    if (options.snapshot_timestamp == KsckChecksumOptions::kCurrentTimestamp) {
+      return Status::ServiceUnavailable(
+          "No tablet servers were available to fetch the current timestamp");
+    }
+    checksum_results->snapshot_timestamp = options.snapshot_timestamp;
+  }
+
+  // Kick off checksum scans in parallel. For each tablet server, we start
+  // 'options.scan_concurrency' scans. Each callback then initiates one
+  // additional scan when it returns if the queue for that TS is not empty.
+  for (const auto& entry : tablet_server_queues) {
+    const shared_ptr<KsckTabletServer>& tablet_server = entry.first;
+    const SharedTabletQueue& queue = entry.second;
+    queue->Shutdown(); // Ensures that BlockingGet() will not block.
+    for (int i = 0; i < options.scan_concurrency; i++) {
+      std::pair<Schema, std::string> table_tablet;
+      if (queue->BlockingGet(&table_tablet)) {
+        const Schema& table_schema = table_tablet.first;
+        const std::string& tablet_id = table_tablet.second;
+        auto* cbs = new TabletServerChecksumCallbacks(
+            reporter, tablet_server, queue, tablet_id, options);
+        // 'cbs' deletes itself when complete.
+        tablet_server->RunTabletChecksumScanAsync(tablet_id, table_schema, options, cbs);
+      }
+    }
+  }
+
+  bool timed_out = !reporter->WaitFor(options.timeout, out_for_progress_updates);
+
+  // Even if we timed out, collate the checksum results that we did get.
+  KsckTableChecksumMap checksum_table_map;
+  int num_results;
+  const Status s = CollateChecksumResults(reporter->checksums(),
+                                          &checksum_table_map,
+                                          &num_results);
+  checksum_results->tables = std::move(checksum_table_map);
+
+  if (timed_out) {
+    return Status::TimedOut(Substitute("Checksum scan did not complete within the timeout of $0: "
+                                       "Received results for $1 out of $2 expected replicas",
+                                       options.timeout.ToString(), num_results,
+                                       num_replicas));
+  }
+  CHECK_EQ(num_results, num_replicas)
+      << Substitute("Unexpected error: only got $0 out of $1 replica results",
+                    num_results, num_replicas);
+  return s;
+}
+
 } // namespace tools
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/63e4a0a7/src/kudu/tools/ksck_checksum.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck_checksum.h b/src/kudu/tools/ksck_checksum.h
index 7285f92..8fbb2f1 100644
--- a/src/kudu/tools/ksck_checksum.h
+++ b/src/kudu/tools/ksck_checksum.h
@@ -24,8 +24,11 @@
 #include <string>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/tools/ksck_results.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/blocking_queue.h"
 #include "kudu/util/countdown_latch.h"
@@ -39,6 +42,9 @@ class Schema;
 
 namespace tools {
 
+class KsckCluster;
+class KsckTable;
+class KsckTablet;
 class KsckTabletServer;
 
 // Options for checksum scans.
@@ -49,11 +55,21 @@ struct KsckChecksumOptions {
 
   KsckChecksumOptions();
 
+  KsckChecksumOptions(std::vector<std::string> table_filters,
+                      std::vector<std::string> tablet_id_filters);
+
   KsckChecksumOptions(MonoDelta timeout,
                       int scan_concurrency,
                       bool use_snapshot,
                       uint64_t snapshot_timestamp);
 
+  KsckChecksumOptions(MonoDelta timeout,
+                      int scan_concurrency,
+                      bool use_snapshot,
+                      uint64_t snapshot_timestamp,
+                      std::vector<std::string> table_filters,
+                      std::vector<std::string> tablet_id_filters);
+
   // The maximum total time to wait for results to come back from all replicas.
   MonoDelta timeout;
 
@@ -65,6 +81,11 @@ struct KsckChecksumOptions {
 
   // The snapshot timestamp to use for snapshot checksum scans.
   uint64_t snapshot_timestamp;
+
+  // Filters for the table names and tablet ids whose contents should be
+  // checksummed.
+  std::vector<std::string> table_filters;
+  std::vector<std::string> tablet_id_filters;
 };
 
 // Interface for reporting progress on checksumming a single
@@ -152,6 +173,8 @@ class TabletServerChecksumCallbacks : public KsckChecksumProgressCallbacks {
   void Finished(const Status& status, uint64_t checksum) override;
 
  private:
+  ~TabletServerChecksumCallbacks() = default;
+
   const scoped_refptr<ChecksumResultReporter> reporter_;
   const std::shared_ptr<KsckTabletServer> tablet_server_;
   const SharedTabletQueue queue_;
@@ -159,5 +182,47 @@ class TabletServerChecksumCallbacks : public KsckChecksumProgressCallbacks {
 
   std::string tablet_id_;
 };
+
+// A class for performing checksum scans against a Kudu cluster.
+class KsckChecksummer {
+ public:
+   // 'cluster' must remain valid as long as this instance is alive.
+  explicit KsckChecksummer(KsckCluster* cluster);
+
+  // Checksum the data in the Kudu cluster according to the options provided in
+  // 'opts'. Results will be populated in the 'checksum_results'. If non-null,
+  // progress updates will be written to 'out_for_progress_updates'.
+  // NOTE: Even if this method returns a bad Status, 'checksum_results' will be
+  // populated with whatever checksum results were received.
+  Status ChecksumData(const KsckChecksumOptions& opts,
+                      KsckChecksumResults* checksum_results,
+                      std::ostream* out_for_progress_updates);
+
+ private:
+  typedef std::unordered_map<std::shared_ptr<KsckTablet>,
+                             std::shared_ptr<KsckTable>> TabletTableMap;
+
+  // Builds a mapping from tablet-to-be-checksummed to its table, which is
+  // used to create checksum callbacks. This mapping is returned in
+  // 'tablet_table_map' and the total number of replicas to be checksummed is
+  // returned in 'num_replicas'.
+  Status BuildTabletTableMap(const KsckChecksumOptions& opts,
+                             TabletTableMap* tablet_table_map,
+                             int* num_replicas) const;
+
+  // Collates the results of checksums into 'table_checksum_map', with the
+  // total number of results returned as 'num_results'.
+  // NOTE: Even if this function returns a bad Status, 'table_checksum_map'
+  // and 'num_results' will still be populated using whatever results are
+  // available.
+  Status CollateChecksumResults(
+      const ChecksumResultReporter::TabletResultMap& checksums,
+      KsckTableChecksumMap* table_checksum_map,
+      int* num_results) const;
+
+  KsckCluster* cluster_;
+
+  DISALLOW_COPY_AND_ASSIGN(KsckChecksummer);
+};
 } // namespace tools
 } // namespace kudu


[3/3] kudu git commit: Build cleanup in preparation for release

Posted by gr...@apache.org.
Build cleanup in preparation for release

This patch is a few small fixes in preparation for release:

- Updates the releasing docs to use Gradle
- Updates the hive and backup modules to skip the
  javadoc task. They don’t have public javadoc to publish.
- Updates the backup module to ensure it’s not published.
- Fixes Gradle pom generation to include unshaded deps.
- Fixes the Gradle signing logic.

This work was validated by walking through the release steps
for both Maven and Gradle. I staged the artifacts from both
builds and compared them.

Change-Id: I50c3b9358896b264fd525de558b6be7ef77145e2
Reviewed-on: http://gerrit.cloudera.org:8080/11576
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Attila Bukor <ab...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/da28d0ae
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/da28d0ae
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/da28d0ae

Branch: refs/heads/master
Commit: da28d0aee42445791183d0ff5bc021c347cf5f9d
Parents: ce2b527
Author: Grant Henke <gr...@apache.org>
Authored: Wed Oct 3 12:17:14 2018 -0500
Committer: Grant Henke <gr...@apache.org>
Committed: Wed Oct 3 21:24:15 2018 +0000

----------------------------------------------------------------------
 RELEASING.adoc                | 32 +++-----------------------------
 java/gradle/publishing.gradle |  4 ++--
 java/gradle/shadow.gradle     | 14 +++++++++++++-
 java/kudu-backup/build.gradle |  8 ++++++++
 java/kudu-backup/pom.xml      | 22 +++++++++++++++++++---
 java/kudu-hive/pom.xml        |  8 ++++++++
 6 files changed, 53 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/da28d0ae/RELEASING.adoc
----------------------------------------------------------------------
diff --git a/RELEASING.adoc b/RELEASING.adoc
index 7eaa641..4f68b75 100644
--- a/RELEASING.adoc
+++ b/RELEASING.adoc
@@ -117,40 +117,14 @@ Apache committer guide for setting up your GPG keys
 
 . Fix any issues it finds, such as RAT.
 
-. Add the following information to your `~/.m2/settings.xml` file in order to
-  be able to deploy artifacts to the ASF Maven repository:
-+
-----
-<settings>
-  <servers>
-    <server>
-      <id>apache.snapshots.https</id>
-      <username> <!-- YOUR APACHE LDAP USERNAME --> </username>
-      <password> <!-- YOUR APACHE LDAP PASSWORD (encrypted) --> </password>
-    </server>
-    <!-- To stage a release of some part of Maven -->
-    <server>
-      <id>apache.releases.https</id>
-      <username> <!-- YOUR APACHE LDAP USERNAME --> </username>
-      <password> <!-- YOUR APACHE LDAP PASSWORD (encrypted) --> </password>
-    </server>
-  </servers>
-</settings>
-----
-+
-If you don't want to keep your ASF password in plaintext on your local machine,
-you can link:http://maven.apache.org/guides/mini/guide-encryption.html[encrypt it].
-
 . Test the full Java build. This will sign and build everything without
   deploying any artifacts:
 +
 ----
-  # Run a gpg-agent if you don't normally. You may have to tweak it to get it
-  # to work with Maven, and this StackOverflow article might help:
-  # https://stackoverflow.com/questions/36506275/why-do-i-have-to-kill-gpg-agent-to-sign-my-commits
+  # Run a gpg-agent if you don't normally.
   gpg-agent --daemon
   cd java
-  mvn -DskipTests -Papache-release clean install
+  gradle clean install -PforceSigning
 ----
 +
 
@@ -184,7 +158,7 @@ you can link:http://maven.apache.org/guides/mini/guide-encryption.html[encrypt i
   # Run a gpg-agent if you don't normally
   gpg-agent --daemon
   cd java
-  mvn -DskipTests -Papache-release clean deploy
+  gradle clean uploadArchives -PmavenUsername="<APACHE-LDAP-USERNAME>" -PmavenPassword="<APACHE-LDAP-PASSWORD>"
 ----
 +
 Go to the link:https://repository.apache.org/\#stagingRepositories[staging repository]

http://git-wip-us.apache.org/repos/asf/kudu/blob/da28d0ae/java/gradle/publishing.gradle
----------------------------------------------------------------------
diff --git a/java/gradle/publishing.gradle b/java/gradle/publishing.gradle
index f2ad3eb..9ed2d52 100644
--- a/java/gradle/publishing.gradle
+++ b/java/gradle/publishing.gradle
@@ -30,8 +30,8 @@ ext {
           // Skip signing artifacts by default if -PskipSigning is passed.
           (!skipSigning
               // Sign artifacts if the version is not a snapshot, and we are uploading them to maven.
-              && !version.endsWith("SNAPSHOT") && gradle.taskGraph.hasTask("uploadArchives"))
-
+              && !version.endsWith("SNAPSHOT")
+              && project.gradle.startParameter.taskNames.any { it.contains("upload") })
   // These properties can be set in ~/.gradle/gradle.properties file,
   // though it would be open text. They can also be set on the cli via
   // -PmavenUsername and -PmavenPassword.

http://git-wip-us.apache.org/repos/asf/kudu/blob/da28d0ae/java/gradle/shadow.gradle
----------------------------------------------------------------------
diff --git a/java/gradle/shadow.gradle b/java/gradle/shadow.gradle
index 95a9b04..4a5d8ca 100644
--- a/java/gradle/shadow.gradle
+++ b/java/gradle/shadow.gradle
@@ -75,10 +75,22 @@ configurations.create("compileUnshaded")
 configurations.shadow.extendsFrom(configurations.compileUnshaded)
 configurations.compile.extendsFrom(configurations.compileUnshaded)
 
-// Ensure compileUnshaded dependencies are not compiled into shadowJar.
 // We use afterEvaluate to add additional configuration once all the definitions
 // in the projects build script have been applied
 afterEvaluate {
+  // Ensure compileUnshaded dependencies are included in the pom.
+  [install, uploadArchives].each { task ->
+    task.repositories.each {
+      configure(it.pom.scopeMappings) {
+        // The priority value is arbitrary.
+        addMapping(
+            MavenPlugin.COMPILE_PRIORITY,
+            configurations.compileUnshaded,
+            Conf2ScopeMappingContainer.COMPILE)
+      }
+    }
+  }
+  // Ensure compileUnshaded dependencies are not compiled into shadowJar.
   project.configurations.compileUnshaded.dependencies.each { dep ->
     def depStr = "${dep.group}:${dep.name}:${dep.version}"
     logger.info "Excluding ${depStr} from being bundled into the shaded jar."

http://git-wip-us.apache.org/repos/asf/kudu/blob/da28d0ae/java/kudu-backup/build.gradle
----------------------------------------------------------------------
diff --git a/java/kudu-backup/build.gradle b/java/kudu-backup/build.gradle
index 5c009fe..6a63f75 100644
--- a/java/kudu-backup/build.gradle
+++ b/java/kudu-backup/build.gradle
@@ -59,3 +59,11 @@ sourceSets {
 
 // Adjust the artifact name to match the maven build.
 archivesBaseName = "kudu-backup${versions.sparkBase}_${versions.scalaBase}"
+
+// kudu-backup has no public Javadoc.
+javadoc {
+  enabled = false
+}
+
+// Skip publishing kudu-backup until it's ready to be supported long-term.
+uploadArchives.enabled = false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/da28d0ae/java/kudu-backup/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-backup/pom.xml b/java/kudu-backup/pom.xml
index 0cac985..891960c 100644
--- a/java/kudu-backup/pom.xml
+++ b/java/kudu-backup/pom.xml
@@ -269,13 +269,29 @@
             </plugin>
         </plugins>
 
-        <!-- This big ol' block of nonsense tells the m2e Eclipse plugin what
-            to do with maven plugins that don't have m2e "extensions" available.
 
-            It doesn't affect the Maven build at all. -->
         <pluginManagement>
             <plugins>
                 <plugin>
+                    <!-- kudu-backup has no public Javadoc. -->
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-javadoc-plugin</artifactId>
+                    <configuration>
+                        <skip>true</skip>
+                    </configuration>
+                </plugin>
+                <plugin>
+                    <!-- Skip publishing kudu-backup until it's ready to be supported long-term. -->
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-deploy-plugin</artifactId>
+                    <configuration>
+                        <skip>true</skip>
+                    </configuration>
+                </plugin>
+                <!-- This big ol' block of nonsense tells the m2e Eclipse plugin what
+                     to do with maven plugins that don't have m2e "extensions" available.
+                     It doesn't affect the Maven build at all. -->
+                <plugin>
                     <groupId>org.eclipse.m2e</groupId>
                     <artifactId>lifecycle-mapping</artifactId>
                     <version>1.0.0</version>

http://git-wip-us.apache.org/repos/asf/kudu/blob/da28d0ae/java/kudu-hive/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-hive/pom.xml b/java/kudu-hive/pom.xml
index a82f294..33d4de0 100644
--- a/java/kudu-hive/pom.xml
+++ b/java/kudu-hive/pom.xml
@@ -96,6 +96,14 @@
     <build>
         <plugins>
             <plugin>
+                <!-- kudu-hive has no public Javadoc. -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+            <plugin>
                 <!-- Skip publishing kudu-hive until it's ready to be supported long-term. -->
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-deploy-plugin</artifactId>


[2/3] kudu git commit: [hms] enable 'notifications.add.thrift.objects' in mini HMS

Posted by gr...@apache.org.
[hms] enable 'notifications.add.thrift.objects' in mini HMS

This commit explicitly enables 'notifications.add.thrift.objects' to
add the entire thrift Table/Partition objects to the Hive notification
log, so that Kudu can get all the required fields to properly parse the
notification messages when running with Hive distributions that
disable it by default.

Change-Id: I9f28f52bdddd59c5b0dc7120d62482de8becb1ee
Reviewed-on: http://gerrit.cloudera.org:8080/11566
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ce2b5277
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ce2b5277
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ce2b5277

Branch: refs/heads/master
Commit: ce2b52770be4a81800fb5eb155b952d5211e2017
Parents: 63e4a0a
Author: Hao Hao <ha...@cloudera.com>
Authored: Mon Oct 1 13:05:27 2018 -0700
Committer: Hao Hao <ha...@cloudera.com>
Committed: Wed Oct 3 18:24:32 2018 +0000

----------------------------------------------------------------------
 src/kudu/hms/hms_client.cc | 22 ++++++++++++++++++++--
 src/kudu/hms/hms_client.h  |  1 +
 src/kudu/hms/mini_hms.cc   |  9 +++++++++
 3 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ce2b5277/src/kudu/hms/hms_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client.cc b/src/kudu/hms/hms_client.cc
index ef37616..9d51956 100644
--- a/src/kudu/hms/hms_client.cc
+++ b/src/kudu/hms/hms_client.cc
@@ -112,6 +112,8 @@ const char* const HmsClient::kStorageHandlerKey = "storage_handler";
 const char* const HmsClient::kKuduMetastorePlugin =
   "org.apache.kudu.hive.metastore.KuduMetastorePlugin";
 const char* const HmsClient::kHiveFilterFieldParams = "hive_filter_field_params__";
+const char* const HmsClient::kNotificationAddThriftObjects =
+  "hive.metastore.notifications.add.thrift.objects";
 
 const char* const HmsClient::kManagedTable = "MANAGED_TABLE";
 const char* const HmsClient::kExternalTable = "EXTERNAL_TABLE";
@@ -174,10 +176,26 @@ Status HmsClient::Start() {
 
   if (boost::iequals(disallow_incompatible_column_type_changes, "true")) {
     return Status::IllegalState(Substitute(
-          "Hive Metastore configuration is invalid: $0 must be set to false",
-          kDisallowIncompatibleColTypeChanges));
+        "Hive Metastore configuration is invalid: $0 must be set to false",
+        kDisallowIncompatibleColTypeChanges));
   }
 
+  // Check that the HMS is configured to add the entire thrift Table/Partition
+  // objects to the HMS notifications, which is required to properly parse the
+  // HMS notification log. This is specific to the HMS version shipped in
+  // Cloudera's CDH.
+  string thrift_objects_config;
+  HMS_RET_NOT_OK(client_.get_config_value(thrift_objects_config,
+                                          kNotificationAddThriftObjects,
+                                          "true"),
+                 Substitute("failed to get Hive Metastore $0 configuration",
+                            kNotificationAddThriftObjects));
+  if (boost::iequals(thrift_objects_config, "false")) {
+    return Status::IllegalState(Substitute(
+        "Hive Metastore configuration is invalid: $0 must be set to true",
+        kNotificationAddThriftObjects));
+    }
+
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce2b5277/src/kudu/hms/hms_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_client.h b/src/kudu/hms/hms_client.h
index c081366..23451f3 100644
--- a/src/kudu/hms/hms_client.h
+++ b/src/kudu/hms/hms_client.h
@@ -83,6 +83,7 @@ class HmsClient {
   static const char* const kDisallowIncompatibleColTypeChanges;
   static const char* const kDbNotificationListener;
   static const char* const kKuduMetastorePlugin;
+  static const char* const kNotificationAddThriftObjects;
 
   // See org.apache.hadoop.hive.metastore.TableType.
   static const char* const kManagedTable;

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce2b5277/src/kudu/hms/mini_hms.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/mini_hms.cc b/src/kudu/hms/mini_hms.cc
index 253de09..079cdb1 100644
--- a/src/kudu/hms/mini_hms.cc
+++ b/src/kudu/hms/mini_hms.cc
@@ -207,6 +207,10 @@ Status MiniHms::CreateHiveSite() const {
   // - hive.support.special.characters.tablename
   //     Configures the HMS to allow special characters such as '/' in table
   //     names.
+  //
+  // - hive.metastore.notifications.add.thrift.objects
+  //     Configured the HMS to add the entire thrift Table/Partition
+  //     objects to the HMS notifications.
   static const string kFileTemplate = R"(
 <configuration>
   <property>
@@ -271,6 +275,11 @@ Status MiniHms::CreateHiveSite() const {
     <name>hive.support.special.characters.tablename</name>
     <value>true</value>
   </property>
+
+  <property>
+    <name>hive.metastore.notifications.add.thrift.objects</name>
+    <value>true</value>
+  </property>
 </configuration>
   )";