You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/06/12 15:23:11 UTC

[kudu] 02/02: KUDU-2848: fetch HMS DB UUID in the background

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 9e3a9fb1fc0a9a85bb1c948edb8a6b37f98ba2d0
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Jun 12 00:03:18 2019 -0700

    KUDU-2848: fetch HMS DB UUID in the background
    
    To avoid having a hard startup dependency from Kudu to HMS, let's thunk the
    work of fetching the DB UUID to a helper thread. It's a simple approach, but
    admittedly I haven't given much thought to whether it preserves reasonable
    semantics for clients.
    
    The new test isn't useful because our HMS test artifact doesn't support DB
    UUIDs. I'll make sure this gets proper testing after merging though.
    
    Change-Id: I5d190d317b3ee3e69493638e866949cef8fb5d46
    Reviewed-on: http://gerrit.cloudera.org:8080/13594
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 src/kudu/hms/hms_catalog-test.cc | 24 ++++++++++++++++++++
 src/kudu/hms/hms_catalog.cc      | 49 +++++++++++++++++++++++++++-------------
 src/kudu/hms/hms_catalog.h       | 28 ++++++++++++++++++++++-
 3 files changed, 84 insertions(+), 17 deletions(-)

diff --git a/src/kudu/hms/hms_catalog-test.cc b/src/kudu/hms/hms_catalog-test.cc
index a8cc0a9..cd32622 100644
--- a/src/kudu/hms/hms_catalog-test.cc
+++ b/src/kudu/hms/hms_catalog-test.cc
@@ -453,6 +453,7 @@ TEST_F(HmsCatalogTest, TestReconnect) {
 }
 
 TEST_F(HmsCatalogTest, TestMetastoreUuid) {
+  // Test that if the HMS has a DB UUID, we can retrieve it.
   string uuid;
   Status s = hms_catalog_->GetUuid(&uuid);
   if (s.ok()) {
@@ -460,6 +461,29 @@ TEST_F(HmsCatalogTest, TestMetastoreUuid) {
   } else {
     ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
   }
+
+  // After stopping the HMS:
+  // 1. We should still be able to initialize the catalog.
+  // 2. Attempts to fetch the DB UUID will fail.
+  ASSERT_OK(hms_->Stop());
+  hms_catalog_.reset(new HmsCatalog(kMasterAddrs));
+  ASSERT_OK(hms_catalog_->Start());
+  ASSERT_TRUE(hms_catalog_->GetUuid(nullptr).IsNotSupported());
+
+  // But if we start the HMS back up, we should be able to eventually get the
+  // DB UUID, though we need to account for the possibility that this HMS may
+  // not support it.
+  ASSERT_OK(hms_->Start());
+  string uuid2;
+  ASSERT_EVENTUALLY([&] {
+      Status s2 = hms_catalog_->GetUuid(&uuid2);
+      if (s.ok()) {
+        ASSERT_OK(s2);
+      } else {
+        ASSERT_TRUE(s2.IsNotSupported());
+      }
+    });
+  ASSERT_EQ(uuid, uuid2);
 }
 
 } // namespace hms
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
index 0fb46a7..686bcdd 100644
--- a/src/kudu/hms/hms_catalog.cc
+++ b/src/kudu/hms/hms_catalog.cc
@@ -20,6 +20,7 @@
 #include <iostream>
 #include <iterator>
 #include <map>
+#include <mutex>
 #include <string>
 #include <type_traits>
 #include <utility>
@@ -45,8 +46,8 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/slice.h"
+#include "kudu/util/thread.h"
 
-using boost::none;
 using boost::optional;
 using std::move;
 using std::string;
