You are viewing a plain-text version of this content; the canonical link to the original message was a hyperlink that is not preserved in this rendering.
Posted to commits@ambari.apache.org by dd...@apache.org on 2011/12/31 18:06:27 UTC

svn commit: r1226136 - in /incubator/ambari/trunk: ./ agent/src/main/python/ambari_agent/ agent/src/test/python/

Author: ddas
Date: Sat Dec 31 17:06:27 2011
New Revision: 1226136

URL: http://svn.apache.org/viewvc?rev=1226136&view=rev
Log:
AMBARI-180. Fixes the agent to do better process management.

Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py
    incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1226136&r1=1226135&r2=1226136&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Sat Dec 31 17:06:27 2011
@@ -2,7 +2,10 @@ Ambari Change log
 
 Release 0.1.0 - unreleased
 
-  AMBARI-179. Set the component level user/group information in the flattened stack, inherit default user/group information if not set one for component. (vgogate)
+  AMBARI-180. Fixes the agent to do better process management (ddas)
+
+  AMBARI-179. Set the component level user/group information in the flattened stack, 
+  inherit default user/group information if not set one for component. (vgogate)
 
   AMBARI-178. Add support for Map/Reduce component in Ambari stack (vgogate)
 

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py?rev=1226136&r1=1226135&r2=1226136&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py Sat Dec 31 17:06:27 2011
@@ -49,8 +49,8 @@ class ActionQueue(threading.Thread):
     self.config = config
     self.sh = shellRunner()
     self._stop = threading.Event()
-    self.maxRetries = config.get('command', 'maxretries') 
-    self.sleepInterval = config.get('command', 'sleepBetweenRetries')
+    self.maxRetries = config.getint('command', 'maxretries') 
+    self.sleepInterval = config.getint('command', 'sleepBetweenRetries')
 
   def stop(self):
     self._stop.set()
@@ -58,6 +58,10 @@ class ActionQueue(threading.Thread):
   def stopped(self):
     return self._stop.isSet()
 
+  #For unittest
+  def getshellinstance(self):
+    return self.sh
+
   def put(self, response):
     if 'actions' in response:
       actions = response['actions']
@@ -201,7 +205,7 @@ class ActionQueue(threading.Thread):
       # this is hardcoded to do puppet specific stuff for now
       # append the content of the puppet file to the file written above
       filepath = getFilePath(action,self.getInstallFilename(action['id'])) 
