You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2013/06/26 21:03:48 UTC

svn commit: r1497044 - in /incubator/ambari/branches/branch-1.2.5/ambari-agent/src: main/python/ambari_agent/ test/python/

Author: mahadev
Date: Wed Jun 26 19:03:47 2013
New Revision: 1497044

URL: http://svn.apache.org/r1497044
Log:
AMBARI-2486. shell.killprocessgrp is not working in a reliable way (Dmitry L via mahadev)

Modified:
    incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py
    incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
    incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/shell.py
    incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPuppetExecutor.py
    incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPythonExecutor.py
    incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestShell.py

Modified: incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py?rev=1497044&r1=1497043&r2=1497044&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py (original)
+++ incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py Wed Jun 26 19:03:47 2013
@@ -223,12 +223,10 @@ class PuppetExecutor:
     self.event.wait(self.PUPPET_TIMEOUT_SECONDS)
     if puppet.returncode is None:
       logger.error("Task timed out and will be killed")
-      self.runShellKillPgrp(puppet)
+      shell.kill_process_with_children(puppet.pid)
       self.last_puppet_has_been_killed = True
     pass
 
-  def runShellKillPgrp(self, puppet):
-    shell.killprocessgrp(puppet.pid)
 
 def main():
   logging.basicConfig(level=logging.DEBUG)    

Modified: incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py?rev=1497044&r1=1497043&r2=1497044&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py (original)
+++ incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py Wed Jun 26 19:03:47 2013
@@ -104,9 +104,6 @@ class PythonExecutor:
     self.event.wait(self.PYTHON_TIMEOUT_SECONDS)
     if python.returncode is None:
       logger.error("Subprocess timed out and will be killed")
-      self.runShellKillPgrp(python)
+      shell.kill_process_with_children(python.pid)
       self.python_process_has_been_killed = True
     pass
-
-  def runShellKillPgrp(self, python):
-    shell.killprocessgrp(python.pid)
\ No newline at end of file

Modified: incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/shell.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/shell.py?rev=1497044&r1=1497043&r2=1497044&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/shell.py (original)
+++ incubator/ambari/branches/branch-1.2.5/ambari-agent/src/main/python/ambari_agent/shell.py Wed Jun 26 19:03:47 2013
@@ -28,6 +28,7 @@ import threading
 import time
 import traceback
 import AmbariConfig
+import pprint
 
 try:
     import pwd
@@ -39,7 +40,7 @@ serverTracker = {}
 logger = logging.getLogger()
 
 threadLocal = threading.local()
-
+gracefull_kill_delay = 5 # seconds between SIGTERM and SIGKILL
 tempFiles = [] 
 def noteTempFile(filename):
   tempFiles.append(filename)
@@ -54,21 +55,43 @@ def killstaleprocesses():
   for file in files:
     if str(file).endswith(".pid"):
       pid = str(file).split('.')[0]
-      killprocessgrp(int(pid))
+      kill_process_with_children(int(pid))
       os.unlink(os.path.join(prefix,file))
   logger.info ("Killed stale processes")
 
+
 def killprocessgrp(pid):
+  run_kill_function(os.killpg, pid)
+
+def kill_process_with_children(parent_pid):
+  def kill_tree_function(pid, signal):
+    '''
+    Kills process tree starting from a given pid.
+    '''
+    # The command below starts the 'ps' Linux utility and then parses its
+    # output using 'awk'. AWK recursively extracts the PIDs of all children of
+    # a given PID and then passes a list of "kill -<SIGNAL> PID" commands to
+    # the 'sh' shell.
+    CMD = """ps xf | awk -v PID=""" + str(pid) + \
+        """ ' $1 == PID { P = $1; next } P && /_/ { P = P " " $1;""" + \
+        """K=P } P && !/_/ { P="" }  END { print "kill -""" \
+        + str(signal) + """ "K }' | sh """
+    process = subprocess.Popen(CMD, stdout=subprocess.PIPE,
+                               stderr=subprocess.PIPE, shell=True)
+    process.communicate()
+  run_kill_function(kill_tree_function, parent_pid)
+
+def run_kill_function(kill_function, pid):
   try:
-    os.killpg(pid, signal.SIGTERM)
+    kill_function(pid, signal.SIGTERM)
   except Exception, e:
     logger.warn("Failed to kill PID %d" % (pid))
     logger.warn("Reported error: " + repr(e))
 
-  time.sleep(5)
+  time.sleep(gracefull_kill_delay)
 
   try:
-    os.killpg(pid, signal.SIGKILL)
+    kill_function(pid, signal.SIGKILL)
   except Exception, e:
     logger.error("Failed to send SIGKILL to PID %d. Process exited?" % (pid))
     logger.error("Reported error: " + repr(e))

