You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/04/19 15:16:29 UTC
[impala] 02/02: IMPALA-8401: SIGRTMIN initiates the graceful
shutdown process
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1a800457a13c3407a33e796cef0c328b2d90c52e
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Mon Apr 8 16:59:40 2019 -0700
IMPALA-8401: SIGRTMIN initiates the graceful shutdown process
This patch enables a user who has access to the impalad process to
initiate the graceful shutdown process, with a deadline of one year,
by sending it a SIGRTMIN signal.
Sample usage: "kill -SIGRTMIN <IMPALAD_PID>"
Testing:
Added e2e tests to tests/custom_cluster/test_restart_services.py that send the
shutdown signal to an impalad.
Tested on CentOS 6, CentOS 7, Ubuntu 16.04, Ubuntu 18.04 and SLES 12
Change-Id: I521ffd7526ac9a8a5c4996994eb68d6a855aef86
Reviewed-on: http://gerrit.cloudera.org:8080/12973
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/common/init.cc | 56 +++++++++++++++++++++++
be/src/common/init.h | 10 +++++
be/src/service/impala-server.cc | 2 +-
be/src/service/impalad-main.cc | 2 +-
tests/custom_cluster/test_restart_services.py | 65 +++++++++++++++++++++++----
5 files changed, 124 insertions(+), 11 deletions(-)
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index 2b8dbd4..feccbe0 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -38,6 +38,7 @@
#include "runtime/hdfs-fs-cache.h"
#include "runtime/lib-cache.h"
#include "runtime/mem-tracker.h"
+#include "service/impala-server.h"
#include "util/cgroup-util.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
@@ -112,6 +113,10 @@ static unique_ptr<impala::Thread> log_maintenance_thread;
// 2) Frees excess memory that TCMalloc has left in its pageheap.
static unique_ptr<impala::Thread> memory_maintenance_thread;
+// Shutdown signal handler thread that calls sigwait() on IMPALA_SHUTDOWN_SIGNAL and
+// initiates a graceful shutdown with a virtually unlimited deadline (one year) each time
+// the signal is received. Created by StartImpalaShutdownSignalHandlerThread().
+static unique_ptr<impala::Thread> shutdown_signal_handler_thread;
+
// A pause monitor thread to monitor process pauses in impala daemons. The thread sleeps
// for a short interval of time (THREAD_SLEEP_TIME_MS), wakes up and calculates the actual
// time slept. If that exceeds PAUSE_WARN_THRESHOLD_MS, a warning is logged.
@@ -177,6 +182,31 @@ extern "C" { void __gcov_flush(); }
}
}
+// Body of the shutdown signal handler thread. Loops forever: each time sigwait() returns
+// IMPALA_SHUTDOWN_SIGNAL, it asks the ImpalaServer to start a graceful shutdown with a
+// one-year deadline and logs the outcome. If StartShutdown() fails, the error is logged
+// and the thread goes back to waiting for the next signal.
+[[noreturn]] static void ImpalaShutdownSignalHandler() {
+  // Wait set containing only IMPALA_SHUTDOWN_SIGNAL. The signal is expected to be blocked
+  // in every thread, so sigwait() below is its only consumer.
+  sigset_t signals;
+  CHECK_EQ(0, sigemptyset(&signals));
+  CHECK_EQ(0, sigaddset(&signals, IMPALA_SHUTDOWN_SIGNAL));
+  DCHECK(ExecEnv::GetInstance() != nullptr);
+  DCHECK(ExecEnv::GetInstance()->impala_server() != nullptr);
+  ImpalaServer* const server = ExecEnv::GetInstance()->impala_server();
+  // Virtually unlimited deadline for the graceful shutdown.
+  constexpr int ONE_YEAR_IN_SECONDS = 365 * 24 * 60 * 60;
+  for (;;) {
+    int signal;
+    int err = sigwait(&signals, &signal);
+    // sigwait() returns the error number directly rather than setting errno.
+    CHECK(err == 0) << "sigwait(): " << GetStrErrMsg(err) << ": " << err;
+    CHECK_EQ(IMPALA_SHUTDOWN_SIGNAL, signal);
+    ShutdownStatusPB progress;
+    const Status start_status = server->StartShutdown(ONE_YEAR_IN_SECONDS, &progress);
+    if (start_status.ok()) {
+      LOG(INFO) << "Shutdown signal received. Current Shutdown Status: "
+                << ImpalaServer::ShutdownStatusToString(progress);
+    } else {
+      LOG(ERROR) << "Shutdown signal received but unable to initiate shutdown. Status: "
+                 << start_status.GetDetail();
+    }
+  }
+}
+
static void PauseMonitorLoop() {
if (FLAGS_pause_monitor_warn_threshold_ms <= 0) return;
int64_t time_before_sleep = MonotonicMillis();
@@ -207,9 +237,30 @@ static void PauseMonitorLoop() {
_exit(0);
}
+// Helper method that checks the return value of a syscall passed through
+// 'syscall_ret_val'. Zero means success. On failure, it writes 'msg' followed by a
+// description of the error to stderr and calls exit(1). Both common error conventions are
+// supported: calls such as sigemptyset()/sigaddset() return -1 and set errno, while
+// pthread_*() calls such as pthread_sigmask() return the (positive) error number directly
+// and leave errno untouched.
+void AbortIfError(const int syscall_ret_val, const string& msg) {
+  if (syscall_ret_val == 0) return;
+  // A positive value is itself the error number. Otherwise the error is in errno.
+  const string err_detail =
+      syscall_ret_val > 0 ? GetStrErrMsg(syscall_ret_val) : GetStrErrMsg();
+  cerr << Substitute("$0 Error: $1", msg, err_detail) << '\n';
+  exit(1);
+}
+
+// Adds IMPALA_SHUTDOWN_SIGNAL to the calling thread's blocked-signal mask. Threads
+// created afterwards inherit that mask, so this must run before any other thread is
+// spawned. The signal is then blocked everywhere and can only be consumed by the thread
+// that waits on it with sigwait(). Exits the process if any step fails.
+void BlockImpalaShutdownSignal() {
+  const string failure = "Failed to block IMPALA_SHUTDOWN_SIGNAL for all threads.";
+  // Build a set holding only the shutdown signal...
+  sigset_t to_block;
+  AbortIfError(sigemptyset(&to_block), failure);
+  AbortIfError(sigaddset(&to_block, IMPALA_SHUTDOWN_SIGNAL), failure);
+  // ...then add it to this thread's signal mask.
+  const int mask_ret = pthread_sigmask(SIG_BLOCK, &to_block, nullptr);
+  AbortIfError(mask_ret, failure);
+}
+
void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
TestInfo::Mode test_mode) {
srand(time(NULL));
+ BlockImpalaShutdownSignal();
CpuInfo::Init();
DiskInfo::Init();
@@ -351,6 +402,11 @@ Status impala::StartMemoryMaintenanceThread() {
&MemoryMaintenanceThread, &memory_maintenance_thread);
}
+// Spawns the "shutdown-signal-handler" thread (category "common"), which runs
+// ImpalaShutdownSignalHandler(). Its handle is stored in 'shutdown_signal_handler_thread'.
+// Returns the status of the thread creation.
+Status impala::StartImpalaShutdownSignalHandlerThread() {
+  Status create_status = Thread::Create("common", "shutdown-signal-handler",
+      &ImpalaShutdownSignalHandler, &shutdown_signal_handler_thread);
+  return create_status;
+}
+
#if defined(ADDRESS_SANITIZER)
// Default ASAN_OPTIONS. Override by setting environment variable $ASAN_OPTIONS.
extern "C" const char *__asan_default_options() {
diff --git a/be/src/common/init.h b/be/src/common/init.h
index fe8a063..ff0194e 100644
--- a/be/src/common/init.h
+++ b/be/src/common/init.h
@@ -21,6 +21,10 @@
#include "util/test-info.h"
#include "common/status.h"
+// Using the first real-time signal available to initiate graceful shutdown. Unlike
+// standard signals, multiple pending instances of a real-time signal are queued (up to a
+// resource limit) rather than merged into one.
+// SIGRTMIN comes from <signal.h>. signal(7) advises referring to real-time signals as
+// SIGRTMIN+n rather than by hard-coded numbers, because the value can vary (e.g. glibc
+// reserves some real-time signals for internal use).
+// See "Real-time signals" section under signal(7)'s man page for more info.
+#define IMPALA_SHUTDOWN_SIGNAL SIGRTMIN
+
namespace impala {
/// Initialises logging, flags, and, if init_jvm is true, an embedded JVM.
@@ -35,6 +39,12 @@ void InitCommonRuntime(int argc, char** argv, bool init_jvm,
/// RegisterMemoryMetrics(). This thread is needed for daemons to free memory and
/// refresh metrics but is not needed for standalone tests.
Status StartMemoryMaintenanceThread() WARN_UNUSED_RESULT;
+
+/// Starts Impala shutdown signal handler thread. This thread is responsible for
+/// synchronously handling the IMPALA_SHUTDOWN_SIGNAL signal and initiating graceful
+/// shutdown each time it is received. Must be called only after IMPALA_SHUTDOWN_SIGNAL
+/// is blocked on all threads in the process and impala server has started.
+/// Returns an error status if the thread could not be created.
+Status StartImpalaShutdownSignalHandlerThread() WARN_UNUSED_RESULT;
}
#endif
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 5434282..df09d21 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -2456,7 +2456,7 @@ ShutdownStatusPB ImpalaServer::GetShutdownStatus() const {
}
string ImpalaServer::ShutdownStatusToString(const ShutdownStatusPB& shutdown_status) {
- return Substitute("startup grace period left: $0, deadline left: $1, "
+ return Substitute("shutdown grace period left: $0, deadline left: $1, "
"queries registered on coordinator: $2, queries executing: $3, "
"fragment instances: $4",
PrettyPrinter::Print(shutdown_status.grace_remaining_ms(), TUnit::TIME_MS),
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 2b7cc95..7650126 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -92,7 +92,7 @@ int ImpaladMain(int argc, char** argv) {
ShutdownLogging();
exit(1);
}
-
+ ABORT_IF_ERROR(StartImpalaShutdownSignalHandlerThread());
impala_server->Join();
return 0;
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index e1e19ba..ca6ffaf 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -19,6 +19,7 @@ import logging
import pytest
import psutil
import re
+import signal
import socket
import time
@@ -93,15 +94,16 @@ def parse_shutdown_result(result):
deadline left, queries registered, queries executing)."""
assert len(result.data) == 1
summary = result.data[0]
- match = re.match(r'startup grace period left: ([0-9ms]*), deadline left: ([0-9ms]*), ' +
- r'queries registered on coordinator: ([0-9]*), queries executing: ([0-9]*), ' +
- r'fragment instances: [0-9]*', summary)
+ match = re.match(r'shutdown grace period left: ([0-9ms]*), deadline left: ([0-9ms]*), '
+ r'queries registered on coordinator: ([0-9]*), queries executing: '
+ r'([0-9]*), fragment instances: [0-9]*', summary)
assert match is not None, summary
return match.groups()
-class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
+class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite):
IDLE_SHUTDOWN_GRACE_PERIOD_S = 1
+ IMPALA_SHUTDOWN_SIGNAL = signal.SIGRTMIN
@classmethod
def get_workload(cls):
@@ -113,8 +115,8 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
--hostname={hostname}".format(grace_period=IDLE_SHUTDOWN_GRACE_PERIOD_S,
hostname=socket.gethostname()))
def test_shutdown_idle(self):
- """Test that idle impalads shut down in a timely manner after the startup grace period
- elapses."""
+ """Test that idle impalads shut down in a timely manner after the shutdown grace
+ period elapses."""
impalad1 = psutil.Process(self.cluster.impalads[0].get_pid())
impalad2 = psutil.Process(self.cluster.impalads[1].get_pid())
impalad3 = psutil.Process(self.cluster.impalads[2].get_pid())
@@ -158,7 +160,7 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
":shutdown('{0}:27000')".format(socket.gethostname()),
query_options={'debug_action': 'CRS_SHUTDOWN_RPC:FAIL'})
- # Make sure that the impala daemons exit after the startup grace period plus a 10
+ # Make sure that the impala daemons exit after the shutdown grace period plus a 10
# second margin of error.
start_time = time.time()
LOG.info("Waiting for impalads to exit {0}".format(start_time))
@@ -218,11 +220,11 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
SHUTDOWN_EXEC2 = ": shutdown('localhost:27001')"
# Run this query before shutdown and make sure that it executes successfully on
- # all executors through the startup grace period without disruption.
+ # all executors through the shutdown grace period without disruption.
before_shutdown_handle = self.__exec_and_wait_until_running(QUERY)
# Run this query which simulates getting stuck in admission control until after
- # the startup grace period expires. This exercises the code path where the
+ # the shutdown grace period expires. This exercises the code path where the
# coordinator terminates the query before it has started up.
before_shutdown_admission_handle = self.execute_query_async(QUERY,
{'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@30000'})
@@ -423,3 +425,48 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
assert False, "Expected query to fail"
except Exception, e:
assert 'Failed due to unreachable impalad(s)' in str(e)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--shutdown_grace_period_s={grace_period} \
+ --hostname={hostname}".format(grace_period=IDLE_SHUTDOWN_GRACE_PERIOD_S,
+ hostname=socket.gethostname()), cluster_size=1)
+ def test_shutdown_signal(self):
+ """Test that an idle impalad shuts down in a timely manner after the shutdown grace
+ period elapses."""
+ impalad = psutil.Process(self.cluster.impalads[0].get_pid())
+ LOG.info(
+ "Sending IMPALA_SHUTDOWN_SIGNAL(SIGRTMIN = {0}) signal to impalad PID = {1}",
+ self.IMPALA_SHUTDOWN_SIGNAL, impalad.pid)
+ impalad.send_signal(self.IMPALA_SHUTDOWN_SIGNAL)
+ # Make sure that the impala daemon exits after the shutdown grace period plus a 10
+ # second margin of error.
+ start_time = time.time()
+ LOG.info("Waiting for impalad to exit {0}".format(start_time))
+ impalad.wait()
+ shutdown_duration = time.time() - start_time
+ assert shutdown_duration <= self.IDLE_SHUTDOWN_GRACE_PERIOD_S + 10
+ # Make sure signal was received and the grace period and deadline are as expected.
+ self.assert_impalad_log_contains('INFO',
+ "Shutdown signal received. Current Shutdown Status: shutdown grace period left: "
+ "{0}s000ms, deadline left: 8760h".format(self.IDLE_SHUTDOWN_GRACE_PERIOD_S))
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(cluster_size=1)
+ def test_sending_multiple_shutdown_signals(self):
+ """Test that multiple IMPALA_SHUTDOWN_SIGNAL signals are all handeled without
+ crashing the process."""
+ impalad = psutil.Process(self.cluster.impalads[0].get_pid())
+ NUM_SIGNALS_TO_SEND = 10
+ LOG.info(
+ "Sending {0} IMPALA_SHUTDOWN_SIGNAL(SIGRTMIN = {1}) signals to impalad PID = {2}",
+ NUM_SIGNALS_TO_SEND, self.IMPALA_SHUTDOWN_SIGNAL, impalad.pid)
+ for i in range(NUM_SIGNALS_TO_SEND):
+ impalad.send_signal(self.IMPALA_SHUTDOWN_SIGNAL)
+ # Give shutdown thread some time to wake up and handle all the signals to avoid
+ # flakiness.
+ sleep(5)
+ # Make sure all signals were received and the process is still up.
+ self.assert_impalad_log_contains('INFO', "Shutdown signal received.",
+ NUM_SIGNALS_TO_SEND)
+ assert impalad.is_running(), "Impalad process should still be running."