@@ -103,7 +104,8 @@ namespace kudu {
 namespace hms {
 
 HmsCatalog::HmsCatalog(string master_addresses)
-    : master_addresses_(std::move(master_addresses)) {
+    : master_addresses_(std::move(master_addresses)),
+      running_(1) {
 }
 
 HmsCatalog::~HmsCatalog() {
@@ -111,6 +113,7 @@ HmsCatalog::~HmsCatalog() {
 }
 
 Status HmsCatalog::Start(HmsClientVerifyKuduSyncConfig verify_service_config) {
+  running_.Reset(1);
   vector<HostPort> addresses;
   RETURN_NOT_OK(ParseUris(FLAGS_hive_metastore_uris, &addresses));
 
@@ -126,25 +129,18 @@ Status HmsCatalog::Start(HmsClientVerifyKuduSyncConfig verify_service_config) {
 
   RETURN_NOT_OK(ha_client_.Start(std::move(addresses), std::move(options)));
 
-  // Fetch the UUID of the HMS DB, if available.
-  string uuid;
-  Status uuid_status = ha_client_.Execute([&] (HmsClient* client) {
-    return client->GetUuid(&uuid);
-  });
-  if (uuid_status.ok()) {
-    VLOG(1) << "Connected to HMS with uuid " << uuid;
-    uuid_ = std::move(uuid);
-  } else if (uuid_status.IsNotSupported()) {
-    VLOG(1) << "Unable to fetch UUID for HMS: " << uuid_status.ToString();
-  } else {
-    // If we couldn't connect to the HMS at all, fail.
-    return uuid_status;
-  }
+  RETURN_NOT_OK(Thread::Create("hms_catalog", "fetch_uuid",
+                               &HmsCatalog::LoopInitializeUuid, this,
+                               &uuid_initializing_thread_));
   return Status::OK();
 }
 
 void HmsCatalog::Stop() {
+  running_.CountDown();
   ha_client_.Stop();
+  if (uuid_initializing_thread_) {
+    uuid_initializing_thread_->Join();
+  }
 }
 
 Status HmsCatalog::CreateTable(const string& id,
@@ -305,6 +301,7 @@ Status HmsCatalog::GetNotificationEvents(int64_t last_event_id, int max_events,
 }
 
 Status HmsCatalog::GetUuid(string* uuid) {
+  std::lock_guard<simple_spinlock> l(uuid_lock_);
   if (!uuid_) {
     return Status::NotSupported("No HMS UUID available");
   }
@@ -451,6 +448,26 @@ bool HmsCatalog::ValidateUris(const char* flag_name, const string& metastore_uri
   return s.ok();
 }
 
+void HmsCatalog::LoopInitializeUuid() {
+  do {
+    // Fetch the UUID of the HMS DB, if available.
+    string uuid;
+    Status s = ha_client_.Execute([&] (HmsClient* client) {
+        return client->GetUuid(&uuid);
+      });
+    if (s.ok()) {
+      VLOG(1) << "Connected to HMS with uuid " << uuid;
+      std::lock_guard<simple_spinlock> l(uuid_lock_);
+      uuid_ = std::move(uuid);
+    } else if (s.IsNotSupported()) {
+      VLOG(1) << "Unable to fetch UUID for HMS: " << s.ToString();
+      return;
+    }
+    // If we couldn't connect to the HMS at all, sleep and try again.
+    VLOG(1) << "Couldn't connect to HMS: " << s.ToString();
+  } while (!running_.WaitFor(MonoDelta::FromSeconds(1)));
+}
+
 bool HmsCatalog::IsEnabled() {
   return !FLAGS_hive_metastore_uris.empty();
 }
diff --git a/src/kudu/hms/hms_catalog.h b/src/kudu/hms/hms_catalog.h
index 981da96..95c44c1 100644
--- a/src/kudu/hms/hms_catalog.h
+++ b/src/kudu/hms/hms_catalog.h
@@ -25,15 +25,19 @@
 #include <gtest/gtest_prod.h>
 
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/hms/hive_metastore_types.h"
 #include "kudu/hms/hms_client.h"
 #include "kudu/thrift/client.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
 class HostPort;
 class Schema;
+class Thread;
 
 namespace hms {
 
@@ -137,7 +141,9 @@ class HmsCatalog {
   // HMS itself changes hostnames, etc.
   //
   // NOTE: this is not implemented in Hive 2.x but may be backported into some
-  // vendor releases (eg CDH6). This function may return Status::NotSupported().
+  // vendor releases (eg CDH6). This function may return Status::NotSupported()
+  // in such cases, or if the HmsCatalog has been unable to connect to the HMS
+  // (i.e. the HMS has been down for the entirety of the HmsCatalog's lifespan).
   Status GetUuid(std::string* uuid) WARN_UNUSED_RESULT;
 
   // Validates the hive_metastore_uris gflag.
@@ -186,6 +192,14 @@ class HmsCatalog {
   Status DropTable(const std::string& name,
                    const hive::EnvironmentContext& env_ctx) WARN_UNUSED_RESULT;
 
+  // Repeatedly tries to fetch the HMS' DB UUID, sleeping in between attempts.
+  //
+  // The loop terminates when one of the following conditions are met:
+  // 1. The UUID is fetched successfully.
+  // 2. running_ counts down to 0.
+  // 3. The HMS responds with NotSupported.
+  void LoopInitializeUuid();
+
   // Parses a Hive Metastore URI string into a sequence of HostPorts.
   static Status ParseUris(const std::string& metastore_uris, std::vector<HostPort>* hostports);
 
@@ -195,8 +209,20 @@ class HmsCatalog {
   // Kudu master addresses.
   const std::string master_addresses_;
 
+  // HmsClient instance used for remote HMS requests.
   thrift::HaClient<hms::HmsClient> ha_client_;
 
+  // Background thread that tries to initialize 'uuid_'.
+  scoped_refptr<Thread> uuid_initializing_thread_;
+
+  // Set to 0 when the HmsCatalog instance has been stopped.
+  CountDownLatch running_;
+
+  // Protects 'uuid_'.
+  simple_spinlock uuid_lock_;
+
+  // The UUID of the remote HMS instance. Will be boost::none if the HMS is
+  // down, or if the HMS does not support UUIDs.
   boost::optional<std::string> uuid_;
 };