You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2016/09/14 18:32:24 UTC
[04/35] ambari git commit: AMBARI-18369. Make Execute timeout to be
able to kill process trees which don't respond to SIGTERM (aonishuk)
AMBARI-18369. Make Execute timeout to be able to kill process trees which don't respond to SIGTERM (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d8f3cf88
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d8f3cf88
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d8f3cf88
Branch: refs/heads/branch-dev-patch-upgrade
Commit: d8f3cf88f5c03e844c43af85b5eee69a1956eecb
Parents: 2dd5ba3
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Tue Sep 13 10:31:20 2016 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Tue Sep 13 10:31:20 2016 +0300
----------------------------------------------------------------------
.../python/resource_management/core/__init__.py | 1 +
.../resource_management/core/files/killtree.sh | 40 +++++++++
.../core/providers/system.py | 1 +
.../core/resources/system.py | 13 ++-
.../python/resource_management/core/shell.py | 18 ++--
.../resource_management/core/signal_utils.py | 91 ++++++++++++++++++++
.../python/resource_management/core/utils.py | 28 +-----
.../package/alerts/alert_hive_metastore.py | 5 +-
8 files changed, 159 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/__init__.py b/ambari-common/src/main/python/resource_management/core/__init__.py
index 1af793b..3a63dab 100644
--- a/ambari-common/src/main/python/resource_management/core/__init__.py
+++ b/ambari-common/src/main/python/resource_management/core/__init__.py
@@ -29,5 +29,6 @@ from resource_management.core.source import *
from resource_management.core.system import *
from resource_management.core.shell import *
from resource_management.core.logger import *
+from resource_management.core.signal_utils import *
__version__ = "0.4.1"
http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/files/killtree.sh
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/files/killtree.sh b/ambari-common/src/main/python/resource_management/core/files/killtree.sh
new file mode 100644
index 0000000..c19efd9
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/files/killtree.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+set -e
+
+killtree() {
+ local _pid=$1
+ local _sig=${2:--TERM}
+ ambari-sudo.sh kill -stop ${_pid} # needed to stop quickly forking parent from producing children between child killing and parent killing
+ for _child in $(ps -o pid --no-headers --ppid ${_pid}); do
+ killtree ${_child} ${_sig}
+ done
+ ambari-sudo.sh kill -${_sig} ${_pid}
+}
+
+if [ $# -eq 0 -o $# -gt 2 ]; then
+ echo "Usage: $(basename $0) <pid> [signal]"
+ exit 1
+fi
+
+killtree $@
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/providers/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/system.py b/ambari-common/src/main/python/resource_management/core/providers/system.py
index fcbab01..2b8d5f7 100644
--- a/ambari-common/src/main/python/resource_management/core/providers/system.py
+++ b/ambari-common/src/main/python/resource_management/core/providers/system.py
@@ -256,6 +256,7 @@ class ExecuteProvider(Provider):
timeout=self.resource.timeout,on_timeout=self.resource.on_timeout,
path=self.resource.path,
sudo=self.resource.sudo,
+ timeout_kill_strategy=self.resource.timeout_kill_strategy,
on_new_line=self.resource.on_new_line,
stdout=self.resource.stdout,stderr=self.resource.stderr,
tries=self.resource.tries, try_sleep=self.resource.try_sleep)
http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/resources/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/resources/system.py b/ambari-common/src/main/python/resource_management/core/resources/system.py
index 7f164f6..087ceab 100644
--- a/ambari-common/src/main/python/resource_management/core/resources/system.py
+++ b/ambari-common/src/main/python/resource_management/core/resources/system.py
@@ -23,9 +23,9 @@ Ambari Agent
__all__ = ["File", "Directory", "Link", "Execute", "ExecuteScript", "Mount"]
import subprocess
+from resource_management.core.signal_utils import TerminateStrategy
from resource_management.core.base import Resource, ForcedListArgument, ResourceArgument, BooleanArgument
-
class File(Resource):
action = ForcedListArgument(default="create")
path = ResourceArgument(default=lambda obj: obj.name)
@@ -240,6 +240,17 @@ class Execute(Resource):
stdout = ResourceArgument(default=subprocess.PIPE)
stderr = ResourceArgument(default=subprocess.STDOUT)
+ """
+ This argument takes TerminateStrategy constants. Import it as shown below:
+ from resource_management.core.signal_utils import TerminateStrategy
+
+ Possible values are:
+ TerminateStrategy.TERMINATE_PARENT - kill parent process with SIGTERM (is perfect if all children handle SIGTERM signal)
+ TerminateStrategy.KILL_PROCESS_GROUP - kill process GROUP with SIGTERM and if not effective with SIGKILL
+ TerminateStrategy.KILL_PROCESS_TREE - send SIGTERM to every process in the tree
+ """
+ timeout_kill_strategy = ResourceArgument(default=TerminateStrategy.TERMINATE_PARENT)
+
class ExecuteScript(Resource):
action = ForcedListArgument(default="run")
code = ResourceArgument(required=True)
http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/shell.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/shell.py b/ambari-common/src/main/python/resource_management/core/shell.py
index 6d9eb18..372755a 100644
--- a/ambari-common/src/main/python/resource_management/core/shell.py
+++ b/ambari-common/src/main/python/resource_management/core/shell.py
@@ -37,6 +37,7 @@ from exceptions import ExecuteTimeoutException
from resource_management.core.logger import Logger
from resource_management.core import utils
from ambari_commons.constants import AMBARI_SUDO_BINARY
+from resource_management.core.signal_utils import TerminateStrategy, terminate_process
# use quiet=True calls from this folder (logs get too messy duplicating the resources with its commands)
NOT_LOGGED_FOLDER = 'resource_management/core'
@@ -90,7 +91,7 @@ def preexec_fn():
@log_function_call
def checked_call(command, quiet=False, logoutput=None, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
cwd=None, env=None, preexec_fn=preexec_fn, user=None, wait_for_finish=True, timeout=None, on_timeout=None,
- path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0):
+ path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0, timeout_kill_strategy=TerminateStrategy.TERMINATE_PARENT):
"""
Execute the shell command and throw an exception on failure.
@throws Fail
@@ -99,12 +100,12 @@ def checked_call(command, quiet=False, logoutput=None, stdout=subprocess.PIPE,st
return _call_wrapper(command, logoutput=logoutput, throw_on_failure=True, stdout=stdout, stderr=stderr,
cwd=cwd, env=env, preexec_fn=preexec_fn, user=user, wait_for_finish=wait_for_finish,
on_timeout=on_timeout, timeout=timeout, path=path, sudo=sudo, on_new_line=on_new_line,
- tries=tries, try_sleep=try_sleep)
+ tries=tries, try_sleep=try_sleep, timeout_kill_strategy=timeout_kill_strategy)
@log_function_call
def call(command, quiet=False, logoutput=None, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
cwd=None, env=None, preexec_fn=preexec_fn, user=None, wait_for_finish=True, timeout=None, on_timeout=None,
- path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0):
+ path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0, timeout_kill_strategy=TerminateStrategy.TERMINATE_PARENT):
"""
Execute the shell command despite failures.
@return: return_code, output
@@ -112,7 +113,7 @@ def call(command, quiet=False, logoutput=None, stdout=subprocess.PIPE,stderr=sub
return _call_wrapper(command, logoutput=logoutput, throw_on_failure=False, stdout=stdout, stderr=stderr,
cwd=cwd, env=env, preexec_fn=preexec_fn, user=user, wait_for_finish=wait_for_finish,
on_timeout=on_timeout, timeout=timeout, path=path, sudo=sudo, on_new_line=on_new_line,
- tries=tries, try_sleep=try_sleep)
+ tries=tries, try_sleep=try_sleep, timeout_kill_strategy=timeout_kill_strategy)
@log_function_call
def non_blocking_call(command, quiet=False, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
@@ -166,7 +167,7 @@ def _call_wrapper(command, **kwargs):
def _call(command, logoutput=None, throw_on_failure=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
cwd=None, env=None, preexec_fn=preexec_fn, user=None, wait_for_finish=True, timeout=None, on_timeout=None,
- path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0):
+ path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0, timeout_kill_strategy=TerminateStrategy.TERMINATE_PARENT):
"""
Execute shell command
@@ -224,7 +225,7 @@ def _call(command, logoutput=None, throw_on_failure=True, stdout=subprocess.PIPE
if timeout:
timeout_event = threading.Event()
- t = threading.Timer( timeout, _on_timeout, [proc, timeout_event] )
+ t = threading.Timer( timeout, _on_timeout, [proc, timeout_event, timeout_kill_strategy] )
t.start()
if not wait_for_finish:
@@ -378,7 +379,6 @@ def _print(line):
sys.stdout.write(line)
sys.stdout.flush()
-def _on_timeout(proc, timeout_event):
+def _on_timeout(proc, timeout_event, terminate_strategy):
timeout_event.set()
- utils.killpg_gracefully(proc)
-
+ terminate_process(proc, terminate_strategy)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/signal_utils.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/signal_utils.py b/ambari-common/src/main/python/resource_management/core/signal_utils.py
new file mode 100644
index 0000000..1f0dfe7
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/core/signal_utils.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+__all__ = ["TerminateStrategy", "terminate_process"]
+
+import os
+import signal
+from resource_management.core.base import Fail
+
+GRACEFUL_PG_KILL_TIMEOUT_SECONDS = 5
+
+class TerminateStrategy:
+ """
+ Constants accepted by terminate_process() to select how a process is stopped.
+
+ 0 - kill parent process with SIGTERM (is perfect if all children handle SIGTERM signal). Otherwise children will survive.
+ 1 - kill process GROUP with SIGTERM and if not effective with SIGKILL
+ 2 - send SIGKILL to every process in the tree (kill_process_tree runs files/killtree.sh with signal.SIGKILL)
+ """
+ TERMINATE_PARENT = 0
+ KILL_PROCESS_GROUP = 1
+ KILL_PROCESS_TREE = 2
+
+def terminate_process(proc, terminate_strategy):
+ if terminate_strategy == TerminateStrategy.TERMINATE_PARENT:
+ terminate_parent_process(proc)
+ elif terminate_strategy == TerminateStrategy.KILL_PROCESS_GROUP:
+ killpg_gracefully(proc)
+ elif terminate_strategy == TerminateStrategy.KILL_PROCESS_TREE:
+ kill_process_tree(proc)
+ else:
+ raise Fail("Invalid timeout_kill_strategy = '{0}'. Use TerminateStrategy class constants as a value.".format(terminate_strategy))
+
+def killpg_gracefully(proc, timeout=GRACEFUL_PG_KILL_TIMEOUT_SECONDS):
+ """
+ Tries to kill pgroup (process group) of process with SIGTERM.
+ If the process is still alive after waiting for timeout, SIGKILL is sent to the pgroup.
+ """
+ from resource_management.core import sudo
+ from resource_management.core.logger import Logger
+
+ if proc.poll() == None:
+ try:
+ pgid = os.getpgid(proc.pid)
+ sudo.kill(-pgid, signal.SIGTERM)
+
+ for i in xrange(10*timeout):
+ if proc.poll() is not None:
+ break
+ time.sleep(0.1)
+ else:
+ Logger.info("Cannot gracefully kill process group {0}. Resorting to SIGKILL.".format(pgid))
+ sudo.kill(-pgid, signal.SIGKILL)
+ proc.wait()
+ # catch race condition if proc already dead
+ except OSError:
+ pass
+
+def terminate_parent_process(proc):
+ if proc.poll() == None:
+ try:
+ proc.terminate()
+ proc.wait()
+ # catch race condition if proc already dead
+ except OSError:
+ pass
+
+def kill_process_tree(proc):
+ from resource_management.core import shell
+ current_directory = os.path.dirname(os.path.abspath(__file__))
+ kill_tree_script = "{0}/files/killtree.sh".format(current_directory)
+ if proc.poll() == None:
+ shell.checked_call(["bash", kill_tree_script, str(proc.pid), str(signal.SIGKILL)])
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/utils.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/utils.py b/ambari-common/src/main/python/resource_management/core/utils.py
index dc771d1..265b2f2 100644
--- a/ambari-common/src/main/python/resource_management/core/utils.py
+++ b/ambari-common/src/main/python/resource_management/core/utils.py
@@ -31,7 +31,6 @@ from resource_management.core.exceptions import Fail
from itertools import chain, repeat, islice
PASSWORDS_HIDE_STRING = "[PROTECTED]"
-GRACEFUL_PG_KILL_TIMEOUT_SECONDS = 5
class AttributeDictionary(object):
def __init__(self, *args, **kwargs):
@@ -158,29 +157,4 @@ def pad_infinite(iterable, padding=None):
return chain(iterable, repeat(padding))
def pad(iterable, size, padding=None):
- return islice(pad_infinite(iterable, padding), size)
-
-def killpg_gracefully(proc, timeout=GRACEFUL_PG_KILL_TIMEOUT_SECONDS):
- """
- Tries to kill pgroup (process group) of process with SIGTERM.
- If the process is still alive after waiting for timeout, SIGKILL is sent to the pgroup.
- """
- from resource_management.core import sudo
- from resource_management.core.logger import Logger
-
- if proc.poll() == None:
- try:
- pgid = os.getpgid(proc.pid)
- sudo.kill(-pgid, signal.SIGTERM)
-
- for i in xrange(10*timeout):
- if proc.poll() is not None:
- break
- time.sleep(0.1)
- else:
- Logger.info("Cannot gracefully kill process group {0}. Resorting to SIGKILL.".format(pgid))
- sudo.kill(-pgid, signal.SIGKILL)
- proc.wait()
- # catch race condition if proc already dead
- except OSError:
- pass
\ No newline at end of file
+ return islice(pad_infinite(iterable, padding), size)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py
index e02ed5a..cd1eded 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py
@@ -28,6 +28,7 @@ from resource_management.core import global_lock
from resource_management.libraries.functions import format
from resource_management.libraries.functions import get_kinit_path
from resource_management.core.resources import Execute
+from resource_management.core.signal_utils import TerminateStrategy
from ambari_commons.os_check import OSConst
from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
@@ -195,7 +196,9 @@ def execute(configurations={}, parameters={}, host_name=None):
try:
Execute(cmd, user=smokeuser,
path=["/bin/", "/usr/bin/", "/usr/sbin/", bin_dir],
- timeout=int(check_command_timeout) )
+ timeout=5,
+ timeout_kill_strategy=TerminateStrategy.KILL_PROCESS_TREE,
+ )
total_time = time.time() - start_time