You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dd...@apache.org on 2011/12/31 18:06:27 UTC
svn commit: r1226136 - in /incubator/ambari/trunk: ./ agent/src/main/python/ambari_agent/ agent/src/test/python/
Author: ddas
Date: Sat Dec 31 17:06:27 2011
New Revision: 1226136
URL: http://svn.apache.org/viewvc?rev=1226136&view=rev
Log:
AMBARI-180. Fixes the agent to do better process management.
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py
incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py
incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py
incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py
Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1226136&r1=1226135&r2=1226136&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Sat Dec 31 17:06:27 2011
@@ -2,7 +2,10 @@ Ambari Change log
Release 0.1.0 - unreleased
- AMBARI-179. Set the component level user/group information in the flattened stack, inherit default user/group information if not set one for component. (vgogate)
+ AMBARI-180. Fixes the agent to do better process management (ddas)
+
+ AMBARI-179. Set the component level user/group information in the flattened stack,
+ inherit default user/group information if not set one for component. (vgogate)
AMBARI-178. Add support for Map/Reduce component in Ambari stack (vgogate)
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py?rev=1226136&r1=1226135&r2=1226136&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py Sat Dec 31 17:06:27 2011
@@ -49,8 +49,8 @@ class ActionQueue(threading.Thread):
self.config = config
self.sh = shellRunner()
self._stop = threading.Event()
- self.maxRetries = config.get('command', 'maxretries')
- self.sleepInterval = config.get('command', 'sleepBetweenRetries')
+ self.maxRetries = config.getint('command', 'maxretries')
+ self.sleepInterval = config.getint('command', 'sleepBetweenRetries')
def stop(self):
self._stop.set()
@@ -58,6 +58,10 @@ class ActionQueue(threading.Thread):
def stopped(self):
return self._stop.isSet()
+ #For unittest
+ def getshellinstance(self):
+ return self.sh
+
def put(self, response):
if 'actions' in response:
actions = response['actions']
@@ -201,7 +205,7 @@ class ActionQueue(threading.Thread):
# this is hardcoded to do puppet specific stuff for now
# append the content of the puppet file to the file written above
filepath = getFilePath(action,self.getInstallFilename(action['id']))
- logger.debug("FILEPATH : " + filepath)
+ logger.info("File path for puppet top level script: " + filepath)
p = self.sh.run(['/bin/cat',AmbariConfig.config.get('puppet','driver')])
if p['exitCode']!=0:
commandResult['error'] = p['error']
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py?rev=1226136&r1=1226135&r2=1226136&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py Sat Dec 31 17:06:27 2011
@@ -77,7 +77,7 @@ def writeFile(action, result, fileName="
fullPathName=os.path.join(path, fileName)
else:
fullPathName=os.path.join(path, fileInfo['path'])
- logger.info("path in writeFile: %s" % fullPathName)
+ logger.debug("path in writeFile: %s" % fullPathName)
content=fileInfo['data']
try:
if isinstance(user, int)!=True:
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py?rev=1226136&r1=1226135&r2=1226136&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py Sat Dec 31 17:06:27 2011
@@ -29,6 +29,7 @@ import ConfigParser
from createDaemon import createDaemon
from Controller import Controller
from shell import getTempFiles
+from shell import killstaleprocesses
import AmbariConfig
logger = logging.getLogger()
@@ -58,11 +59,12 @@ def signal_handler(signum, frame):
tempFiles = getTempFiles()
for tempFile in tempFiles:
- try:
- os.unlink(tempFile)
- except Exception:
- traceback.print_exc()
- logger.warn("Unable to remove: "+tempFile)
+ if os.path.exists(tempFile):
+ try:
+ os.unlink(tempFile)
+ except Exception:
+ traceback.print_exc()
+ logger.warn("Unable to remove: "+tempFile)
os._exit(0)
def debug(sig, frame):
@@ -109,7 +111,8 @@ def main():
pid = str(os.getpid())
file(pidfile, 'w').write(pid)
- logger.setLevel(logging.DEBUG)
+
+ logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - %(message)s")
rotateLog = logging.handlers.RotatingFileHandler(logfile, "a", 10000000, 10)
rotateLog.setFormatter(formatter)
@@ -127,6 +130,7 @@ def main():
except Exception, err:
logger.warn(err)
+ killstaleprocesses()
logger.info("Connecting to controller at: "+config.get('controller', 'url'))
# Launch Controller communication
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py?rev=1226136&r1=1226135&r2=1226136&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py Sat Dec 31 17:06:27 2011
@@ -29,6 +29,9 @@ import tempfile
import signal
import sys
import threading
+import time
+import traceback
+import shutil
global serverTracker
serverTracker = {}
@@ -43,6 +46,34 @@ def noteTempFile(filename):
def getTempFiles():
return tempFiles
+def killstaleprocesses():
+ logger.info ("Killing stale processes")
+ prefix = AmbariConfig.config.get('stack','installprefix')
+ files = os.listdir(prefix)
+ for file in files:
+ if str(file).endswith(".pid"):
+ pid = str(file).split('.')[0]
+ killprocessgrp(int(pid))
+ os.unlink(os.path.join(prefix,file))
+ logger.info ("Killed stale processes")
+
+def killprocessgrp(pid):
+ try:
+ os.killpg(pid, signal.SIGTERM)
+ time.sleep(5)
+ try:
+ os.killpg(pid, signal.SIGKILL)
+ except:
+ logger.warn("Failed to send SIGKILL to PID %d. Process exited?" % (pid))
+ except:
+ logger.warn("Failed to kill PID %d" % (pid))
+
+def changeUid():
+ try:
+ os.setuid(threadLocal.uid)
+ except Exception:
+ logger.warn("can not switch user for running command.")
+
class shellRunner:
# Run any command
def run(self, script, user=None):
@@ -57,7 +88,7 @@ class shellRunner:
code = 0
cmd = " "
cmd = cmd.join(script)
- p = subprocess.Popen(cmd, preexec_fn=self.changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+ p = subprocess.Popen(cmd, preexec_fn=changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
out, err = p.communicate()
code = p.wait()
return {'exitCode': code, 'output': out, 'error': err}
@@ -65,6 +96,7 @@ class shellRunner:
# dispatch action types
def runAction(self, clusterId, component, role, user, command, cleanUpCommand, result):
oldDir = os.getcwd()
+ #TODO: handle this better. Don't like that it is doing a chdir for the main process
os.chdir(self.getWorkDir(clusterId, role))
oldUid = os.getuid()
try:
@@ -83,7 +115,7 @@ class shellRunner:
tmp.close()
cmd = "%s %s %s" % (cmd, tempfilename, " ".join(command['param']))
commandResult = {}
- p = subprocess.Popen(cmd, preexec_fn=self.changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+ p = subprocess.Popen(cmd, preexec_fn=changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
out, err = p.communicate()
code = p.wait()
if code != 0:
@@ -110,6 +142,7 @@ class shellRunner:
cleanUpResult['exitCode'] = cleanUpCode
result['cleanUpResult'] = cleanUpResult
os.unlink(tempfilename)
+ os._exit(1)
try:
os.chdir(oldDir)
except Exception:
@@ -120,7 +153,10 @@ class shellRunner:
def startProcess(self, clusterId, clusterDefinitionRevision, component, role, script, user, result):
global serverTracker
oldDir = os.getcwd()
- os.chdir(self.getWorkDir(clusterId,role))
+ try:
+ os.chdir(self.getWorkDir(clusterId,role))
+ except Exception:
+ logger.warn("%s %s %s can not switch dir for START_ACTION." % (clusterId, component, role))
oldUid = os.getuid()
try:
if user is not None:
@@ -134,27 +170,16 @@ class shellRunner:
commandResult = {}
process = self.getServerKey(clusterId,clusterDefinitionRevision,component,role)
if not process in serverTracker:
- tempfilename = tempfile.mktemp()
- noteTempFile(tempfilename)
- child_pid = os.fork()
- if child_pid == 0:
- try:
- self.changeUid()
- cmd = sys.executable
- tmp = open(tempfilename, 'w')
- tmp.write(script['script'])
- tmp.close()
- cmd = "%s %s %s" % (cmd, tempfilename, " ".join(script['param']))
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
- out, err = p.communicate()
- code = p.wait()
- os.unlink(tempfilename)
- os._exit(1)
- except:
- os._exit(1)
- else:
- serverTracker[process] = child_pid
- commandResult['exitCode'] = 0
+ try:
+ plauncher = processlauncher(script,user)
+ plauncher.start()
+ plauncher.blockUntilProcessCreation()
+ except Exception:
+ traceback.print_exc()
+ logger.warn("Can not launch process for %s %s %s" % (clusterId, component, role))
+ code = -1
+ serverTracker[process] = plauncher
+ commandResult['exitCode'] = code
result['commandResult'] = commandResult
try:
os.chdir(oldDir)
@@ -168,7 +193,8 @@ class shellRunner:
keyFragments = processKey.split('/')
process = self.getServerKey(keyFragments[0],keyFragments[1],keyFragments[2],keyFragments[3])
if process in serverTracker:
- os.kill(serverTracker[process], signal.SIGKILL)
+ logger.info ("Sending %s with PID %d the SIGTERM signal" % (process,serverTracker[process].getpid()))
+ killprocessgrp(serverTracker[process].getpid())
del serverTracker[process]
def getServerTracker(self):
@@ -181,8 +207,72 @@ class shellRunner:
prefix = AmbariConfig.config.get('stack','installprefix')
return str(os.path.join(prefix, clusterId, role))
- def changeUid(self):
+
+class processlauncher(threading.Thread):
+ def __init__(self,script,uid):
+ threading.Thread.__init__(self)
+ self.script = script
+ self.serverpid = -1
+ self.uid = uid
+ self.out = None
+ self.err = None
+
+ def run(self):
try:
- os.setuid(threadLocal.uid)
- except Exception:
- logger.warn("can not switch user for running command.")
+ tempfilename = tempfile.mktemp()
+ noteTempFile(tempfilename)
+ pythoncmd = sys.executable
+ tmp = open(tempfilename, 'w')
+ tmp.write(self.script['script'])
+ tmp.close()
+ threadLocal.uid = self.uid
+ self.cmd = "%s %s %s" % (pythoncmd, tempfilename, " ".join(self.script['param']))
+ logger.info("Launching %s as uid %d" % (self.cmd,self.uid) )
+ p = subprocess.Popen(self.cmd, preexec_fn=self.changeUidAndSetSid, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, shell=True, close_fds=True)
+ logger.info("Launched %s; PID %d" % (self.cmd,p.pid))
+ self.serverpid = p.pid
+ self.out, self.err = p.communicate()
+ self.code = p.wait()
+ logger.info("%s; PID %d exited with code %d \nSTDOUT: %s\nSTDERR %s" %
+ (self.cmd,p.pid,self.code,self.out,self.err))
+ except:
+ logger.warn("Exception encountered while launching : " + self.cmd)
+ traceback.print_exc()
+
+ os.unlink(self.getpidfile())
+ os.unlink(tempfilename)
+
+ def blockUntilProcessCreation(self):
+ self.getpid()
+
+ def getpid(self):
+ sleepCount = 1
+ while (self.serverpid == -1):
+ time.sleep(1)
+ logger.info("Waiting for process %s to start" % self.cmd)
+ if sleepCount > 10:
+ logger.warn("Couldn't start process %s even after %d seconds" % (self.cmd,sleepCount))
+ os._exit(1)
+ return self.serverpid
+
+ def getpidfile(self):
+ prefix = AmbariConfig.config.get('stack','installprefix')
+ pidfile = os.path.join(prefix,str(self.getpid())+".pid")
+ return pidfile
+
+ def changeUidAndSetSid(self):
+ prefix = AmbariConfig.config.get('stack','installprefix')
+ pidfile = os.path.join(prefix,str(os.getpid())+".pid")
+ #TODO remove try/except (when there is a way to provide
+ #config files for testcases). The default config will want
+ #to create files in /var/ambari which may not exist unless
+ #specifically created.
+ #At that point add a testcase for the pid file management.
+ try:
+ f = open(pidfile,'w')
+ f.close()
+ except:
+ logger.warn("Couldn't write pid file %s for %s" % (pidfile,self.cmd))
+ changeUid()
+ os.setsid()
Modified: incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py?rev=1226136&r1=1226135&r2=1226136&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py (original)
+++ incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py Sat Dec 31 17:06:27 2011
@@ -19,10 +19,13 @@ limitations under the License.
'''
from unittest import TestCase
-import os, errno
+import os, errno, getpass
from ambari_agent.ActionQueue import ActionQueue
from ambari_agent.AmbariConfig import AmbariConfig
from ambari_agent.FileUtil import getFilePath
+from ambari_agent import shell
+from ambari_agent.shell import serverTracker
+import time
class TestAgentActions(TestCase):
def test_installAndConfigAction(self):
@@ -57,3 +60,43 @@ class TestAgentActions(TestCase):
cmdResult = result['commandResult']
self.assertEqual(cmdResult['exitCode'], 0, "installAndConfigAction test failed. Returned %d " % cmdResult['exitCode'])
self.assertEqual(cmdResult['output'], path + "\n", "installAndConfigAction test failed Returned %s " % cmdResult['output'])
+
+ def test_startAndStopAction(self):
+ command = {'script' : 'import os,sys,time\ni = 0\nwhile (i < 1000):\n print "testhello"\n sys.stdout.flush()\n time.sleep(1)\n i+=1',
+ 'param' : ''}
+ action={'id' : 'ttt',
+ 'kind' : 'START_ACTION',
+ 'clusterId' : 'foobar',
+ 'clusterDefinitionRevision' : 1,
+ 'component' : 'foocomponent',
+ 'role' : 'foorole',
+ 'command' : command,
+ 'user' : getpass.getuser()
+ }
+
+ actionQueue = ActionQueue(AmbariConfig().getConfig())
+ result = actionQueue.startAction(action)
+ cmdResult = result['commandResult']
+ self.assertEqual(cmdResult['exitCode'], 0, "starting a process failed")
+ shell = actionQueue.getshellinstance()
+ key = shell.getServerKey(action['clusterId'],action['clusterDefinitionRevision'],
+ action['component'],action['role'])
+ keyPresent = True
+ if not key in serverTracker:
+ keyPresent = False
+ self.assertEqual(keyPresent, True, "Key not present")
+ plauncher = serverTracker[key]
+ self.assertTrue(plauncher.getpid() > 0, "Pid less than 0!")
+ time.sleep(5)
+ shell.stopProcess(key)
+ keyPresent = False
+ if key in serverTracker:
+ keyPresent = True
+ self.assertEqual(keyPresent, False, "Key present")
+ processexists = True
+ try:
+ os.kill(serverTracker[key].getpid(),0)
+ except:
+ processexists = False
+ self.assertEqual(processexists, False, "Process still exists!")
+ self.assertTrue("testhello" in plauncher.out, "Output doesn't match!")