You are viewing a plain-text version of this content. The canonical (HTML) version is in the commits@ambari.apache.org mailing-list archive.
Posted to commits@ambari.apache.org by dd...@apache.org on 2011/12/20 23:51:56 UTC

svn commit: r1221512 - in /incubator/ambari/trunk/agent/src/main/python/ambari_agent: ActionQueue.py main.py shell.py

Author: ddas
Date: Tue Dec 20 22:51:56 2011
New Revision: 1221512

URL: http://svn.apache.org/viewvc?rev=1221512&view=rev
Log:
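AMBARI-157. Fix process-creation issues: commands now switch to the target user inside the child process (via a preexec_fn / changeUid) instead of calling setuid on the agent itself, and temporary files created for server processes are tracked so the agent's signal handler can remove them on shutdown.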

Modified:
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py?rev=1221512&r1=1221511&r2=1221512&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py Tue Dec 20 22:51:56 2011
@@ -162,7 +162,7 @@ class ActionQueue(threading.Thread):
     w = self.writeFileAction(action,self.getInstallFilename(action['id']))
     commandResult = {}
     if w['exitCode']!=0:
-      commandResult['error'] = w['error'] 
+      commandResult['error'] = w['stderr'] 
       commandResult['exitCode'] = w['exitCode']
       r['commandResult'] = commandResult
       return r

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py?rev=1221512&r1=1221511&r2=1221512&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/main.py Tue Dec 20 22:51:56 2011
@@ -28,6 +28,7 @@ import time
 import ConfigParser
 from createDaemon import createDaemon
 from Controller import Controller
+from shell import getTempFiles
 import AmbariConfig
 
 logger = logging.getLogger()
@@ -48,6 +49,15 @@ def signal_handler(signum, frame):
     os.unlink(pidfile)
   except Exception:
     logger.warn("Unable to remove: "+pidfile)
+    traceback.print_exc()
+
+  tempFiles = getTempFiles()
+  for tempFile in tempFiles:
+    try:
+      os.unlink(tempFile)
+    except Exception:
+      traceback.print_exc()
+      logger.warn("Unable to remove: "+tempFile)
   os._exit(0)
 
 def debug(sig, frame):

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py?rev=1221512&r1=1221511&r2=1221512&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/shell.py Tue Dec 20 22:51:56 2011
@@ -28,32 +28,38 @@ import os
 import tempfile
 import signal
 import sys
+import threading
 
 global serverTracker
 serverTracker = {}
 logger = logging.getLogger()
 
+threadLocal = threading.local()
+
+tempFiles = [] 
+def noteTempFile(filename):
+  tempFiles.append(filename)
+
+def getTempFiles():
+  return tempFiles
+
 class shellRunner:
   # Run any command
   def run(self, script, user=None):
-    oldUid = os.getuid()
     try:
       if user!=None:
         user=getpwnam(user)[2]
-        os.setuid(user)
+      else:
+        user = os.getuid()
+      threadLocal.uid = user
     except Exception:
       logger.warn("can not switch user for RUN_COMMAND.")
     code = 0
     cmd = " "
     cmd = cmd.join(script)
-    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+    p = subprocess.Popen(cmd, preexec_fn=self.changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
     out, err = p.communicate()
     code = p.wait()
-    try:
-      if user!=None:
-        os.setuid(oldUid)
-    except Exception:
-      logger.warn("can not restore user for RUN_COMMAND.")
     return {'exitCode': code, 'output': out, 'error': err}
 
   # dispatch action types
@@ -64,7 +70,9 @@ class shellRunner:
     try:
       if user is not None:
         user=getpwnam(user)[2]
-        os.setuid(user)
+      else:
+        user = oldUid
+      threadLocal.uid = user
     except Exception:
       logger.warn("%s %s %s can not switch user for RUN_ACTION." % (clusterId, component, role))
     code = 0
@@ -75,7 +83,7 @@ class shellRunner:
     tmp.close()
     cmd = "%s %s %s" % (cmd, tempfilename, " ".join(command['param']))
     commandResult = {}
-    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+    p = subprocess.Popen(cmd, preexec_fn=self.changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
     out, err = p.communicate()
     code = p.wait()
     if code != 0:
@@ -104,7 +112,6 @@ class shellRunner:
       os.unlink(tempfilename)
     try:
       os.chdir(oldDir)
-      os.setuid(oldUid)
     except Exception:
       logger.warn("%s %s %s can not restore environment for RUN_ACTION." % (clusterId, component, role))
     return result
@@ -116,34 +123,43 @@ class shellRunner:
     os.chdir(self.getWorkDir(clusterId,role))
     oldUid = os.getuid()
     try:
-      user=getpwnam(user)[2]
-      os.setuid(user)
+      # Reset first so a failed lookup never reuses a previous command's uid.
+      threadLocal.uid = os.getuid()
+      if user is not None:
+        user=getpwnam(user)[2]
+        threadLocal.uid = user
     except Exception:
       logger.warn("%s %s %s can not switch user for START_ACTION." % (clusterId, component, role))
     code = 0
     commandResult = {}
     process = self.getServerKey(clusterId,clusterDefinitionRevision,component,role)
     if not process in serverTracker:
-      cmd = sys.executable
       tempfilename = tempfile.mktemp()
-      tmp = open(tempfilename, 'w')
-      tmp.write(script['script'])
-      tmp.close()
-      cmd = "%s %s %s" % (cmd, tempfilename, " ".join(script['param']))
+      noteTempFile(tempfilename)
       child_pid = os.fork()
       if child_pid == 0:
-        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
-        out, err = p.communicate()
-        code = p.wait()
-        os.unlink(tempfilename)
-        serverTracker[process] = None
+        try:
+          # Child: restore default signal dispositions so handlers inherited from the agent don't run here.
+          signal.signal(signal.SIGINT, signal.SIG_DFL)
+          signal.signal(signal.SIGTERM, signal.SIG_DFL)
+          self.changeUid()
+          tmp = open(tempfilename, 'w')
+          tmp.write(script['script'])
+          tmp.close()
+          cmd = "%s %s %s" % (sys.executable, tempfilename, " ".join(script['param']))
+          p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+          out, err = p.communicate()
+          code = p.wait()
+          os.unlink(tempfilename)
+          os._exit(0 if code == 0 else 1)
+        except:  # deliberately bare: the forked child must never fall back into agent code
+          os._exit(1)
       else:
         serverTracker[process] = child_pid
         commandResult['exitCode'] = 0
       result['commandResult'] = commandResult
     try:
       os.chdir(oldDir)
-      os.setuid(oldUid)
     except Exception:
       logger.warn("%s %s %s can not restore environment for START_ACTION." % (clusterId, component, role))
     return result
@@ -166,3 +182,15 @@ class shellRunner:
   def getWorkDir(self, clusterId, role):
     prefix = AmbariConfig.config.get('stack','installprefix')
     return str(os.path.join(prefix, clusterId, role))
+
+  # Runs in a forked child (as a Popen preexec_fn, or right after os.fork()),
+  # so it must not take locks: logging here can deadlock if another agent
+  # thread held the logging lock at fork time. Report failures with a raw
+  # write to fd 2 instead (the command's stderr pipe when used as preexec_fn).
+  def changeUid(self):
+    try:
+      os.setuid(threadLocal.uid)
+    except Exception:
+      try:
+        os.write(2, "can not switch user for running command.\n")
+      except OSError:
+        pass