You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/04/19 15:16:29 UTC

[impala] 02/02: IMPALA-8401: SIGRTMIN initiates the graceful shutdown process

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1a800457a13c3407a33e796cef0c328b2d90c52e
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Mon Apr 8 16:59:40 2019 -0700

    IMPALA-8401: SIGRTMIN initiates the graceful shutdown process
    
    This patch enables a user who has access to the impalad process to
    initiate the graceful shutdown process, with a deadline of one year,
    by sending the SIGRTMIN signal to it.
    Sample usage: "kill -SIGRTMIN <IMPALAD_PID>"
    
    Testing:
    Added relevant e2e tests.
    Tested on CentOS 6, CentOS 7, Ubuntu 16.04, Ubuntu 18.04 and SLES 12
    
    Change-Id: I521ffd7526ac9a8a5c4996994eb68d6a855aef86
    Reviewed-on: http://gerrit.cloudera.org:8080/12973
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/init.cc                         | 56 +++++++++++++++++++++++
 be/src/common/init.h                          | 10 +++++
 be/src/service/impala-server.cc               |  2 +-
 be/src/service/impalad-main.cc                |  2 +-
 tests/custom_cluster/test_restart_services.py | 65 +++++++++++++++++++++++----
 5 files changed, 124 insertions(+), 11 deletions(-)

diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index 2b8dbd4..feccbe0 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -38,6 +38,7 @@
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/lib-cache.h"
 #include "runtime/mem-tracker.h"
+#include "service/impala-server.h"
 #include "util/cgroup-util.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
@@ -112,6 +113,10 @@ static unique_ptr<impala::Thread> log_maintenance_thread;
 // 2) Frees excess memory that TCMalloc has left in its pageheap.
 static unique_ptr<impala::Thread> memory_maintenance_thread;
 
+// Thread that loops on sigwait() for IMPALA_SHUTDOWN_SIGNAL and, on every delivery,
+// initiates a graceful shutdown with a virtually unlimited deadline (one year).
+static unique_ptr<impala::Thread> shutdown_signal_handler_thread;
+
 // A pause monitor thread to monitor process pauses in impala daemons. The thread sleeps
 // for a short interval of time (THREAD_SLEEP_TIME_MS), wakes up and calculates the actual
 // time slept. If that exceeds PAUSE_WARN_THRESHOLD_MS, a warning is logged.
@@ -177,6 +182,31 @@ extern "C" { void __gcov_flush(); }
   }
 }
 
+[[noreturn]] static void ImpalaShutdownSignalHandler() {
+  sigset_t signals;  // Holds only IMPALA_SHUTDOWN_SIGNAL, the one signal waited on.
+  CHECK_EQ(0, sigemptyset(&signals));
+  CHECK_EQ(0, sigaddset(&signals, IMPALA_SHUTDOWN_SIGNAL));
+  DCHECK(ExecEnv::GetInstance() != nullptr);
+  DCHECK(ExecEnv::GetInstance()->impala_server() != nullptr);
+  ImpalaServer* const server = ExecEnv::GetInstance()->impala_server();
+  const int ONE_YEAR_IN_SECONDS = 365 * 24 * 60 * 60;
+  for (;;) {  // Never exits: every delivery of the signal is handled in turn.
+    int signal;
+    const int err = sigwait(&signals, &signal);
+    CHECK(err == 0) << "sigwait(): " << GetStrErrMsg(err) << ": " << err;
+    CHECK_EQ(IMPALA_SHUTDOWN_SIGNAL, signal);
+    ShutdownStatusPB shutdown_status;
+    Status status = server->StartShutdown(ONE_YEAR_IN_SECONDS, &shutdown_status);
+    if (status.ok()) {
+      LOG(INFO) << "Shutdown signal received. Current Shutdown Status: "
+                << ImpalaServer::ShutdownStatusToString(shutdown_status);
+    } else {
+      LOG(ERROR) << "Shutdown signal received but unable to initiate shutdown. Status: "
+                 << status.GetDetail();
+    }
+  }
+}
+
 static void PauseMonitorLoop() {
   if (FLAGS_pause_monitor_warn_threshold_ms <= 0) return;
   int64_t time_before_sleep = MonotonicMillis();
@@ -207,9 +237,30 @@ static void PauseMonitorLoop() {
   _exit(0);
 }
 
+// If 'syscall_ret_val' is non-zero, writes 'msg' plus an error string to stderr and calls
+// exit(). A positive value (pthread_* style) is itself the error number; else uses errno.
+void AbortIfError(const int syscall_ret_val, const string& msg) {
+  if (syscall_ret_val == 0) return;
+  cerr << Substitute("$0 Error: $1\n", msg,
+      syscall_ret_val > 0 ? GetStrErrMsg(syscall_ret_val) : GetStrErrMsg());
+  exit(1);
+}
+
+// Adds IMPALA_SHUTDOWN_SIGNAL to the calling thread's blocked-signal mask. Call this
+// before any other thread is spawned so that every thread starts with the signal blocked
+// and only the thread waiting on it ever receives it.
+void BlockImpalaShutdownSignal() {
+  const string failure_msg = "Failed to block IMPALA_SHUTDOWN_SIGNAL for all threads.";
+  sigset_t shutdown_sigset;
+  AbortIfError(sigemptyset(&shutdown_sigset), failure_msg);
+  AbortIfError(sigaddset(&shutdown_sigset, IMPALA_SHUTDOWN_SIGNAL), failure_msg);
+  AbortIfError(pthread_sigmask(SIG_BLOCK, &shutdown_sigset, nullptr), failure_msg);
+}
+
 void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
     TestInfo::Mode test_mode) {
   srand(time(NULL));
+  BlockImpalaShutdownSignal();
 
   CpuInfo::Init();
   DiskInfo::Init();
@@ -351,6 +402,11 @@ Status impala::StartMemoryMaintenanceThread() {
       &MemoryMaintenanceThread, &memory_maintenance_thread);
 }
 
