You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2013/06/26 21:03:48 UTC
svn commit: r1497044 - in
/incubator/ambari/branches/branch-1.2.5/ambari-agent/src:
main/python/ambari_agent/ test/python/
Author: mahadev
Date: Wed Jun 26 19:03:47 2013
New Revision: 1497044
URL: http://svn.apache.org/r1497044
Log:
AMBARI-2486. shell.killprocessgrp is not working in a reliable way (Dmitry L via mahadev)
Modified:
incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py
incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/shell.py
incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPuppetExecutor.py
incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPythonExecutor.py
incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestShell.py
Modified: incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py?rev=1497044&r1=1497043&r2=1497044&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py (original)
+++ incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py Wed Jun 26 19:03:47 2013
@@ -223,12 +223,10 @@ class PuppetExecutor:
self.event.wait(self.PUPPET_TIMEOUT_SECONDS)
if puppet.returncode is None:
logger.error("Task timed out and will be killed")
- self.runShellKillPgrp(puppet)
+ shell.kill_process_with_children(puppet.pid)
self.last_puppet_has_been_killed = True
pass
- def runShellKillPgrp(self, puppet):
- shell.killprocessgrp(puppet.pid)
def main():
logging.basicConfig(level=logging.DEBUG)
Modified: incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py?rev=1497044&r1=1497043&r2=1497044&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py (original)
+++ incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py Wed Jun 26 19:03:47 2013
@@ -104,9 +104,6 @@ class PythonExecutor:
self.event.wait(self.PYTHON_TIMEOUT_SECONDS)
if python.returncode is None:
logger.error("Subprocess timed out and will be killed")
- self.runShellKillPgrp(python)
+ shell.kill_process_with_children(python.pid)
self.python_process_has_been_killed = True
pass
-
- def runShellKillPgrp(self, python):
- shell.killprocessgrp(python.pid)
\ No newline at end of file
Modified: incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/shell.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/shell.py?rev=1497044&r1=1497043&r2=1497044&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/shell.py (original)
+++ incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/shell.py Wed Jun 26 19:03:47 2013
@@ -28,6 +28,7 @@ import threading
import time
import traceback
import AmbariConfig
+import pprint
try:
import pwd
@@ -39,7 +40,7 @@ serverTracker = {}
logger = logging.getLogger()
threadLocal = threading.local()
-
+gracefull_kill_delay = 5 # seconds between SIGTERM and SIGKILL
tempFiles = []
def noteTempFile(filename):
tempFiles.append(filename)
@@ -54,21 +55,43 @@ def killstaleprocesses():
for file in files:
if str(file).endswith(".pid"):
pid = str(file).split('.')[0]
- killprocessgrp(int(pid))
+ kill_process_with_children(int(pid))
os.unlink(os.path.join(prefix,file))
logger.info ("Killed stale processes")
+
def killprocessgrp(pid):
+ run_kill_function(os.killpg, pid)
+
+def kill_process_with_children(parent_pid):
+ def kill_tree_function(pid, signal):
+ '''
+ Kills process tree starting from a given pid.
+ '''
+ # The command below starts the 'ps' Linux utility and then parses its
+ # output using 'awk'. AWK recursively extracts PIDs of all children of
+ # a given PID and then passes a list of "kill -<SIGNAL> PID" commands to
+ # the 'sh' shell.
+ CMD = """ps xf | awk -v PID=""" + str(pid) + \
+ """ ' $1 == PID { P = $1; next } P && /_/ { P = P " " $1;""" + \
+ """K=P } P && !/_/ { P="" } END { print "kill -""" \
+ + str(signal) + """ "K }' | sh """
+ process = subprocess.Popen(CMD, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, shell=True)
+ process.communicate()
+ run_kill_function(kill_tree_function, parent_pid)
+
+def run_kill_function(kill_function, pid):
try:
- os.killpg(pid, signal.SIGTERM)
+ kill_function(pid, signal.SIGTERM)
except Exception, e:
logger.warn("Failed to kill PID %d" % (pid))
logger.warn("Reported error: " + repr(e))
- time.sleep(5)
+ time.sleep(gracefull_kill_delay)
try:
- os.killpg(pid, signal.SIGKILL)
+ kill_function(pid, signal.SIGKILL)
except Exception, e:
logger.error("Failed to send SIGKILL to PID %d. Process exited?" % (pid))
logger.error("Reported error: " + repr(e))
Modified: incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPuppetExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPuppetExecutor.py?rev=1497044&r1=1497043&r2=1497044&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPuppetExecutor.py (original)
+++ incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPuppetExecutor.py Wed Jun 26 19:03:47 2013
@@ -162,7 +162,8 @@ class TestPuppetExecutor(TestCase):
self.assertEquals(result_check, True, "Failed to condence output log")
self.assertEquals(len(result.splitlines(True)), 2, "Failed to condence output log")
- def test_watchdog_1(self):
+ @patch("shell.kill_process_with_children")
+ def test_watchdog_1(self, kill_process_with_children_mock):
"""
Tests whether watchdog works
"""
@@ -176,6 +177,7 @@ class TestPuppetExecutor(TestCase):
result = { }
puppetEnv = { "RUBYLIB" : ""}
executor_mock.PUPPET_TIMEOUT_SECONDS = 0.1
+ kill_process_with_children_mock.side_effect = lambda pid : subproc_mock.terminate()
subproc_mock.returncode = None
thread = Thread(target = executor_mock.runPuppetFile, args = ("fake_puppetFile", result, puppetEnv, tmpoutfile, tmperrfile))
thread.start()
Modified: incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPythonExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPythonExecutor.py?rev=1497044&r1=1497043&r2=1497044&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPythonExecutor.py (original)
+++ incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPythonExecutor.py Wed Jun 26 19:03:47 2013
@@ -28,13 +28,13 @@ from threading import Thread
from PythonExecutor import PythonExecutor
from AmbariConfig import AmbariConfig
-from mock.mock import MagicMock
+from mock.mock import MagicMock, patch
class TestPythonExecutor(TestCase):
-
- def test_watchdog_1(self):
+ @patch("shell.kill_process_with_children")
+ def test_watchdog_1(self, kill_process_with_children_mock):
"""
Tests whether watchdog works
"""
@@ -43,6 +43,7 @@ class TestPythonExecutor(TestCase):
_, tmpoutfile = tempfile.mkstemp()
_, tmperrfile = tempfile.mkstemp()
executor.PYTHON_TIMEOUT_SECONDS = 0.1
+ kill_process_with_children_mock.side_effect = lambda pid : subproc_mock.terminate()
def lauch_python_subprocess_method(command, tmpout, tmperr):
subproc_mock.tmpout = tmpout
Modified: incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestShell.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestShell.py?rev=1497044&r1=1497043&r2=1497044&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestShell.py (original)
+++ incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestShell.py Wed Jun 26 19:03:47 2013
@@ -26,8 +26,8 @@ from mock.mock import patch, MagicMock,
from ambari_agent.AmbariConfig import AmbariConfig
from ambari_agent import shell
from shell import shellRunner
-
-
+from sys import platform as _platform
+import subprocess, time
class TestShell(unittest.TestCase):
@@ -60,3 +60,35 @@ class TestShell(unittest.TestCase):
result = sh.run(['echo'], 'non_exist_user_name')
self.assertEquals(result['exitCode'], 0)
self.assertEquals(result['error'], '')
+
+ def test_kill_process_with_children(self):
+ if _platform == "linux" or _platform == "linux2": # Test is Linux-specific
+ gracefull_kill_delay_old = shell.gracefull_kill_delay
+ shell.gracefull_kill_delay = 0.1
+ sleep_cmd = "sleep 314159265"
+ test_cmd = """ (({0}) | ({0} | {0})) """.format(sleep_cmd)
+ # Starting process tree (multiple process groups)
+ test_process = subprocess.Popen(test_cmd, shell=True)
+ time.sleep(0.3) # Delay to allow subprocess to start
+ # Check if processes are running
+ ps_cmd = """ps aux | grep "{0}" | grep -v grep """.format(sleep_cmd)
+ ps_process = subprocess.Popen(ps_cmd, stdout=subprocess.PIPE, shell=True)
+ (out, err) = ps_process.communicate()
+ self.assertTrue(sleep_cmd in out)
+ # Kill test process
+ shell.kill_process_with_children(test_process.pid)
+ test_process.communicate()
+ # Now test process should not be running
+ ps_process = subprocess.Popen(ps_cmd, stdout=subprocess.PIPE, shell=True)
+ (out, err) = ps_process.communicate()
+ self.assertFalse(sleep_cmd in out)
+ shell.gracefull_kill_delay = gracefull_kill_delay_old
+ else:
+ # Do not run under other systems
+ pass
+
+
+ @patch("os.killpg")
+ def test_killprocessgrp(self, killpg_grp):
+ shell.killprocessgrp(999)
+ self.assertTrue(killpg_grp.called)
\ No newline at end of file