You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@ambari.apache.org by dd...@apache.org on 2011/12/20 23:51:56 UTC
svn commit: r1221512 - in /incubator/ambari/trunk/agent/src/main/python/ambari_agent: ActionQueue.py main.py shell.py
Author: ddas
Date: Tue Dec 20 22:51:56 2011
New Revision: 1221512
URL: http://svn.apache.org/viewvc?rev=1221512&view=rev
Log:
AMBARI-157. Fixed some process creation related issues.
Modified:
incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py
incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py?rev=1221512&r1=1221511&r2=1221512&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py Tue Dec 20 22:51:56 2011
@@ -162,7 +162,7 @@ class ActionQueue(threading.Thread):
w = self.writeFileAction(action,self.getInstallFilename(action['id']))
commandResult = {}
if w['exitCode']!=0:
- commandResult['error'] = w['error']
+ commandResult['error'] = w['stderr']
commandResult['exitCode'] = w['exitCode']
r['commandResult'] = commandResult
return r
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py?rev=1221512&r1=1221511&r2=1221512&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py Tue Dec 20 22:51:56 2011
@@ -28,6 +28,7 @@ import time
import ConfigParser
from createDaemon import createDaemon
from Controller import Controller
+from shell import getTempFiles
import AmbariConfig
logger = logging.getLogger()
@@ -48,6 +49,15 @@ def signal_handler(signum, frame):
os.unlink(pidfile)
except Exception:
logger.warn("Unable to remove: "+pidfile)
+ traceback.print_exc()
+
+ tempFiles = getTempFiles()
+ for tempFile in tempFiles:
+ try:
+ os.unlink(tempFile)
+ except Exception:
+ traceback.print_exc()
+ logger.warn("Unable to remove: "+tempFile)
os._exit(0)
def debug(sig, frame):
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py?rev=1221512&r1=1221511&r2=1221512&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py Tue Dec 20 22:51:56 2011
@@ -28,32 +28,38 @@ import os
import tempfile
import signal
import sys
+import threading
global serverTracker
serverTracker = {}
logger = logging.getLogger()
+threadLocal = threading.local()
+
+tempFiles = []
+def noteTempFile(filename):
+ tempFiles.append(filename)
+
+def getTempFiles():
+ return tempFiles
+
class shellRunner:
# Run any command
def run(self, script, user=None):
- oldUid = os.getuid()
try:
if user!=None:
user=getpwnam(user)[2]
- os.setuid(user)
+ else:
+ user = os.getuid()
+ threadLocal.uid = user
except Exception:
logger.warn("can not switch user for RUN_COMMAND.")
code = 0
cmd = " "
cmd = cmd.join(script)
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+ p = subprocess.Popen(cmd, preexec_fn=self.changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
out, err = p.communicate()
code = p.wait()
- try:
- if user!=None:
- os.setuid(oldUid)
- except Exception:
- logger.warn("can not restore user for RUN_COMMAND.")
return {'exitCode': code, 'output': out, 'error': err}
# dispatch action types
@@ -64,7 +70,9 @@ class shellRunner:
try:
if user is not None:
user=getpwnam(user)[2]
- os.setuid(user)
+ else:
+ user = oldUid
+ threadLocal.uid = user
except Exception:
logger.warn("%s %s %s can not switch user for RUN_ACTION." % (clusterId, component, role))
code = 0
@@ -75,7 +83,7 @@ class shellRunner:
tmp.close()
cmd = "%s %s %s" % (cmd, tempfilename, " ".join(command['param']))
commandResult = {}
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+ p = subprocess.Popen(cmd, preexec_fn=self.changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
out, err = p.communicate()
code = p.wait()
if code != 0:
@@ -104,7 +112,6 @@ class shellRunner:
os.unlink(tempfilename)
try:
os.chdir(oldDir)
- os.setuid(oldUid)
except Exception:
logger.warn("%s %s %s can not restore environment for RUN_ACTION." % (clusterId, component, role))
return result
@@ -116,34 +123,43 @@ class shellRunner:
os.chdir(self.getWorkDir(clusterId,role))
oldUid = os.getuid()
try:
- user=getpwnam(user)[2]
- os.setuid(user)
+ if user is not None:
+ user=getpwnam(user)[2]
+ else:
+ user = os.getuid()
+ threadLocal.uid = user
except Exception:
logger.warn("%s %s %s can not switch user for START_ACTION." % (clusterId, component, role))
code = 0
commandResult = {}
process = self.getServerKey(clusterId,clusterDefinitionRevision,component,role)
if not process in serverTracker:
- cmd = sys.executable
tempfilename = tempfile.mktemp()
- tmp = open(tempfilename, 'w')
- tmp.write(script['script'])
- tmp.close()
- cmd = "%s %s %s" % (cmd, tempfilename, " ".join(script['param']))
+ noteTempFile(tempfilename)
child_pid = os.fork()
if child_pid == 0:
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
- out, err = p.communicate()
- code = p.wait()
- os.unlink(tempfilename)
- serverTracker[process] = None
+ try:
+ signal.signal(signal.SIGINT, SIG_DFL)
+ signal.signal(signal.SIGTERM, SIG_DFL)
+ self.changeUid()
+ cmd = sys.executable
+ tmp = open(tempfilename, 'w')
+ tmp.write(script['script'])
+ tmp.close()
+ cmd = "%s %s %s" % (cmd, tempfilename, " ".join(script['param']))
+ p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+ out, err = p.communicate()
+ code = p.wait()
+ os.unlink(tempfilename)
+ os._exit(1)
+ except:
+ os._exit(1)
else:
serverTracker[process] = child_pid
commandResult['exitCode'] = 0
result['commandResult'] = commandResult
try:
os.chdir(oldDir)
- os.setuid(oldUid)
except Exception:
logger.warn("%s %s %s can not restore environment for START_ACTION." % (clusterId, component, role))
return result
@@ -166,3 +182,9 @@ class shellRunner:
def getWorkDir(self, clusterId, role):
prefix = AmbariConfig.config.get('stack','installprefix')
return str(os.path.join(prefix, clusterId, role))
+
+ def changeUid(self):
+ try:
+ os.setuid(threadLocal.uid)
+ except Exception:
+ logger.warn("can not switch user for running command.")