You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2020/03/27 12:27:13 UTC

[impala] 01/02: IMPALA-9549: Handle catalogd startup delays when using local catalog

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1d63348b933b266f63d76b06eecbdf636cb45770
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Tue Mar 24 15:15:51 2020 -0700

    IMPALA-9549: Handle catalogd startup delays when using local catalog
    
    Impalads should be tolerant of delays in catalogd startup.
    Currently, when running with the local catalog
    (use_local_catalog=true), impalad startup can fail when catalogd
    startup is delayed. What happens is that ImpalaServer's constructor
    calls ImpalaServer::UpdateCatalogMetrics(), which maintains two
    metrics counting the number of tables and databases. This is before
    the code in ImpalaServer::Start() that waits for the catalogd to
    start (added by IMPALA-4704), so there is no guarantee that catalogd
    is running. The UpdateCatalogMetrics() call ends up calling getDbs()
    in the frontend catalog. LocalCatalog::getDbs() tries to load the
    databases (and thus contact catalogd), and this call will fail if
    catalogd is not running, which causes impalad startup to fail.
    
    use_local_catalog=false is immune to this only because it does not
    contact catalogd in Catalog::getDbs().
    
    This moves the UpdateCatalogMetrics() call from the ImpalaServer
    constructor to ImpalaServer::Start() after the impalad has already
    waited for the catalogd to start up. It also limits the call to
    run only in coordinators.
    
    Prior to this change, when using the local catalog, the executors
    would have catalog.num-databases and catalog.num-tables set to the
    right values at startup, but these values would not be kept up to
    date. With this change, the executors do not have these values set.
    
    Without the local catalog, both before and after this change,
    executors do not have accurate counts for catalog.num-databases or
    catalog.num-tables.
    
    Testing:
     - Added a test to custom_cluster.test_catalog_wait to delay catalogd
       startup by 60 seconds and verify that the impalads successfully
       start up. This test fails prior to this change.
     - Hand tested to verify that the metrics that are maintained by
       UpdateCatalogMetrics() are not meaningfully changed for coordinators
       and that executors do not have metrics set.
    
    Change-Id: I1b5a94c59faaaa25927a169dcb58f310ce6b1044
    Reviewed-on: http://gerrit.cloudera.org:8080/15561
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/catalog/catalog-server.cc          |  9 +++++
 be/src/common/global-flags.cc             |  3 ++
 be/src/service/impala-server.cc           |  7 ++--
 tests/custom_cluster/test_catalog_wait.py | 56 +++++++++++++++++++++++++++++--
 4 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index ad140b8..bea5cc3 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -79,6 +79,10 @@ DECLARE_int32(state_store_port);
 DECLARE_string(hostname);
 DECLARE_bool(compact_catalog_topic);
 
+#ifndef NDEBUG
+DECLARE_int32(stress_catalog_startup_delay_ms);
+#endif
+
 string CatalogServer::IMPALA_CATALOG_TOPIC = "catalog-update";
 
 const string CATALOG_SERVER_TOPIC_PROCESSING_TIMES =
@@ -252,6 +256,11 @@ Status CatalogServer::Start() {
 
   // This will trigger a full Catalog metadata load.
   catalog_.reset(new Catalog());
+#ifndef NDEBUG
+  if (FLAGS_stress_catalog_startup_delay_ms > 0) {
+    SleepForMs(FLAGS_stress_catalog_startup_delay_ms);
+  }
+#endif
   RETURN_IF_ERROR(Thread::Create("catalog-server", "catalog-update-gathering-thread",
       &CatalogServer::GatherCatalogUpdatesThread, this,
       &catalog_update_gathering_thread_));
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 429c87b..d894574 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -161,6 +161,9 @@ DEFINE_bool(thread_creation_fault_injection, false, "A fault injection option th
 DEFINE_int32(stress_catalog_init_delay_ms, 0, "A stress option that injects extra delay"
     " in milliseconds when initializing an impalad's local catalog replica. Delay <= 0"
     " inject no delay.");
+DEFINE_int32(stress_catalog_startup_delay_ms, 0, "A stress option that injects extra "
+    "delay in milliseconds during the startup of catalogd. The delay is before the "
+    "catalogd opens ports or accepts connections. Delay <= 0 injects no delay.");
 DEFINE_int32(stress_disk_read_delay_ms, 0, "A stress option that injects extra delay"
     " in milliseconds when the I/O manager is reading from disk.");
 #endif
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index fb931b7..6db86d8 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -449,8 +449,6 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
         filter_prefix, catalog_cb));
   }
 