Modified: incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPuppetExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPuppetExecutor.py?rev=1497044&r1=1497043&r2=1497044&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPuppetExecutor.py (original)
+++ incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPuppetExecutor.py Wed Jun 26 19:03:47 2013
@@ -162,7 +162,8 @@ class TestPuppetExecutor(TestCase):
     self.assertEquals(result_check, True, "Failed to condence output log")
     self.assertEquals(len(result.splitlines(True)), 2, "Failed to condence output log")
 
-  def test_watchdog_1(self):
+  @patch("shell.kill_process_with_children")
+  def test_watchdog_1(self, kill_process_with_children_mock):
     """
     Tests whether watchdog works
     """
@@ -176,6 +177,7 @@ class TestPuppetExecutor(TestCase):
     result = {  }
     puppetEnv = { "RUBYLIB" : ""}
     executor_mock.PUPPET_TIMEOUT_SECONDS = 0.1
+    kill_process_with_children_mock.side_effect = lambda pid : subproc_mock.terminate()
     subproc_mock.returncode = None
     thread = Thread(target =  executor_mock.runPuppetFile, args = ("fake_puppetFile", result, puppetEnv, tmpoutfile, tmperrfile))
     thread.start()

Modified: incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPythonExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPythonExecutor.py?rev=1497044&r1=1497043&r2=1497044&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPythonExecutor.py (original)
+++ incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestPythonExecutor.py Wed Jun 26 19:03:47 2013
@@ -28,13 +28,13 @@ from threading import Thread
 
 from PythonExecutor import PythonExecutor
 from AmbariConfig import AmbariConfig
-from mock.mock import MagicMock
+from mock.mock import MagicMock, patch
 
 
 class TestPythonExecutor(TestCase):
 
-
-  def test_watchdog_1(self):
+  @patch("shell.kill_process_with_children")
+  def test_watchdog_1(self, kill_process_with_children_mock):
     """
     Tests whether watchdog works
     """
@@ -43,6 +43,7 @@ class TestPythonExecutor(TestCase):
     _, tmpoutfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
     executor.PYTHON_TIMEOUT_SECONDS = 0.1
+    kill_process_with_children_mock.side_effect = lambda pid : subproc_mock.terminate()
 
     def lauch_python_subprocess_method(command, tmpout, tmperr):
       subproc_mock.tmpout = tmpout

Modified: incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestShell.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestShell.py?rev=1497044&r1=1497043&r2=1497044&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestShell.py (original)
+++ incubator/ambari/branches/branch-1.2.5/ambari-agent/src/test/python/TestShell.py Wed Jun 26 19:03:47 2013
@@ -26,8 +26,8 @@ from mock.mock import patch, MagicMock, 
 from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_agent import shell
 from shell import shellRunner
-
-
+from sys import platform as _platform
+import subprocess, time
 
 class TestShell(unittest.TestCase):
 
@@ -60,3 +60,35 @@ class TestShell(unittest.TestCase):
     result = sh.run(['echo'], 'non_exist_user_name')
     self.assertEquals(result['exitCode'], 0)
     self.assertEquals(result['error'], '')
+
+  def test_kill_process_with_children(self):
+    if _platform == "linux" or _platform == "linux2": # Test is Linux-specific
+      gracefull_kill_delay_old = shell.gracefull_kill_delay
+      shell.gracefull_kill_delay = 0.1
+      sleep_cmd = "sleep 314159265"
+      test_cmd = """ (({0}) | ({0} | {0})) """.format(sleep_cmd)
+      # Starting process tree (multiple process groups)
+      test_process = subprocess.Popen(test_cmd, shell=True)
+      time.sleep(0.3) # Delay to allow subprocess to start
+      # Check if processes are running
+      ps_cmd = """ps aux | grep "{0}" | grep -v grep """.format(sleep_cmd)
+      ps_process = subprocess.Popen(ps_cmd, stdout=subprocess.PIPE, shell=True)
+      (out, err) = ps_process.communicate()
+      self.assertTrue(sleep_cmd in out)
+      # Kill test process
+      shell.kill_process_with_children(test_process.pid)
+      test_process.communicate()
+      # Now test process should not be running
+      ps_process = subprocess.Popen(ps_cmd, stdout=subprocess.PIPE, shell=True)
+      (out, err) = ps_process.communicate()
+      self.assertFalse(sleep_cmd in out)
+      shell.gracefull_kill_delay = gracefull_kill_delay_old
+    else:
+      # Do not run under other systems
+      pass
+
+
+  @patch("os.killpg")
+  def test_killprocessgrp(self, killpg_grp):
+    shell.killprocessgrp(999)
+    self.assertTrue(killpg_grp.called)
\ No newline at end of file