+Status impala::StartImpalaShutdownSignalHandlerThread() {
+  return Thread::Create("common", "shutdown-signal-handler", &ImpalaShutdownSignalHandler,
+      &shutdown_signal_handler_thread);  // Out-param: file-level static unique_ptr.
+}
+
 #if defined(ADDRESS_SANITIZER)
 // Default ASAN_OPTIONS. Override by setting environment variable $ASAN_OPTIONS.
 extern "C" const char *__asan_default_options() {
diff --git a/be/src/common/init.h b/be/src/common/init.h
index fe8a063..ff0194e 100644
--- a/be/src/common/init.h
+++ b/be/src/common/init.h
@@ -21,6 +21,10 @@
 #include "util/test-info.h"
 #include "common/status.h"
 
+// Using the first real-time signal available to initiate graceful shutdown. See the
+// "Real-time signals" section of signal(7). The e2e tests mirror it as signal.SIGRTMIN.
+#define IMPALA_SHUTDOWN_SIGNAL SIGRTMIN
+
 namespace impala {
 
 /// Initialises logging, flags, and, if init_jvm is true, an embedded JVM.
@@ -35,6 +39,12 @@ void InitCommonRuntime(int argc, char** argv, bool init_jvm,
 /// RegisterMemoryMetrics(). This thread is needed for daemons to free memory and
 /// refresh metrics but is not needed for standalone tests.
 Status StartMemoryMaintenanceThread() WARN_UNUSED_RESULT;
+
+/// Starts the Impala shutdown signal handler thread, which synchronously waits for
+/// IMPALA_SHUTDOWN_SIGNAL and initiates graceful shutdown each time it is received.
+/// Must be called only after IMPALA_SHUTDOWN_SIGNAL is blocked on all threads in the
+/// process and impala server has started. Returns the status of creating the thread.
+Status StartImpalaShutdownSignalHandlerThread() WARN_UNUSED_RESULT;
 }
 
 #endif
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 5434282..df09d21 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -2456,7 +2456,7 @@ ShutdownStatusPB ImpalaServer::GetShutdownStatus() const {
 }
 
 string ImpalaServer::ShutdownStatusToString(const ShutdownStatusPB& shutdown_status) {
-  return Substitute("startup grace period left: $0, deadline left: $1, "
+  return Substitute("shutdown grace period left: $0, deadline left: $1, "
                     "queries registered on coordinator: $2, queries executing: $3, "
                     "fragment instances: $4",
       PrettyPrinter::Print(shutdown_status.grace_remaining_ms(), TUnit::TIME_MS),
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 2b7cc95..7650126 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -92,7 +92,7 @@ int ImpaladMain(int argc, char** argv) {
     ShutdownLogging();
     exit(1);
   }
-
+  ABORT_IF_ERROR(StartImpalaShutdownSignalHandlerThread());
   impala_server->Join();
 
   return 0;
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index e1e19ba..ca6ffaf 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -19,6 +19,7 @@ import logging
 import pytest
 import psutil
 import re
+import signal
 import socket
 import time
 
@@ -93,15 +94,16 @@ def parse_shutdown_result(result):
   deadline left, queries registered, queries executing)."""
   assert len(result.data) == 1
   summary = result.data[0]
-  match = re.match(r'startup grace period left: ([0-9ms]*), deadline left: ([0-9ms]*), ' +
-      r'queries registered on coordinator: ([0-9]*), queries executing: ([0-9]*), ' +
-      r'fragment instances: [0-9]*', summary)
+  match = re.match(r'shutdown grace period left: ([0-9ms]*), deadline left: ([0-9ms]*), '
+                   r'queries registered on coordinator: ([0-9]*), queries executing: '
+                   r'([0-9]*), fragment instances: [0-9]*', summary)
   assert match is not None, summary
   return match.groups()
 
 
-class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
+class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite):
   IDLE_SHUTDOWN_GRACE_PERIOD_S = 1
+  IMPALA_SHUTDOWN_SIGNAL = signal.SIGRTMIN
 
   @classmethod
   def get_workload(cls):
@@ -113,8 +115,8 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
           --hostname={hostname}".format(grace_period=IDLE_SHUTDOWN_GRACE_PERIOD_S,
             hostname=socket.gethostname()))
   def test_shutdown_idle(self):
-    """Test that idle impalads shut down in a timely manner after the startup grace period
-    elapses."""
+    """Test that idle impalads shut down in a timely manner after the shutdown grace
+    period elapses."""
     impalad1 = psutil.Process(self.cluster.impalads[0].get_pid())
     impalad2 = psutil.Process(self.cluster.impalads[1].get_pid())
     impalad3 = psutil.Process(self.cluster.impalads[2].get_pid())
@@ -158,7 +160,7 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
         ":shutdown('{0}:27000')".format(socket.gethostname()),
         query_options={'debug_action': 'CRS_SHUTDOWN_RPC:FAIL'})
 
-    # Make sure that the impala daemons exit after the startup grace period plus a 10
+    # Make sure that the impala daemons exit after the shutdown grace period plus a 10
     # second margin of error.
     start_time = time.time()
     LOG.info("Waiting for impalads to exit {0}".format(start_time))
@@ -218,11 +220,11 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
     SHUTDOWN_EXEC2 = ": shutdown('localhost:27001')"
 
     # Run this query before shutdown and make sure that it executes successfully on
-    # all executors through the startup grace period without disruption.
+    # all executors through the shutdown grace period without disruption.
     before_shutdown_handle = self.__exec_and_wait_until_running(QUERY)
 
     # Run this query which simulates getting stuck in admission control until after
-    # the startup grace period expires. This exercises the code path where the
+    # the shutdown grace period expires. This exercises the code path where the
     # coordinator terminates the query before it has started up.
     before_shutdown_admission_handle = self.execute_query_async(QUERY,
         {'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@30000'})
@@ -423,3 +425,48 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
       assert False, "Expected query to fail"
     except Exception, e:
       assert 'Failed due to unreachable impalad(s)' in str(e)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--shutdown_grace_period_s={grace_period} \
+          --hostname={hostname}".format(grace_period=IDLE_SHUTDOWN_GRACE_PERIOD_S,
+            hostname=socket.gethostname()), cluster_size=1)
+  def test_shutdown_signal(self):
+    """Test that an idle impalad shuts down in a timely manner after it is sent
+    IMPALA_SHUTDOWN_SIGNAL and the shutdown grace period elapses."""
+    impalad = psutil.Process(self.cluster.impalads[0].get_pid())
+    LOG.info(
+      "Sending IMPALA_SHUTDOWN_SIGNAL(SIGRTMIN = %s) signal to impalad PID = %s",
+      self.IMPALA_SHUTDOWN_SIGNAL, impalad.pid)
+    impalad.send_signal(self.IMPALA_SHUTDOWN_SIGNAL)
+    # Make sure that the impala daemon exits after the shutdown grace period plus a 10
+    # second margin of error.
+    start_time = time.time()
+    LOG.info("Waiting for impalad to exit {0}".format(start_time))
+    impalad.wait()
+    shutdown_duration = time.time() - start_time
+    assert shutdown_duration <= self.IDLE_SHUTDOWN_GRACE_PERIOD_S + 10
+    # Make sure signal was received and the grace period and deadline are as expected.
+    self.assert_impalad_log_contains('INFO',
+      "Shutdown signal received. Current Shutdown Status: shutdown grace period left: "
+      "{0}s000ms, deadline left: 8760h".format(self.IDLE_SHUTDOWN_GRACE_PERIOD_S))
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_sending_multiple_shutdown_signals(self):
+    """Test that multiple IMPALA_SHUTDOWN_SIGNAL signals are all handled without
+    crashing the process."""
+    impalad = psutil.Process(self.cluster.impalads[0].get_pid())
+    NUM_SIGNALS_TO_SEND = 10
+    LOG.info(
+      "Sending %s IMPALA_SHUTDOWN_SIGNAL(SIGRTMIN = %s) signals to impalad PID = %s",
+      NUM_SIGNALS_TO_SEND, self.IMPALA_SHUTDOWN_SIGNAL, impalad.pid)
+    for _ in range(NUM_SIGNALS_TO_SEND):
+      impalad.send_signal(self.IMPALA_SHUTDOWN_SIGNAL)
+    # Give shutdown thread some time to wake up and handle all the signals to avoid
+    # flakiness.
+    sleep(5)
+    # Make sure all signals were received and the process is still up.
+    self.assert_impalad_log_contains('INFO', "Shutdown signal received.",
+                                     NUM_SIGNALS_TO_SEND)
+    assert impalad.is_running(), "Impalad process should still be running."