-  ABORT_IF_ERROR(UpdateCatalogMetrics());
-
   // Initialise the cancellation thread pool with 5 (by default) threads. The max queue
   // size is deliberately set so high that it should never fill; if it does the
   // cancellations will get ignored and retried on the next statestore heartbeat.
@@ -2342,7 +2340,10 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
   // then wait for the initial catalog update.
   RETURN_IF_ERROR(exec_env_->StartStatestoreSubscriberService());
 
-  if (FLAGS_is_coordinator) exec_env_->frontend()->WaitForCatalog();
+  if (FLAGS_is_coordinator) {
+    exec_env_->frontend()->WaitForCatalog();
+    ABORT_IF_ERROR(UpdateCatalogMetrics());
+  }
 
   SSLProtocol ssl_version = SSLProtocol::TLSv1_0;
   if (IsExternalTlsConfigured() || IsInternalTlsConfigured()) {
diff --git a/tests/custom_cluster/test_catalog_wait.py b/tests/custom_cluster/test_catalog_wait.py
index b60f4e2..34b366a 100644
--- a/tests/custom_cluster/test_catalog_wait.py
+++ b/tests/custom_cluster/test_catalog_wait.py
@@ -41,9 +41,11 @@ class TestCatalogWait(CustomClusterTestSuite):
       assert 'Could not connect to' in str(e.value)
 
   @pytest.mark.execute_serially
-  def test_delayed_catalog(self):
+  def test_delayed_impalad_catalog(self):
     """ Tests client interactions with the cluster when one of the daemons,
-        impalad[2], is delayed in initializing its local catalog replica."""
+        impalad[2], is delayed in initializing its local catalog replica.
+        This delay is simulated on the impalad side, and the catalogd starts
+        up normally."""
 
     # On startup, expect only two executors to be registered.
     self._start_impala_cluster(["--catalog_init_delays=0,0,200000"],
@@ -80,3 +82,53 @@ class TestCatalogWait(CustomClusterTestSuite):
     self.cluster.impalads[0].service.wait_for_metric_value('impala-server.num-fragments', 3);
     self.cluster.impalads[1].service.wait_for_metric_value('impala-server.num-fragments', 3);
     self.cluster.impalads[2].service.wait_for_metric_value('impala-server.num-fragments', 0);
+
+
+@SkipIfBuildType.not_dev_build
+class TestCatalogStartupDelay(CustomClusterTestSuite):
+  """This test injects a real delay in catalogd startup. The impalads are expected to be
+     able to tolerate this delay, either because they wait (as coordinators do) or
+     because they don't need anything from the catalogd. This is done for a few
+     different cluster setups (different metadata, exclusive coordinators). This
+     is not testing anything beyond successful startup."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('Catalog startup delay tests only run in exhaustive')
+    super(TestCatalogStartupDelay, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--stress_catalog_startup_delay_ms=60000")
+  def test_default_metadata_settings(self):
+    """This variant tests the default metadata settings."""
+    # The actual test here is successful startup, and we assume nothing about the
+    # functionality of the impalads before the catalogd finishes starting up.
+    self.execute_query("select count(*) from functional.alltypes")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--stress_catalog_startup_delay_ms=60000 --catalog_topic_mode=minimal")
+  def test_local_catalog(self):
+    """This variant tests with the local catalog."""
+    # The actual test here is successful startup, and we assume nothing about the
+    # functionality of the impalads before the catalogd finishes starting up.
+    self.execute_query("select count(*) from functional.alltypes")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    num_exclusive_coordinators=1,
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--stress_catalog_startup_delay_ms=60000 --catalog_topic_mode=minimal")
+  def test_local_catalog_excl_coord(self):
+    """This variant tests with the local catalog and an exclusive coordinator. The
+       purpose is to verify that executors do not break."""
+    # The actual test here is successful startup, and we assume nothing about the
+    # functionality of the impalads before the catalogd finishes starting up.
+    self.execute_query("select count(*) from functional.alltypes")