-      logger.debug("FILEPATH : " + filepath)
+      logger.info("File path for puppet top level script: " + filepath)
       p = self.sh.run(['/bin/cat',AmbariConfig.config.get('puppet','driver')])
       if p['exitCode']!=0:
         commandResult['error'] = p['error']

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py?rev=1226136&r1=1226135&r2=1226136&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py Sat Dec 31 17:06:27 2011
@@ -77,7 +77,7 @@ def writeFile(action, result, fileName="
       fullPathName=os.path.join(path, fileName)
     else:
       fullPathName=os.path.join(path, fileInfo['path'])
-    logger.info("path in writeFile: %s" % fullPathName)
+    logger.debug("path in writeFile: %s" % fullPathName)
     content=fileInfo['data']
     try:
       if isinstance(user, int)!=True:

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py?rev=1226136&r1=1226135&r2=1226136&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py Sat Dec 31 17:06:27 2011
@@ -29,6 +29,7 @@ import ConfigParser
 from createDaemon import createDaemon
 from Controller import Controller
 from shell import getTempFiles
+from shell import killstaleprocesses 
 import AmbariConfig
 
 logger = logging.getLogger()
@@ -58,11 +59,12 @@ def signal_handler(signum, frame):
 
   tempFiles = getTempFiles()
   for tempFile in tempFiles:
-    try:
-      os.unlink(tempFile)
-    except Exception:
-      traceback.print_exc()
-      logger.warn("Unable to remove: "+tempFile)
+    if os.path.exists(tempFile):
+      try:
+        os.unlink(tempFile)
+      except Exception:
+        traceback.print_exc()
+        logger.warn("Unable to remove: "+tempFile)
   os._exit(0)
 
 def debug(sig, frame):
@@ -109,7 +111,8 @@ def main():
     pid = str(os.getpid())
     file(pidfile, 'w').write(pid)
 
-  logger.setLevel(logging.DEBUG)
+
+  logger.setLevel(logging.INFO)
   formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - %(message)s")
   rotateLog = logging.handlers.RotatingFileHandler(logfile, "a", 10000000, 10)
   rotateLog.setFormatter(formatter)
@@ -127,6 +130,7 @@ def main():
   except Exception, err:
     logger.warn(err)
 
+  killstaleprocesses()
   logger.info("Connecting to controller at: "+config.get('controller', 'url'))
 
   # Launch Controller communication

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py?rev=1226136&r1=1226135&r2=1226136&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py Sat Dec 31 17:06:27 2011
@@ -29,6 +29,9 @@ import tempfile
 import signal
 import sys
 import threading
+import time
+import traceback
+import shutil
 
 global serverTracker
 serverTracker = {}
@@ -43,6 +46,34 @@ def noteTempFile(filename):
 def getTempFiles():
   return tempFiles
 
+def killstaleprocesses():
+  logger.info ("Killing stale processes")
+  prefix = AmbariConfig.config.get('stack','installprefix')
+  files = os.listdir(prefix)
+  for file in files:
+    if str(file).endswith(".pid"):
+      pid = str(file).split('.')[0]
+      killprocessgrp(int(pid))
+      os.unlink(os.path.join(prefix,file))
+  logger.info ("Killed stale processes")
+
+def killprocessgrp(pid):
+  try:
+    os.killpg(pid, signal.SIGTERM)
+    time.sleep(5)
+    try:
+      os.killpg(pid, signal.SIGKILL)
+    except:
+      logger.warn("Failed to send SIGKILL to PID %d. Process exited?" % (pid))
+  except:
+    logger.warn("Failed to kill PID %d" % (pid))      
+
+def changeUid():
+  try:
+    os.setuid(threadLocal.uid)
+  except Exception:
+    logger.warn("can not switch user for running command.")
+
 class shellRunner:
   # Run any command
   def run(self, script, user=None):
@@ -57,7 +88,7 @@ class shellRunner:
     code = 0
     cmd = " "
     cmd = cmd.join(script)
-    p = subprocess.Popen(cmd, preexec_fn=self.changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+    p = subprocess.Popen(cmd, preexec_fn=changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
     out, err = p.communicate()
     code = p.wait()
     return {'exitCode': code, 'output': out, 'error': err}
@@ -65,6 +96,7 @@ class shellRunner:
   # dispatch action types
   def runAction(self, clusterId, component, role, user, command, cleanUpCommand, result):
     oldDir = os.getcwd()
+    #TODO: handle this better. Don't like that it is doing a chdir for the main process
     os.chdir(self.getWorkDir(clusterId, role))
     oldUid = os.getuid()
     try:
@@ -83,7 +115,7 @@ class shellRunner:
     tmp.close()
     cmd = "%s %s %s" % (cmd, tempfilename, " ".join(command['param']))
     commandResult = {}
-    p = subprocess.Popen(cmd, preexec_fn=self.changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+    p = subprocess.Popen(cmd, preexec_fn=changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
     out, err = p.communicate()
     code = p.wait()
     if code != 0:
@@ -110,6 +142,7 @@ class shellRunner:
       cleanUpResult['exitCode'] = cleanUpCode
       result['cleanUpResult'] = cleanUpResult
       os.unlink(tempfilename)
+      os._exit(1)
     try:
       os.chdir(oldDir)
     except Exception:
@@ -120,7 +153,10 @@ class shellRunner:
   def startProcess(self, clusterId, clusterDefinitionRevision, component, role, script, user, result):
     global serverTracker
     oldDir = os.getcwd()
-    os.chdir(self.getWorkDir(clusterId,role))
+    try:
+      os.chdir(self.getWorkDir(clusterId,role))
+    except Exception:
+      logger.warn("%s %s %s can not switch dir for START_ACTION." % (clusterId, component, role))
     oldUid = os.getuid()
     try:
       if user is not None:
@@ -134,27 +170,16 @@ class shellRunner:
     commandResult = {}
     process = self.getServerKey(clusterId,clusterDefinitionRevision,component,role)
     if not process in serverTracker:
-      tempfilename = tempfile.mktemp()
-      noteTempFile(tempfilename)  
-      child_pid = os.fork()
-      if child_pid == 0:
-        try:
-          self.changeUid() 
-          cmd = sys.executable
-          tmp = open(tempfilename, 'w')
-          tmp.write(script['script'])
-          tmp.close()
-          cmd = "%s %s %s" % (cmd, tempfilename, " ".join(script['param']))
-          p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
-          out, err = p.communicate()
-          code = p.wait()
-          os.unlink(tempfilename)
-          os._exit(1)
-        except:
-          os._exit(1)
-      else:
-        serverTracker[process] = child_pid
-        commandResult['exitCode'] = 0
+      try:
+        plauncher = processlauncher(script,user)
+        plauncher.start()
+        plauncher.blockUntilProcessCreation()
+      except Exception:
+        traceback.print_exc()
+        logger.warn("Can not launch process for %s %s %s" % (clusterId, component, role))
+        code = -1
+      serverTracker[process] = plauncher
+      commandResult['exitCode'] = code 
       result['commandResult'] = commandResult
     try:
       os.chdir(oldDir)
@@ -168,7 +193,8 @@ class shellRunner:
     keyFragments = processKey.split('/')
     process = self.getServerKey(keyFragments[0],keyFragments[1],keyFragments[2],keyFragments[3])
     if process in serverTracker:
-      os.kill(serverTracker[process], signal.SIGKILL)
+      logger.info ("Sending %s with PID %d the SIGTERM signal" % (process,serverTracker[process].getpid()))
+      killprocessgrp(serverTracker[process].getpid())
       del serverTracker[process]
 
   def getServerTracker(self):
@@ -181,8 +207,72 @@ class shellRunner:
     prefix = AmbariConfig.config.get('stack','installprefix')
     return str(os.path.join(prefix, clusterId, role))
 
-  def changeUid(self):
+
+class processlauncher(threading.Thread):
+  def __init__(self,script,uid):
+    threading.Thread.__init__(self)
+    self.script = script
+    self.serverpid = -1
+    self.uid = uid
+    self.out = None
+    self.err = None
+
+  def run(self):
     try:
-      os.setuid(threadLocal.uid)
-    except Exception:
-      logger.warn("can not switch user for running command.")
+      tempfilename = tempfile.mktemp()
+      noteTempFile(tempfilename)
+      pythoncmd = sys.executable
+      tmp = open(tempfilename, 'w')
+      tmp.write(self.script['script'])
+      tmp.close()
+      threadLocal.uid = self.uid
+      self.cmd = "%s %s %s" % (pythoncmd, tempfilename, " ".join(self.script['param']))
+      logger.info("Launching %s as uid %d" % (self.cmd,self.uid) )
+      p = subprocess.Popen(self.cmd, preexec_fn=self.changeUidAndSetSid, stdout=subprocess.PIPE, 
+                           stderr=subprocess.PIPE, shell=True, close_fds=True)
+      logger.info("Launched %s; PID %d" % (self.cmd,p.pid))
+      self.serverpid = p.pid
+      self.out, self.err = p.communicate()
+      self.code = p.wait()
+      logger.info("%s; PID %d exited with code %d \nSTDOUT: %s\nSTDERR %s" % 
+                 (self.cmd,p.pid,self.code,self.out,self.err))
+    except:
+      logger.warn("Exception encountered while launching : " + self.cmd)
+      traceback.print_exc()
+
+    os.unlink(self.getpidfile())
+    os.unlink(tempfilename)
+
+  def blockUntilProcessCreation(self):
+    self.getpid()
+ 
+  def getpid(self):
+    sleepCount = 1
+    while (self.serverpid == -1):
+      time.sleep(1)
+      logger.info("Waiting for process %s to start" % self.cmd)
+      if sleepCount > 10:
+        logger.warn("Couldn't start process %s even after %d seconds" % (self.cmd,sleepCount))
+        os._exit(1)
+    return self.serverpid
+
+  def getpidfile(self):
+    prefix = AmbariConfig.config.get('stack','installprefix')
+    pidfile = os.path.join(prefix,str(self.getpid())+".pid")
+    return pidfile
+ 
+  def changeUidAndSetSid(self):
+    prefix = AmbariConfig.config.get('stack','installprefix')
+    pidfile = os.path.join(prefix,str(os.getpid())+".pid")
+    #TODO remove try/except (when there is a way to provide
+    #config files for testcases). The default config will want
+    #to create files in /var/ambari which may not exist unless
+    #specifically created.
+    #At that point add a testcase for the pid file management.
+    try: 
+      f = open(pidfile,'w')
+      f.close()
+    except:
+      logger.warn("Couldn't write pid file %s for %s" % (pidfile,self.cmd))
+    changeUid()
+    os.setsid() 

Modified: incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py?rev=1226136&r1=1226135&r2=1226136&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py (original)
+++ incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py Sat Dec 31 17:06:27 2011
@@ -19,10 +19,13 @@ limitations under the License.
 '''
 
 from unittest import TestCase
-import os, errno
+import os, errno, getpass
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_agent.FileUtil import getFilePath
+from ambari_agent import shell
+from ambari_agent.shell import serverTracker
+import time
 
 class TestAgentActions(TestCase):
   def test_installAndConfigAction(self):
@@ -57,3 +60,43 @@ class TestAgentActions(TestCase):
     cmdResult = result['commandResult']
     self.assertEqual(cmdResult['exitCode'], 0, "installAndConfigAction test failed. Returned %d " % cmdResult['exitCode'])
     self.assertEqual(cmdResult['output'], path + "\n", "installAndConfigAction test failed Returned %s " % cmdResult['output'])
+
+  def test_startAndStopAction(self):
+    command = {'script' : 'import os,sys,time\ni = 0\nwhile (i < 1000):\n  print "testhello"\n  sys.stdout.flush()\n  time.sleep(1)\n  i+=1',
+               'param' : ''}
+    action={'id' : 'ttt',
+            'kind' : 'START_ACTION',
+            'clusterId' : 'foobar',
+            'clusterDefinitionRevision' : 1,
+            'component' : 'foocomponent',
+            'role' : 'foorole',
+            'command' : command,
+            'user' : getpass.getuser()
+    }
+    
+    actionQueue = ActionQueue(AmbariConfig().getConfig())
+    result = actionQueue.startAction(action)
+    cmdResult = result['commandResult']
+    self.assertEqual(cmdResult['exitCode'], 0, "starting a process failed")
+    shell = actionQueue.getshellinstance()
+    key = shell.getServerKey(action['clusterId'],action['clusterDefinitionRevision'],
+                       action['component'],action['role'])
+    keyPresent = True
+    if not key in serverTracker:
+      keyPresent = False
+    self.assertEqual(keyPresent, True, "Key not present")
+    plauncher = serverTracker[key]
+    self.assertTrue(plauncher.getpid() > 0, "Pid less than 0!")
+    time.sleep(5)
+    shell.stopProcess(key)
+    keyPresent = False
+    if key in serverTracker:
+      keyPresent = True
+    self.assertEqual(keyPresent, False, "Key present")
+    processexists = True
+    try:
+      os.kill(serverTracker[key].getpid(),0)
+    except:
+      processexists = False
+    self.assertEqual(processexists, False, "Process still exists!")
+    self.assertTrue("testhello" in plauncher.out, "Output doesn't match!")