You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2018/07/30 18:46:59 UTC
[3/4] kudu git commit: hms-tool: refactor check tool and combine
upgrade and fix
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e40dfdb/src/kudu/tools/tool_action_hms.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_hms.cc b/src/kudu/tools/tool_action_hms.cc
index 6273ed2..db90ee3 100644
--- a/src/kudu/tools/tool_action_hms.cc
+++ b/src/kudu/tools/tool_action_hms.cc
@@ -19,6 +19,7 @@
#include <map>
#include <memory>
#include <string>
+#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -32,21 +33,35 @@
#include "kudu/client/shared_ptr.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/strings/util.h"
#include "kudu/hms/hive_metastore_types.h"
#include "kudu/hms/hms_catalog.h"
#include "kudu/hms/hms_client.h"
#include "kudu/tools/tool_action.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/slice.h"
#include "kudu/util/status.h"
-DECLARE_int64(timeout_ms); // defined in tool_action_common
+DECLARE_bool(force);
+DECLARE_int64(timeout_ms);
DECLARE_string(hive_metastore_uris);
+DEFINE_bool(dryrun, false,
+ "Print a message for each fix, but do not make modifications to Kudu or the Hive Metastore.");
+DEFINE_bool(drop_orphan_hms_tables, false,
+ "Drop orphan Hive Metastore tables which refer to non-existent Kudu tables.");
+DEFINE_bool(create_missing_hms_tables, true,
+ "Create a Hive Metastore table for each Kudu table which is missing one.");
+DEFINE_bool(fix_inconsistent_tables, true,
+ "Fix tables whose Kudu and Hive Metastore metadata differ. If the table name is "
+ "different, the table is renamed in Kudu to match the HMS. If the columns "
+ "or other metadata is different the HMS is updated to match Kudu.");
+DEFINE_bool(upgrade_hms_tables, true,
+ "Upgrade Hive Metastore tables from the legacy Impala metadata format to the "
+ "new Kudu metadata format.");
+
namespace kudu {
namespace tools {
@@ -55,11 +70,11 @@ using client::KuduClientBuilder;
using client::KuduTable;
using client::KuduTableAlterer;
using client::sp::shared_ptr;
-using hms::HmsClient;
using hms::HmsCatalog;
-using std::cin;
+using hms::HmsClient;
using std::cout;
using std::endl;
+using std::make_pair;
using std::ostream;
using std::pair;
using std::string;
@@ -69,143 +84,16 @@ using std::vector;
using strings::Split;
using strings::Substitute;
-DEFINE_bool(enable_input, true,
- "Whether to enable user input for renaming tables that have Hive"
- "incompatible names.");
-
-// The key is the table ID. The value is a pair of a table in Kudu and a
-// vector of tables in the HMS that all share the same table ID.
-typedef unordered_map<string, pair<shared_ptr<KuduTable>, vector<hive::Table>>> TablesMap;
-
-const char* const kDefaultDatabaseArg = "default_database";
-const char* const kDefaultDatabaseArgDesc = "The database that non-Impala Kudu "
- "table should reside in";
-const char* const kInvalidNameError = "is not a valid object name";
-
-// Returns a map that contains all Kudu tables in the HMS. Its key is
-// the Kudu table name and its value is the corresponding HMS table.
-unordered_map<string, hive::Table> RetrieveTablesMap(vector<hive::Table> hms_tables) {
- unordered_map<string, hive::Table> hms_tables_map;
- for (auto& hms_table : hms_tables) {
- const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
- if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
- hms_tables_map.emplace(hms_table.parameters[HmsClient::kLegacyKuduTableNameKey], hms_table);
- } else if (storage_handler == HmsClient::kKuduStorageHandler) {
- hms_tables_map.emplace(Substitute("$0.$1", hms_table.dbName, hms_table.tableName), hms_table);
- }
- }
- return hms_tables_map;
-}
-
-string RenameHiveIncompatibleTable(const string& table_name) {
- cout << Substitute("Table $0 is not hive compatible.", table_name) << endl;
- cout << "Please input a new table name: ";
- string new_table_name;
- getline(cin, new_table_name);
- return new_table_name;
-}
-
// Only alter the table in Kudu but not in the Hive Metastore.
-Status AlterKuduTableOnly(KuduClient* kudu_client,
- const string& name,
- const string& new_name) {
+Status RenameTableInKuduCatalog(KuduClient* kudu_client,
+ const string& name,
+ const string& new_name) {
unique_ptr<KuduTableAlterer> alterer(kudu_client->NewTableAlterer(name));
SetAlterExternalCatalogs(alterer.get(), false);
return alterer->RenameTo(new_name)
->Alter();
}
-// Alter legacy tables (which includes non-Impala tables, Impala managed/external
-// tables) to follow the format 'database_name.table_name' in table naming in Kudu.
-// Also, create HMS entries for non-Impala tables.
-//
-// Note that non-Impala tables name should conform to Hive naming standard.
-// Otherwise, the upgrade process will fail.
-Status AlterLegacyKuduTables(KuduClient* kudu_client,
- HmsCatalog* hms_catalog,
- const string& default_database,
- const vector<hive::Table>& hms_tables) {
- vector<string> table_names;
- RETURN_NOT_OK(kudu_client->ListTables(&table_names));
- unordered_map<string, hive::Table> hms_tables_map = RetrieveTablesMap(hms_tables);
-
- // Take care of the orphan tables in the HMS.
- for (const auto& hms_table : hms_tables_map) {
- bool exist;
- RETURN_NOT_OK(kudu_client->TableExists(hms_table.first, &exist));
- if (!exist) {
- // Warn instead of dropping the table in the HMS to avoid breakage for
- // installations which have multiple Kudu clusters pointed at the same HMS.
- LOG(WARNING) << Substitute("Found orphan table $0.$1 in the Hive Metastore",
- hms_table.second.dbName, hms_table.second.tableName);
- }
- }
-
- unordered_map<string, Status> failures;
- for (const auto& table_name : table_names) {
- hive::Table* hms_table = FindOrNull(hms_tables_map, table_name);
- shared_ptr<KuduTable> kudu_table;
- RETURN_NOT_OK(kudu_client->OpenTable(table_name, &kudu_table));
- Status s;
- if (hms_table) {
- // Rename legacy Impala managed tables and external tables to follow the
- // format 'database_name.table_name'. This is a no-op for non-legacy tables
- // stored in the HMS.
- if (hms_table->parameters[HmsClient::kStorageHandlerKey] ==
- HmsClient::kLegacyKuduStorageHandler) {
- string new_table_name = Substitute("$0.$1", hms_table->dbName, hms_table->tableName);
- // Hive-compatible table name implies the table has been upgraded previously and
- // then downgraded. In this case, we only upgrade the legacy Impala table.
- if (table_name != new_table_name) {
- s = AlterKuduTableOnly(kudu_client, table_name, new_table_name);
- if (s.IsAlreadyPresent()) {
- s = s.CloneAndPrepend(Substitute("Failed to upgrade legacy Impala table '$0.$1' "
- "(Kudu table name: $2), because a Kudu table with "
- "name '$0.$1' already exists'.", hms_table->dbName,
- hms_table->tableName, table_name));
- }
- }
- if (s.ok()) {
- s = hms_catalog->UpgradeLegacyImpalaTable(
- kudu_table->id(), hms_table->dbName, hms_table->tableName,
- client::SchemaFromKuduSchema(kudu_table->schema()));
- }
- }
- } else {
- // Create the table in the HMS.
- string new_table_name = Substitute("$0.$1", default_database, table_name);
- Schema schema = client::SchemaFromKuduSchema(kudu_table->schema());
- s = hms_catalog->CreateTable(kudu_table->id(), new_table_name, schema);
- while (!s.ok() && FLAGS_enable_input &&
- (MatchPattern(s.ToString(), Substitute("*$0*", kInvalidNameError)) ||
- MatchPattern(s.ToString(), Substitute("*$0*", HmsCatalog::kInvalidTableError)))) {
- new_table_name = Substitute("$0.$1", default_database,
- RenameHiveIncompatibleTable(table_name));
- s = hms_catalog->CreateTable(kudu_table->id(), new_table_name, schema);
- }
- s = s.AndThen([&] {
- return AlterKuduTableOnly(kudu_client, table_name, new_table_name);
- });
- }
-
- if (!s.ok()) {
- failures.emplace(table_name, s);
- }
- }
-
- // Returns the first failure that was seen, if any.
- if (!failures.empty()) {
- for (const auto& failure : failures) {
- LOG(WARNING) << Substitute("Failed to upgrade Kudu table $0, because: ",
- failure.first, failure.second.ToString());
- }
-
- return failures.begin()->second;
- } else {
- return Status::OK();
- }
-}
-
Status Init(const RunnerContext& context,
shared_ptr<KuduClient>* kudu_client,
unique_ptr<HmsCatalog>* hms_catalog) {
@@ -226,53 +114,7 @@ Status Init(const RunnerContext& context,
.Build(kudu_client);
}
-// Upgrade the metadata format of legacy impala managed or external tables
-// in HMS entries, as well as rename the existing tables in Kudu to adapt
-// to the new naming rules.
-//
-// Sample Legacy Hms Entries
-// Managed table
-// Table(
-// tableName=customer,
-// dbName=tpch_1000_kudu,
-// parameters={
-// kudu.master_addresses: <master-addr>:7051,
-// kudu.table_name: impala::tpch_1000_kudu.customer,
-// storage_handler: com.cloudera.kudu.hive.KuduStorageHandler,
-// },
-// tableType=MANAGED_TABLE,
-// )
-//
-// External table
-// Table(
-// tableName=metrics,
-// dbName=default,
-// parameters={
-// EXTERNAL: TRUE,
-// kudu.master_addresses: <master-addr>,
-// kudu.table_name: metrics,
-// storage_handler: com.cloudera.kudu.hive.KuduStorageHandler,
-// },
-// tableType=EXTERNAL_TABLE,
-// )
-Status HmsUpgrade(const RunnerContext& context) {
- const string& default_database = FindOrDie(context.required_args,
- kDefaultDatabaseArg);
- shared_ptr<KuduClient> kudu_client;
- unique_ptr<HmsCatalog> hms_catalog;
- Init(context, &kudu_client, &hms_catalog);
-
- // 1. Identify all Kudu tables in the HMS entries.
- vector<hive::Table> hms_tables;
- RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_tables));
-
- // 2. Rename all existing Kudu tables to have Hive-compatible table names.
- // Also, correct all out of sync metadata in HMS entries.
- return AlterLegacyKuduTables(kudu_client.get(), hms_catalog.get(),
- default_database, hms_tables);
-}
-
-// TODO: check that the HMS integration isn't enabled before running it.
+// TODO(dan): check that the HMS integration isn't enabled before running it.
Status HmsDowngrade(const RunnerContext& context) {
shared_ptr<KuduClient> kudu_client;
unique_ptr<HmsCatalog> hms_catalog;
@@ -304,309 +146,523 @@ bool IsSynced(const string& master_addresses,
return s.ok() && hms_table_copy == hms_table;
}
-// Filter orphan tables from the unsynchronized tables map.
-void FilterOrphanedTables(TablesMap* tables_map) {
- for (auto it = tables_map->cbegin(); it != tables_map->cend();) {
- // If the kudu table is empty, then these table in the HMS are
- // orphan tables. Filter it as we do not care about orphan tables.
- if (it->second.first == nullptr) {
- for (const auto &table : it->second.second) {
- LOG(WARNING) << Substitute("Found orphan table $0.$1 in the Hive Metastore",
- table.dbName, table.tableName);
- }
- it = tables_map->erase(it);
- } else {
- ++it;
- }
+// Prints catalog information about Kudu tables in data table format to 'out'.
+Status PrintKuduTables(const string& master_addrs,
+ const vector<shared_ptr<KuduTable>>& kudu_tables,
+ ostream& out) {
+ DataTable table({
+ "Kudu table",
+ "Kudu table ID",
+ "Kudu master addresses",
+ });
+ for (const auto& kudu_table : kudu_tables) {
+ table.AddRow({
+ kudu_table->name(),
+ kudu_table->id(),
+ master_addrs,
+ });
}
+ return table.PrintTo(out);
}
-// Sample of unsynchronized tables:
-//
-// TableID| KuduTableName| HmsDbName| HmsTableName| KuduMasterAddresses| HmsTableMasterAddresses
-//--------+--------------+----------+-------------+--------------------+-------------------------
-// 1 | a | | | 127.0.0.1:50232 |
-// 2 | default.c | default | c | 127.0.0.1:50232 | 127.0.0.1:50232
-// 2 | | default | d | 127.0.0.1:50232 | 127.0.0.1:50232
-// 3 | default.b | | | 127.0.0.1:50232 |
-Status PrintUnsyncedTables(const string& master_addresses,
- const TablesMap& tables_map,
- ostream& out) {
- out << "Metadata of unsynchronized tables:" << endl;
- DataTable table({ "TableID", "KuduTableName", "HmsDbName", "HmsTableName",
- "KuduMasterAddresses", "HmsTableMasterAddresses"});
- for (const auto& entry : tables_map) {
- const string& table_id = entry.first;
- const KuduTable& kudu_table = *entry.second.first.get();
- const vector<hive::Table>& hms_tables = entry.second.second;
- const string& kudu_table_name = kudu_table.name();
- if (hms_tables.empty()) {
- table.AddRow({ table_id, kudu_table_name, "", "", master_addresses, "" });
+// Prints catalog information about Kudu and HMS tables in data table format to 'out'.
+Status PrintTables(const string& master_addrs,
+ vector<pair<shared_ptr<KuduTable>, hive::Table*>> tables,
+ ostream& out) {
+ DataTable table({
+ "Kudu table",
+ "Kudu table ID",
+ "Kudu master addresses",
+ "HMS database",
+ "HMS table",
+ Substitute("HMS $0", HmsClient::kStorageHandlerKey),
+ Substitute("HMS $0", HmsClient::kLegacyKuduTableNameKey),
+ Substitute("HMS $0", HmsClient::kKuduTableIdKey),
+ Substitute("HMS $0", HmsClient::kKuduMasterAddrsKey),
+ });
+ for (auto& pair : tables) {
+ vector<string> row;
+ if (pair.first) {
+ const KuduTable& kudu_table = *pair.first;
+ row.emplace_back(kudu_table.name());
+ row.emplace_back(kudu_table.id());
+ row.emplace_back(master_addrs);
} else {
- for (const hive::Table& hms_table : hms_tables) {
- table.AddRow({
- table_id,
- kudu_table_name,
- hms_table.dbName,
- hms_table.tableName,
- master_addresses,
- FindOrDie(hms_table.parameters, HmsClient::kKuduMasterAddrsKey),
- });
- }
+ row.resize(3);
}
+ if (pair.second) {
+ hive::Table& hms_table = *pair.second;
+ row.emplace_back(hms_table.dbName);
+ row.emplace_back(hms_table.tableName);
+ row.emplace_back(hms_table.parameters[HmsClient::kStorageHandlerKey]);
+ row.emplace_back(hms_table.parameters[HmsClient::kLegacyKuduTableNameKey]);
+ row.emplace_back(hms_table.parameters[HmsClient::kKuduTableIdKey]);
+ row.emplace_back(hms_table.parameters[HmsClient::kKuduMasterAddrsKey]);
+ } else {
+ row.resize(9);
+ }
+ table.AddRow(std::move(row));
}
-
return table.PrintTo(out);
}
-Status PrintLegacyTables(const vector<hive::Table>& tables, ostream& out) {
- cout << "Found legacy tables in the Hive Metastore, "
- << "use metadata upgrade tool first: 'kudu hms upgrade'."
- << endl;
- DataTable table({ "HmsDbName", "HmsTableName", "KuduTableName",
- "KuduMasterAddresses"});
- for (hive::Table t : tables) {
- const string& kudu_table_name = t.parameters[HmsClient::kLegacyKuduTableNameKey];
- const string& master_addresses = t.parameters[HmsClient::kKuduMasterAddrsKey];
- table.AddRow({ t.dbName, t.tableName, kudu_table_name, master_addresses });
+// A report of inconsistent tables in Kudu and the HMS catalogs.
+struct CatalogReport {
+ // Kudu tables in the HMS catalog which have no corresponding table in the
+ // Kudu catalog (including legacy tables).
+ vector<hive::Table> orphan_hms_tables;
+
+ // Tables in the Kudu catalog which have no corresponding table in the HMS catalog.
+ vector<shared_ptr<KuduTable>> missing_hms_tables;
+
+ // Tables in the Kudu catalog which have a Hive-incompatible name, and which
+ // are not referenced by any HMS table entry (tables that are referenced are
+ // classified by their HMS entry instead, e.g. as legacy or inconsistent).
+ //
+ // These tables cannot be automatically corrected by the fix tool.
+ vector<shared_ptr<KuduTable>> invalid_name_tables;
+
+ // Legacy Impala/Kudu tables (storage handler is com.cloudera.kudu.hive.KuduStorageHandler).
+ vector<pair<shared_ptr<KuduTable>, hive::Table>> legacy_hms_tables;
+
+ // Kudu tables with multiple HMS table entries. The entries may or may not be legacy.
+ //
+ // These tables cannot be automatically corrected by the fix tool.
+ vector<pair<shared_ptr<KuduTable>, hive::Table>> duplicate_hms_tables;
+
+ // Tables whose Kudu catalog table and HMS table are inconsistent.
+ vector<pair<shared_ptr<KuduTable>, hive::Table>> inconsistent_tables;
+
+ // Returns true if the report is empty.
+ bool empty() const {
+ return orphan_hms_tables.empty()
+ && legacy_hms_tables.empty()
+ && duplicate_hms_tables.empty()
+ && inconsistent_tables.empty()
+ && missing_hms_tables.empty()
+ && invalid_name_tables.empty();
}
- return table.PrintTo(out);
-}
-
-Status RetrieveUnsyncedTables(HmsCatalog* hms_catalog,
- KuduClient* kudu_client,
- const string& master_addresses,
- TablesMap* unsynced_tables_map,
- vector<hive::Table>* legacy_tables) {
- // 1. Identify all Kudu table in the HMS entries.
- vector<hive::Table> hms_tables;
- RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_tables));
+};
- // 2. Walk through all the Kudu tables in the HMS and identify any
- // out of sync tables.
- for (auto& hms_table : hms_tables) {
- const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
- if (storage_handler == HmsClient::kKuduStorageHandler) {
- const string& hms_table_id = hms_table.parameters[HmsClient::kKuduTableIdKey];
- string table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
+// Retrieves the entire Kudu catalog, as well as all Kudu tables in the HMS
+// catalog, and compares them to find inconsistencies.
+//
+// Inconsistencies are bucketed into different groups, corresponding to how they
+// can be repaired.
+Status AnalyzeCatalogs(const string& master_addrs,
+ HmsCatalog* hms_catalog,
+ KuduClient* kudu_client,
+ CatalogReport* report) {
+ // Step 1: retrieve all Kudu tables, and aggregate them by ID and by name. The
+ // by-ID map will be used to match the HMS Kudu table entries. The by-name map
+ // will be used to match against legacy Impala/Kudu HMS table entries.
+ unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_id;
+ unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_name;
+ {
+ vector<string> kudu_table_names;
+ RETURN_NOT_OK(kudu_client->ListTables(&kudu_table_names));
+ for (const string& kudu_table_name : kudu_table_names) {
shared_ptr<KuduTable> kudu_table;
- Status s = kudu_client->OpenTable(table_name, &kudu_table);
- if (s.ok() && !IsSynced(master_addresses, *kudu_table.get(), hms_table)) {
- (*unsynced_tables_map)[kudu_table->id()].first = kudu_table;
- (*unsynced_tables_map)[hms_table_id].second.emplace_back(hms_table);
- } else if (s.IsNotFound()) {
- // We cannot determine whether this table is an orphan table in the HMS now, since
- // there may be other tables in Kudu shares the same table ID but not the same name.
- // So do it in the filtering step below.
- (*unsynced_tables_map)[hms_table_id].second.emplace_back(hms_table);
- } else {
- RETURN_NOT_OK(s);
- }
- } else if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
- legacy_tables->push_back(hms_table);
+ // TODO(dan): When the error is NotFound, prepend an admonishment about not
+ // running this tool when the catalog is in-flux.
+ RETURN_NOT_OK(kudu_client->OpenTable(kudu_table_name, &kudu_table));
+ kudu_tables_by_id.emplace(kudu_table->id(), kudu_table);
+ kudu_tables_by_name.emplace(kudu_table->name(), std::move(kudu_table));
}
}
- // 3. If any Kudu table is not present in the HMS, consider it as an out of sync
- // table.
- vector<string> table_names;
- RETURN_NOT_OK(kudu_client->ListTables(&table_names));
- unordered_map<string, hive::Table> hms_tables_map = RetrieveTablesMap(std::move(hms_tables));
- for (const auto& table_name : table_names) {
- if (!ContainsKey(hms_tables_map, table_name)) {
- shared_ptr<KuduTable> kudu_table;
- RETURN_NOT_OK(kudu_client->OpenTable(table_name, &kudu_table));
- (*unsynced_tables_map)[kudu_table->id()].first = kudu_table;
+ // Step 2: retrieve all Kudu table entries in the HMS, filter all orphaned
+ // entries which reference non-existent Kudu tables, and group the rest by
+ // table ID.
+ vector<hive::Table> orphan_tables;
+ unordered_map<string, vector<hive::Table>> hms_tables_by_id;
+ {
+ vector<hive::Table> hms_tables;
+ RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_tables));
+ for (hive::Table& hms_table : hms_tables) {
+ const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
+ if (storage_handler == HmsClient::kKuduStorageHandler) {
+ const string& hms_table_id = hms_table.parameters[HmsClient::kKuduTableIdKey];
+ shared_ptr<KuduTable>* kudu_table = FindOrNull(kudu_tables_by_id, hms_table_id);
+ if (kudu_table) {
+ hms_tables_by_id[(*kudu_table)->id()].emplace_back(std::move(hms_table));
+ } else {
+ orphan_tables.emplace_back(std::move(hms_table));
+ }
+ } else if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
+ const string& hms_table_name = hms_table.parameters[HmsClient::kLegacyKuduTableNameKey];
+ shared_ptr<KuduTable>* kudu_table = FindOrNull(kudu_tables_by_name, hms_table_name);
+ if (kudu_table) {
+ hms_tables_by_id[(*kudu_table)->id()].emplace_back(std::move(hms_table));
+ } else {
+ orphan_tables.emplace_back(std::move(hms_table));
+ }
+ }
}
}
- // 4. Filter orphan tables.
- FilterOrphanedTables(unsynced_tables_map);
+ // Step 3: Determine the state of each Kudu table's HMS entry(ies), and bin
+ // them appropriately.
+ vector<pair<shared_ptr<KuduTable>, hive::Table>> legacy_tables;
+ vector<pair<shared_ptr<KuduTable>, hive::Table>> duplicate_tables;
+ vector<pair<shared_ptr<KuduTable>, hive::Table>> stale_tables;
+ vector<shared_ptr<KuduTable>> missing_tables;
+ vector<shared_ptr<KuduTable>> invalid_name_tables;
+ for (auto& kudu_table_pair : kudu_tables_by_id) {
+ shared_ptr<KuduTable> kudu_table = kudu_table_pair.second;
+ vector<hive::Table>* hms_tables = FindOrNull(hms_tables_by_id, kudu_table_pair.first);
+
+ if (!hms_tables) {
+ const string& table_name = kudu_table->name();
+ string normalized_table_name(table_name.data(), table_name.size());
+ Status s = hms::HmsCatalog::NormalizeTableName(&normalized_table_name);
+ if (!s.ok()) {
+ invalid_name_tables.emplace_back(std::move(kudu_table));
+ } else {
+ missing_tables.emplace_back(std::move(kudu_table));
+ }
+ } else if (hms_tables->size() == 1) {
+ hive::Table& hms_table = (*hms_tables)[0];
+ const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
+ if (storage_handler == HmsClient::kKuduStorageHandler &&
+ !IsSynced(master_addrs, *kudu_table, hms_table)) {
+ stale_tables.emplace_back(make_pair(std::move(kudu_table), std::move(hms_table)));
+ } else if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
+ legacy_tables.emplace_back(make_pair(std::move(kudu_table), std::move(hms_table)));
+ }
+ } else {
+ for (hive::Table& hms_table : *hms_tables) {
+ duplicate_tables.emplace_back(make_pair(kudu_table, std::move(hms_table)));
+ }
+ }
+ }
+ report->orphan_hms_tables.swap(orphan_tables);
+ report->missing_hms_tables.swap(missing_tables);
+ report->invalid_name_tables.swap(invalid_name_tables);
+ report->legacy_hms_tables.swap(legacy_tables);
+ report->duplicate_hms_tables.swap(duplicate_tables);
+ report->inconsistent_tables.swap(stale_tables);
return Status::OK();
}
Status CheckHmsMetadata(const RunnerContext& context) {
- const string& master_addresses = FindOrDie(context.required_args, kMasterAddressesArg);
+ const string& master_addrs = FindOrDie(context.required_args, kMasterAddressesArg);
shared_ptr<KuduClient> kudu_client;
unique_ptr<HmsCatalog> hms_catalog;
- Init(context, &kudu_client, &hms_catalog);
+ RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog));
- TablesMap unsynced_tables_map;
- std::vector<hive::Table> legacy_tables;
- RETURN_NOT_OK_PREPEND(RetrieveUnsyncedTables(hms_catalog.get(),
- kudu_client.get(),
- master_addresses,
- &unsynced_tables_map,
- &legacy_tables),
- "error fetching unsynchronized tables");
-
- // All good.
- if (unsynced_tables_map.empty() && legacy_tables.empty()) {
- cout << "OK" << endl;
+ CatalogReport report;
+ RETURN_NOT_OK(AnalyzeCatalogs(master_addrs, hms_catalog.get(), kudu_client.get(), &report));
+
+ if (report.empty()) {
return Status::OK();
}
- // Something went wrong.
- cout << "FAILED" << endl;
- if (!unsynced_tables_map.empty()) {
- RETURN_NOT_OK_PREPEND(PrintUnsyncedTables(master_addresses, unsynced_tables_map, cout),
- "error printing inconsistent data");
+ if (!report.invalid_name_tables.empty()) {
+ cout << "Found Kudu table(s) with Hive-incompatible names:" << endl;
+ RETURN_NOT_OK(PrintKuduTables(master_addrs, report.invalid_name_tables, cout));
+ cout << endl
+ << "Suggestion: rename the Kudu table(s) to be Hive-compatible, then run the fix tool:"
+ << endl;
+ for (const auto& table : report.invalid_name_tables) {
+ cout << "\t$ kudu table rename_table --alter_external_catalogs=false "
+ << master_addrs << " " << table->name() << " <database-name>.<table-name>" << endl;
+ }
cout << endl;
}
- if (!legacy_tables.empty()) {
- RETURN_NOT_OK_PREPEND(PrintLegacyTables(legacy_tables, cout),
- "error printing legacy tables");
+ if (!report.duplicate_hms_tables.empty()) {
+ vector<pair<shared_ptr<KuduTable>, hive::Table*>> tables;
+ for (auto& table : report.duplicate_hms_tables) {
+ tables.emplace_back(table.first, &table.second);
+ }
+ cout << "Found Kudu table(s) with multiple corresponding Hive Metastore tables:" << endl;
+ RETURN_NOT_OK(PrintTables(master_addrs, std::move(tables), cout));
+ cout << endl
+ << "Suggestion: using Impala or the Hive Beeline shell, drop the duplicate Hive Metastore "
+ << endl
+ << "tables and consider recreating them as views referencing the base Kudu table."
+ << endl
+ << endl;
}
- return Status::RuntimeError("metadata check tool discovered inconsistent data");
-}
-
-Status FixUnsyncedTables(KuduClient* kudu_client,
- HmsCatalog* hms_catalog,
- const TablesMap& tables_map) {
- for (const auto& entry : tables_map) {
- const KuduTable& kudu_table = *entry.second.first;
- const vector<hive::Table>& hms_tables = entry.second.second;
-
- // 1. Create the table in the HMS if there is no corresponding table there.
- string table_id = entry.first;
- string kudu_table_name = kudu_table.name();
- Schema schema = client::SchemaFromKuduSchema(kudu_table.schema());
- cout << Substitute("Table (ID $0) is out of sync.", table_id) << endl;
- if (hms_tables.empty()) {
- RETURN_NOT_OK(hms_catalog->CreateTable(table_id, kudu_table_name, schema));
- continue;
+ if (!report.orphan_hms_tables.empty() || !report.missing_hms_tables.empty()
+ || !report.legacy_hms_tables.empty() || !report.inconsistent_tables.empty()) {
+ vector<pair<shared_ptr<KuduTable>, hive::Table*>> tables;
+ for (const auto& kudu_table : report.missing_hms_tables) {
+ tables.emplace_back(kudu_table, nullptr);
}
-
- // 2. If more than one table shares the same table ID in the HMS, return an error
- // as it is unsafe to do an automated fix.
- //
- if (hms_tables.size() > 1) {
- auto table_to_string = [] (const hive::Table& hms_table) {
- return strings::Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
- };
- return Status::IllegalState(
- Substitute("Found more than one tables [$0] in the Hive Metastore, with the "
- "same table ID: $1", JoinMapped(hms_tables, table_to_string, ", "),
- table_id));
+ for (auto& table : report.legacy_hms_tables) {
+ tables.emplace_back(table.first, &table.second);
}
-
- // 3. If the table name in Kudu is different from the one in the HMS, correct the
- // table name in Kudu with the one in the HMS. Since we consider the HMS as the
- // source of truth for table names.
- hive::Table hms_table = hms_tables[0];
- string hms_table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
- if (kudu_table_name != hms_table_name) {
- string new_table_name;
- cout << Substitute("Renaming Kudu table $0 [id=$1] to $2 to match the Hive "
- "Metastore catalog.", kudu_table_name, table_id, hms_table_name)
- << endl;
- RETURN_NOT_OK(AlterKuduTableOnly(kudu_client, kudu_table_name, hms_table_name));
- kudu_table_name = hms_table_name;
+ for (auto& table : report.inconsistent_tables) {
+ tables.emplace_back(table.first, &table.second);
+ }
+ for (auto& hms_table : report.orphan_hms_tables) {
+ tables.emplace_back(shared_ptr<KuduTable>(), &hms_table);
+ }
+ cout << "Found table(s) with missing or inconsisentent metadata in the Kudu "
+ "catalog or Hive Metastore:" << endl;
+ RETURN_NOT_OK(PrintTables(master_addrs, std::move(tables), cout));
+
+ if (report.orphan_hms_tables.empty()) {
+ cout << endl
+ << "Suggestion: use the fix tool to repair the Kudu and Hive Metastore catalog metadata:"
+ << endl
+ << "\t$ kudu hms fix " << master_addrs << endl;
+ } else {
+ cout << endl
+ << "Suggestion: use the fix tool to repair the Kudu and Hive Metastore catalog metadata"
+ << endl
+ << "and drop Hive Metastore table(s) which reference non-existent Kudu tables:" << endl
+ << "\t$ kudu hms fix --drop_orphan_hms_tables " << master_addrs << endl;
}
-
- // 4. Correct the master addresses, and the column name and type based on the
- // information in Kudu.
- cout << Substitute("Updating metadata of table $0 [id=$1] in Hive Metastore catalog.",
- kudu_table_name, table_id) << endl;
- RETURN_NOT_OK(hms_catalog->AlterTable(table_id, hms_table_name,
- hms_table_name, schema));
}
- return Status::OK();
+ // TODO(dan): add a link to the HMS guide on kudu.apache.org to this message.
+ return Status::IllegalState("found inconsistencies in the Kudu and HMS catalogs");
}
+// Pretty-prints the table name and ID.
+string TableIdent(const KuduTable& table) {
+ return Substitute("$0 [id=$1]", table.name(), table.id());
+}
+
+// Analyzes the Kudu and HMS catalogs and attempts to fix any
+// automatically-fixable issues.
+//
+// Error handling: unexpected errors (e.g. networking errors) are fatal and
+// result in returning early. Expected application failures such as a rename
+// failing due to a duplicate table being present are logged and execution
+// continues.
Status FixHmsMetadata(const RunnerContext& context) {
- const string& master_addresses = FindOrDie(context.required_args, kMasterAddressesArg);
+ const string& master_addrs = FindOrDie(context.required_args, kMasterAddressesArg);
shared_ptr<KuduClient> kudu_client;
unique_ptr<HmsCatalog> hms_catalog;
- Init(context, &kudu_client, &hms_catalog);
+ RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog));
- TablesMap unsynced_tables_map;
- std::vector<hive::Table> legacy_tables;
- RETURN_NOT_OK_PREPEND(RetrieveUnsyncedTables(hms_catalog.get(),
- kudu_client.get(),
- master_addresses,
- &unsynced_tables_map,
- &legacy_tables),
- "error fetching unsynchronized tables");
-
- if (unsynced_tables_map.empty() && legacy_tables.empty()) {
- cout << "Metadata between Kudu and Hive Metastore is in sync." << endl;
- return Status::OK();
+ CatalogReport report;
+ RETURN_NOT_OK(AnalyzeCatalogs(master_addrs, hms_catalog.get(), kudu_client.get(), &report));
+
+ bool success = true;
+
+ if (FLAGS_drop_orphan_hms_tables) {
+ for (hive::Table& hms_table : report.orphan_hms_tables) {
+ string table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
+ const string& master_addrs_param = hms_table.parameters[HmsClient::kKuduMasterAddrsKey];
+ if (master_addrs_param != master_addrs && !FLAGS_force) {
+ LOG(INFO) << "Skipping drop of orphan HMS table " << table_name
+ << " with master addresses parameter " << master_addrs_param
+ << " because it does not match the --" << kMasterAddressesArg << " argument"
+ << " (use --force to skip this check)";
+ continue;
+ }
+
+ if (FLAGS_dryrun) {
+ LOG(INFO) << "[dryrun] Dropping orphan HMS table " << table_name;
+ } else {
+ const string& table_id = hms_table.parameters[HmsClient::kKuduTableIdKey];
+ const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
+ // All errors are fatal here, since we've already checked that the table exists in the HMS.
+ if (storage_handler == HmsClient::kKuduStorageHandler) {
+ RETURN_NOT_OK_PREPEND(hms_catalog->DropTable(table_id, table_name),
+ Substitute("failed to drop orphan HMS table $0", table_name));
+ } else {
+ RETURN_NOT_OK_PREPEND(hms_catalog->DropLegacyTable(table_name),
+ Substitute("failed to drop legacy orphan HMS table $0", table_name));
+ }
+ }
+ }
}
- // print the legacy tables if any, and returns a runtime error.
- if (!legacy_tables.empty()) {
- RETURN_NOT_OK_PREPEND(PrintLegacyTables(legacy_tables, cout),
- "error printing legacy tables");
- return Status::RuntimeError("metadata fix tool encountered fatal errors");
+ if (FLAGS_create_missing_hms_tables) {
+ for (const auto& kudu_table : report.missing_hms_tables) {
+ const string& table_id = kudu_table->id();
+ const string& table_name = kudu_table->name();
+ Schema schema = client::SchemaFromKuduSchema(kudu_table->schema());
+ string normalized_table_name(table_name.data(), table_name.size());
+ CHECK_OK(hms::HmsCatalog::NormalizeTableName(&normalized_table_name));
+
+ if (FLAGS_dryrun) {
+ LOG(INFO) << "[dryrun] Creating HMS table for Kudu table " << TableIdent(*kudu_table);
+ } else {
+ Status s = hms_catalog->CreateTable(table_id, table_name, schema);
+ if (s.IsAlreadyPresent()) {
+ LOG(ERROR) << "Failed to create HMS table for Kudu table "
+ << TableIdent(*kudu_table)
+ << " because another table already exists in the HMS with that name";
+ success = false;
+ continue;
+ }
+ if (s.IsInvalidArgument()) {
+ // This most likely means the database doesn't exist, but it is ambiguous.
+ LOG(ERROR) << "Failed to create HMS table for Kudu table "
+ << TableIdent(*kudu_table)
+ << " (database does not exist?): " << s.message().ToString();
+ success = false;
+ continue;
+ }
+ // All other errors are unexpected.
+ RETURN_NOT_OK_PREPEND(s,
+ Substitute("failed to create HMS table for Kudu table $0", TableIdent(*kudu_table)));
+ }
+
+ if (normalized_table_name != table_name) {
+ if (FLAGS_dryrun) {
+ LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(*kudu_table)
+ << " to lowercased Hive-compatible name: " << normalized_table_name;
+ } else {
+ // All errors are fatal. We never expect to get an 'AlreadyPresent'
+ // error, since the catalog manager validates that no two
+ // Hive-compatible table names differ only by case.
+ //
+ // Note that if an error occurs we do not roll-back the HMS table
+ // creation step, since a subsequent run of the tool will recognize
+ // the table as an inconsistent table (Kudu and HMS table names do not
+ // match), and automatically fix it.
+ RETURN_NOT_OK_PREPEND(
+ RenameTableInKuduCatalog(kudu_client.get(), table_name, normalized_table_name),
+ Substitute("failed to rename Kudu table $0 to lowercased Hive compatible name $1",
+ TableIdent(*kudu_table), normalized_table_name));
+ }
+ }
+ }
}
- // Fix inconsistent metadata.
- RETURN_NOT_OK_PREPEND(FixUnsyncedTables(kudu_client.get(), hms_catalog.get(),
- unsynced_tables_map),
- "error fixing inconsistent metadata");
- cout << "DONE" << endl;
- return Status::OK();
+ if (FLAGS_upgrade_hms_tables) {
+ for (const auto& table_pair : report.legacy_hms_tables) {
+ const KuduTable& kudu_table = *table_pair.first;
+ const hive::Table& hms_table = table_pair.second;
+ string hms_table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
+
+ if (FLAGS_dryrun) {
+ LOG(INFO) << "[dryrun] Upgrading legacy Impala HMS metadata for table "
+ << hms_table_name;
+ } else {
+ RETURN_NOT_OK_PREPEND(hms_catalog->UpgradeLegacyImpalaTable(
+ kudu_table.id(), hms_table.dbName, hms_table.tableName,
+ client::SchemaFromKuduSchema(kudu_table.schema())),
+ Substitute("failed to upgrade legacy Impala HMS metadata for table $0",
+ hms_table_name));
+ }
+
+ if (kudu_table.name() != hms_table_name) {
+ if (FLAGS_dryrun) {
+ LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(kudu_table)
+ << " to " << hms_table_name;
+ } else {
+ Status s = RenameTableInKuduCatalog(kudu_client.get(), kudu_table.name(), hms_table_name);
+ if (s.IsAlreadyPresent()) {
+ LOG(ERROR) << "Failed to rename Kudu table " << TableIdent(kudu_table)
+ << " to match the Hive Metastore name " << hms_table_name
+ << ", because a Kudu table with name" << hms_table_name
+ << " already exists";
+ LOG(INFO) << "Suggestion: rename the conflicting table name manually:\n"
+ << "\t$ kudu table rename_table --alter_external_catalogs=false "
+ << master_addrs << " " << hms_table_name << " <database-name>.<table-name>'";
+ success = false;
+ continue;
+ }
+
+ // All other errors are fatal. Note that if an error occurs we do not
+ // roll-back the HMS legacy upgrade step, since a subsequent run of
+ // the tool will recognize the table as an inconsistent table (Kudu
+ // and HMS table names do not match), and automatically fix it.
+ RETURN_NOT_OK_PREPEND(s,
+ Substitute("failed to rename Kudu table $0 to $1",
+ TableIdent(kudu_table), hms_table_name));
+ }
+ }
+ }
+ }
+
+ if (FLAGS_fix_inconsistent_tables) {
+ for (const auto& table_pair : report.inconsistent_tables) {
+ const KuduTable& kudu_table = *table_pair.first;
+ const hive::Table& hms_table = table_pair.second;
+ string hms_table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
+
+ if (hms_table_name != kudu_table.name()) {
+ // Update the Kudu table name to match the HMS table name.
+ if (FLAGS_dryrun) {
+ LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(kudu_table)
+ << " to " << hms_table_name;
+ } else {
+ Status s = RenameTableInKuduCatalog(kudu_client.get(), kudu_table.name(), hms_table_name);
+ if (s.IsAlreadyPresent()) {
+ LOG(ERROR) << "Failed to rename Kudu table " << TableIdent(kudu_table)
+ << " to match HMS table " << hms_table_name
+ << ", because a Kudu table with name " << hms_table_name
+ << " already exists";
+ success = false;
+ continue;
+ }
+ RETURN_NOT_OK_PREPEND(s,
+ Substitute("failed to rename Kudu table $0 to $1",
+ TableIdent(kudu_table), hms_table_name));
+ }
+ }
+
+ // Update the HMS table metadata to match Kudu.
+ if (FLAGS_dryrun) {
+ LOG(INFO) << "[dryrun] Refreshing HMS table metadata for Kudu table "
+ << TableIdent(kudu_table);
+ } else {
+ Schema schema(client::SchemaFromKuduSchema(kudu_table.schema()));
+ RETURN_NOT_OK_PREPEND(
+ hms_catalog->AlterTable(kudu_table.id(), hms_table_name, hms_table_name, schema),
+ Substitute("failed to refresh HMS table metadata for Kudu table $0",
+ TableIdent(kudu_table)));
+ }
+ }
+ }
+
+ if (FLAGS_dryrun || success) {
+ return Status::OK();
+ }
+ return Status::RuntimeError("Failed to fix some catalog metadata inconsistencies");
}
unique_ptr<Mode> BuildHmsMode() {
- unique_ptr<Action> hms_upgrade =
- ActionBuilder("upgrade", &HmsUpgrade)
- .Description("Upgrade the legacy metadata for Kudu and the Hive Metastores")
- .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
- .AddRequiredParameter({ kDefaultDatabaseArg, kDefaultDatabaseArgDesc })
- .AddOptionalParameter("hive_metastore_uris")
- .AddOptionalParameter("hive_metastore_sasl_enabled")
- .AddOptionalParameter("hive_metastore_retry_count")
- .AddOptionalParameter("hive_metastore_send_timeout")
- .AddOptionalParameter("hive_metastore_recv_timeout")
- .AddOptionalParameter("hive_metastore_conn_timeout")
- .AddOptionalParameter("enable_input")
- .Build();
- unique_ptr<Action> hms_downgrade =
- ActionBuilder("downgrade", &HmsDowngrade)
- .Description("Downgrade the metadata to legacy format for "
- "Kudu and the Hive Metastores")
- .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
- .AddOptionalParameter("hive_metastore_uris")
- .AddOptionalParameter("hive_metastore_sasl_enabled")
- .AddOptionalParameter("hive_metastore_retry_count")
- .AddOptionalParameter("hive_metastore_send_timeout")
- .AddOptionalParameter("hive_metastore_recv_timeout")
- .AddOptionalParameter("hive_metastore_conn_timeout")
- .Build();
+ // TODO(dan): automatically retrieve the HMS URIs and SASL config from the
+ // Kudu master instead of requiring them as an additional flag.
unique_ptr<Action> hms_check =
ActionBuilder("check", &CheckHmsMetadata)
- .Description("Check the metadata consistency between Kudu and the Hive Metastores")
+ .Description("Check metadata consistency between Kudu and the Hive Metastore catalogs")
.AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
.AddOptionalParameter("hive_metastore_uris")
.AddOptionalParameter("hive_metastore_sasl_enabled")
- .AddOptionalParameter("hive_metastore_retry_count")
- .AddOptionalParameter("hive_metastore_send_timeout")
- .AddOptionalParameter("hive_metastore_recv_timeout")
- .AddOptionalParameter("hive_metastore_conn_timeout")
.Build();
unique_ptr<Action> hms_fix =
ActionBuilder("fix", &FixHmsMetadata)
- .Description("Fix the metadata inconsistency between Kudu and the Hive Metastores")
+ .Description("Fix automatically-repairable metadata inconsistencies in the "
+ "Kudu and Hive Metastore catalogs")
.AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
.AddOptionalParameter("hive_metastore_uris")
.AddOptionalParameter("hive_metastore_sasl_enabled")
- .AddOptionalParameter("hive_metastore_retry_count")
- .AddOptionalParameter("hive_metastore_send_timeout")
- .AddOptionalParameter("hive_metastore_recv_timeout")
- .AddOptionalParameter("hive_metastore_conn_timeout")
+ .AddOptionalParameter("dryrun")
+ .AddOptionalParameter("drop_orphan_hms_tables")
+ .AddOptionalParameter("create_missing_hms_tables")
+ .AddOptionalParameter("fix_inconsistent_tables")
+ .AddOptionalParameter("upgrade_hms_tables")
.Build();
+ // TODO(dan): add 'hms precheck' tool to check for overlapping normalized table names.
+
+ unique_ptr<Action> hms_downgrade =
+ ActionBuilder("downgrade", &HmsDowngrade)
+ .Description("Downgrade the metadata to legacy format for "
+ "Kudu and the Hive Metastores")
+ .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
+ .AddOptionalParameter("hive_metastore_uris")
+ .AddOptionalParameter("hive_metastore_sasl_enabled")
+ .Build();
+
return ModeBuilder("hms").Description("Operate on remote Hive Metastores")
- .AddAction(std::move(hms_upgrade))
.AddAction(std::move(hms_downgrade))
.AddAction(std::move(hms_check))
.AddAction(std::move(hms_fix))