Posted to common-commits@hadoop.apache.org by ni...@apache.org on 2008/01/28 16:58:15 UTC

svn commit: r615919 [2/2] - in /hadoop/core/trunk: ./ src/contrib/hod/ src/contrib/hod/bin/ src/contrib/hod/conf/ src/contrib/hod/hodlib/Common/ src/contrib/hod/hodlib/GridServices/ src/contrib/hod/hodlib/Hod/ src/contrib/hod/hodlib/HodRing/ src/contri...

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py Mon Jan 28 07:58:08 2008
@@ -19,13 +19,14 @@
 """
 # -*- python -*-
 import os, sys, time, shutil, getpass, xml.dom.minidom, xml.dom.pulldom
-import socket, sets, urllib, csv, signal, pprint, random, re
+import socket, sets, urllib, csv, signal, pprint, random, re, httplib
 
 from xml.dom import getDOMImplementation
 from pprint import pformat
 from optparse import OptionParser
 from urlparse import urlparse
-from hodlib.Common.util import local_fqdn
+from hodlib.Common.util import local_fqdn, parseEquals
+from hodlib.Common.tcp import tcpSocket, tcpError 
 
 binfile = sys.path[0]
 libdir = os.path.dirname(binfile)
@@ -53,6 +54,7 @@
     self.log.debug("In command desc")
     self.log.debug("Done in command desc")
     dict.setdefault('argv', [])
+    dict.setdefault('version', None)
     dict.setdefault('envs', {})
     dict.setdefault('java-opts', [])
     dict.setdefault('workdirs', [])
@@ -83,6 +85,9 @@
   def getArgv(self):
     return self.dict['argv']
 
+  def getVersion(self):
+    return self.dict['version']
+
   def getEnvs(self):
     return self.dict['envs']
 
@@ -243,9 +248,13 @@
     topElement = doc.documentElement
     topElement.appendChild(comment)
     
-    attr = self.desc.getfinalAttrs()
-    self.createXML(doc, attr, topElement, True)
-    attr = self.desc.getAttrs()
+    finalAttr = self.desc.getfinalAttrs()
+    self.createXML(doc, finalAttr, topElement, True)
+    attr = {}
+    attr1 = self.desc.getAttrs()
+    for k,v in attr1.iteritems():
+      if not finalAttr.has_key(k):
+        attr[k] = v
     self.createXML(doc, attr, topElement, False)
               
     
@@ -306,7 +315,7 @@
     fenvs = os.environ
     
     for k, v in envs.iteritems():
-      fenvs[k] = v[0]
+      fenvs[k] = v
     
     self.log.debug(javaOpts)
     fenvs['HADOOP_OPTS'] = ''
@@ -440,6 +449,15 @@
     self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))
     return hadoopPackage
 
+  def getRunningValues(self):
+    return self.__running.values()
+
+  def getTempDir(self):
+    return self.__tempDir
+
+  def getHadoopLogDirs(self):
+    return self.__hadoopLogDirs
+ 
   def __download_package(self, ringClient):
     self.log.debug("Found download address: %s" % 
                    self._cfg['download-addr'])
@@ -523,6 +541,75 @@
         continue
       self.__running[id-1] = cmd
 
+      # ok.. now command is running. If this HodRing got jobtracker, 
+      # Check if it is ready for accepting jobs, and then only return
+      self.__check_jobtracker(desc, id-1)
+      
+  def __check_jobtracker(self, desc, id):
+    # Check jobtracker status. Return properly if it is ready to accept jobs.
+    # Currently Checks for Jetty to come up, the last thing that can be checked
+    # before JT completes initialisation. To be perfectly reliable, we need 
+    # hadoop support
+    name = desc.getName()
+    if name == 'jobtracker':
+      # Yes I am the Jobtracker
+      self.log.debug("Waiting for jobtracker to initialise")
+      version = desc.getVersion()
+      self.log.debug("jobtracker version : %s" % version)
+      attrs = self.getRunningValues()[id].getFilledInKeyValues()
+      attrs = parseEquals(attrs)
+      jobTrackerAddr = attrs['mapred.job.tracker']
+      self.log.debug("jobtracker rpc server : %s" % jobTrackerAddr)
+      if version < 16:
+        jettyAddr = jobTrackerAddr.split(':')[0] + ':' + \
+                              attrs['mapred.job.tracker.info.port']
+      else:
+        jettyAddr = attrs['mapred.job.tracker.http.bindAddress']
+      self.log.debug("Jobtracker jetty : %s" % jettyAddr)
+
+      # Check for Jetty to come up
+      # For this do a http head, and then look at the status
+      defaultTimeout = socket.getdefaulttimeout()
+      # socket timeout isn't exposed at httplib level. Setting explicitly.
+      socket.setdefaulttimeout(1)
+      sleepTime = 0.5
+      jettyStatus = False
+      jettyStatusmsg = ""
+      while sleepTime <= 32:
+        try:
+          jettyConn = httplib.HTTPConnection(jettyAddr)
+          jettyConn.request("HEAD", "/jobtracker.jsp")
+          # httplib inherently retries the following till socket timeout
+          resp = jettyConn.getresponse()
+          if resp.status != 200:
+            # Some problem?
+            jettyStatus = False
+            jettyStatusmsg = "Jetty gave a non-200 response to a HTTP-HEAD" +\
+                             " request. HTTP Status (Code, Msg): (%s, %s)" % \
+                             ( resp.status, resp.reason )
+            break
+          else:
+            self.log.info("Jetty returned a 200 status (%s)" % resp.reason)
+            self.log.info("JobTracker successfully initialised")
+            return
+        except socket.error:
+          self.log.debug("Jetty gave a socket error. Sleeping for %s" \
+                                                                  % sleepTime)
+          time.sleep(sleepTime)
+          sleepTime = sleepTime * 2
+        except Exception, e:
+          jettyStatus = False
+          jettyStatusmsg = ("Process (possibly other than jetty) running on" + \
+                  " port assigned to jetty is returning invalid http response")
+          break
+      socket.setdefaulttimeout(defaultTimeout)
+      if not jettyStatus:
+        self.log.critical("Jobtracker failed to initialise.")
+        if jettyStatusmsg:
+          self.log.critical( "Reason: %s" % jettyStatusmsg )
+        else: self.log.critical( "Reason: Jetty failed to give response")
+        raise Exception("JobTracker failed to initialise")
+
   def stop(self):
     self.log.debug("Entered hodring stop.")
     if self._http: 
@@ -532,153 +619,12 @@
     self.log.debug("call hodsvcrgy stop...")
     hodBaseService.stop(self)
     
-    self.clean_up()
-    
-  def clean_up(self):
-    os.chdir(originalcwd)
-    if not mswindows:
-      # do the UNIX double-fork magic, see Stevens' "Advanced 
-      # Programming in the UNIX Environment" for details (ISBN 0201563177)
-      try: 
-        pid = os.fork() 
-        if pid > 0:
-          # exit first parent
-          sys.exit(0) 
-      except OSError, e: 
-        self.log.error("fork #1 failed: %d (%s)" % (e.errno, e.strerror)) 
-        sys.exit(1)
-
-      # decouple from parent environment
-      os.chdir("/") 
-      os.setsid() 
-      os.umask(0) 
-
-      # do second fork
-      try: 
-        pid = os.fork() 
-        if pid > 0:
-          # exit from second parent, print eventual PID before
-          sys.exit(0) 
-      except OSError, e: 
-        self.log.error("fork #2 failed: %d (%s)" % (e.errno, e.strerror))
-        sys.exit(1) 
-
-    try:
-#      for cmd in self.__running.values():
-#        self.log.debug("killing %s..." % cmd)
-#        cmd.kill()
-  
-      list = []
-      
-      for cmd in self.__running.values():
-        self.log.debug("addding %s to cleanup list..." % cmd)
-        cmd.addCleanup(list)
-      
-      list.append(self.__tempDir)
-         
-      self.__archive_logs()   
-          
-      for dir in list:
-        if os.path.exists(dir):
-          self.log.debug('removing %s' % (dir))
-          shutil.rmtree(dir, True)
-    except:
-      self.log.error(get_exception_string())
-    sys.exit(0)
-
   def _xr_method_clusterStart(self, initialize=True):
     return self.clusterStart(initialize)
 
   def _xr_method_clusterStop(self):
     return self.clusterStop()
  
-  def __copy_archive_to_dfs(self, archiveFile):        
-    hdfsURIMatch = reHdfsURI.match(self._cfg['log-destination-uri'])
-    
-    # FIXME this is a complete and utter hack. Currently hadoop is broken
-    # and does not understand hdfs:// syntax on the command line :(
-    
-    pid = os.getpid()
-    tempConfDir = '/tmp/%s' % pid
-    os.mkdir(tempConfDir)
-    tempConfFileName = '%s/hadoop-site.xml' % tempConfDir
-    tempHadoopConfig = open(tempConfFileName, 'w')
-    print >>tempHadoopConfig, "<configuration>"
-    print >>tempHadoopConfig, "  <property>"
-    print >>tempHadoopConfig, "    <name>fs.default.name</name>"
-    print >>tempHadoopConfig, "    <value>%s</value>" % hdfsURIMatch.group(1)
-    print >>tempHadoopConfig, "    <description>No description</description>"
-    print >>tempHadoopConfig, "  </property>"
-    print >>tempHadoopConfig, "</configuration>"
-    tempHadoopConfig.close()
-    
-    # END LAME HACK
-    
-    (head, tail) = os.path.split(archiveFile)
-    destFile = os.path.join(hdfsURIMatch.group(2), self._cfg['userid'], 
-                            self._cfg['service-id'], tail)
-    
-    self.log.info("copying archive %s to DFS %s ..." % (archiveFile, destFile))
-    
-    runningHadoops = self.__running.values()
-    if (len(runningHadoops) == 0):
-      self.log.info("len(runningHadoops) == 0, No running cluster?")
-      self.log.info("Skipping __copy_archive_to_dfs")
-      return
- 
-    run = runningHadoops[0]
-    hadoopCmd = run.path
-    if self._cfg.has_key('pkgs'):
-      hadoopCmd = os.path.join(self._cfg['pkgs'], 'bin', 'hadoop')
-
-    # LAME HACK AGAIN, using config generated above :( 
-    copyCommand = "%s --config %s dfs -copyFromLocal %s %s" % (hadoopCmd, 
-      tempConfDir, archiveFile, destFile)
-    
-    self.log.debug(copyCommand)
-    
-    copyThread = simpleCommand('hadoop', copyCommand)
-    copyThread.start()
-    copyThread.wait()
-    copyThread.join()
-    self.log.debug(pprint.pformat(copyThread.output()))
-    
-    # LAME HACK AGAIN, deleting config generated above :( 
-    os.unlink(tempConfFileName)
-    os.rmdir(tempConfDir)
-    os.unlink(archiveFile)
-  
-  def __archive_logs(self):
-    status = True
-    if self._cfg.has_key("log-destination-uri"):
-      try:
-        if self.__hadoopLogDirs:
-          date = time.localtime()
-          for logDir in self.__hadoopLogDirs:
-            (head, tail) = os.path.split(logDir)
-            (head, logType) = os.path.split(head)
-            tarBallFile = "%s-%s-%04d%02d%02d%02d%02d%02d-%s.tar.gz" % (
-              logType, local_fqdn(), date[0], date[1], date[2], date[3], 
-              date[4], date[5], random.randint(0,1000))
-            
-            if self._cfg["log-destination-uri"].startswith('file://'):
-              tarBallFile = os.path.join(self._cfg["log-destination-uri"][7:], 
-                                         tarBallFile)
-            else:
-              tarBallFile = os.path.join(self._cfg['temp-dir'], tarBallFile)
-            
-            self.log.info('archiving log files to: %s' % tarBallFile)
-            status = tar(tarBallFile, logDir, ['*',])
-            self.log.info('archive %s status: %s' % (tarBallFile, status))
-            if status and \
-              self._cfg["log-destination-uri"].startswith('hdfs://'):
-              self.__copy_archive_to_dfs(tarBallFile)
-          dict = {} 
-      except:
-        self.log.error(get_exception_string())
-      
-    return status
-      
   def start(self):
     """Run and maintain hodring commands"""
     

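The __check_jobtracker hunk above waits for the JobTracker to finish initialising by issuing an HTTP HEAD request to its embedded Jetty server, doubling the sleep interval up to 32 seconds before declaring failure. Below is a minimal standalone sketch of that probe, kept in the Python 2 idiom of the patch; the wait_for_jetty name and its arguments are illustrative, not part of the commit.

  import httplib, socket, time

  def wait_for_jetty(jettyAddr, path="/jobtracker.jsp", maxSleep=32):
    """Poll Jetty with an HTTP HEAD until it answers 200, doubling the
       sleep between attempts. Returns True on a 200, False otherwise."""
    defaultTimeout = socket.getdefaulttimeout()
    # socket timeout isn't exposed at the httplib level, so set it globally
    socket.setdefaulttimeout(1)
    sleepTime = 0.5
    try:
      while sleepTime <= maxSleep:
        try:
          conn = httplib.HTTPConnection(jettyAddr)
          conn.request("HEAD", path)
          resp = conn.getresponse()
          return resp.status == 200
        except socket.error:
          # Jetty is not listening yet; sleep and retry
          time.sleep(sleepTime)
          sleepTime = sleepTime * 2
        except Exception:
          # Something other than Jetty answered on the assigned port
          return False
      return False
    finally:
      socket.setdefaulttimeout(defaultTimeout)

The finally clause ensures the process-wide socket timeout is restored on every exit path, including the early return on success.
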
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py Mon Jan 28 07:58:08 2008
@@ -150,7 +150,8 @@
         break
       
       argList.extend(process_qsub_attributes())
-      argList.extend(('-N', 'HOD'))
+
+      argList.extend(('-N', '"' + self._cfg['hod']['title'] + '"'))
       argList.extend(('-r','n'))
 
       if 'pbs-user' in self._cfg['resource_manager']:
@@ -161,9 +162,11 @@
         queue = self._cfg['resource_manager']['queue']
         argList.extend(('-q',queue))
   
-      # accounting should recognize userid:pbs-account as being "charged"
-      argList.extend(('-A', (self._cfg['hod']['userid'] + ':' + 
-                   self._cfg['resource_manager']['pbs-account'])))
+      # In HOD 0.4, we pass in an account string only if it is mentioned.
+      # Also, we don't append userid to the account string, as HOD jobs run as the 
+      # user running them, not as 'HOD' user.
+      if self._cfg['resource_manager'].has_key('pbs-account'):
+        argList.extend(('-A', (self._cfg['resource_manager']['pbs-account'])))
     
       if 'env-vars' in self._cfg['resource_manager']:
         qsub_envs = self._cfg['resource_manager']['env-vars']
@@ -177,7 +180,7 @@
   def __keyValToString(self, keyValList):
     ret = ""
     for key in keyValList:
-      ret = "%s%s=%s," % (ret, key, keyValList[key][0])
+      ret = "%s%s=%s," % (ret, key, keyValList[key])
     return ret[:-1]
   
   def newNodeSet(self, numNodes, preferred=[], isPreemptee=True, id=None):
@@ -288,5 +291,10 @@
   def runWorkers(self, args):
     return self.__torque.pbsdsh(args)
 
-
+  def updateWorkerInfo(self, workerInfoMap, jobId):
+    workerInfoStr = ''
+    for key in workerInfoMap.keys():
+      workerInfoStr = '%s,%s:%s' % (workerInfoStr, key, workerInfoMap[key])
+    exitCode = self.__torque.qalter("notes", workerInfoStr[1:], jobId)
+    return exitCode
 

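The new updateWorkerInfo method above flattens a map of worker attributes into a single comma-separated key:value string before handing it to qalter. A small sketch of just that serialisation step, in the same Python 2 style; the helper name and sample map are hypothetical:

  def serializeWorkerInfo(workerInfoMap):
    """Flatten {key: value} into 'k1:v1,k2:v2', the format updateWorkerInfo
       passes to qalter for the job's notes attribute."""
    workerInfoStr = ''
    for key in workerInfoMap.keys():
      workerInfoStr = '%s,%s:%s' % (workerInfoStr, key, workerInfoMap[key])
    return workerInfoStr[1:]   # drop the leading comma

  # Hypothetical worker info; dict key ordering is not guaranteed
  print serializeWorkerInfo({'mapred': 'host1:50020', 'hdfs': 'host2:50030'})
  # prints, e.g.: mapred:host1:50020,hdfs:host2:50030
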
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py Mon Jan 28 07:58:08 2008
@@ -16,7 +16,20 @@
 import os, re, time
 from hodlib.Common.threads import loop, func
 from hodlib.Common.threads import simpleCommand
-from hodlib.Common.util import get_exception_string
+from hodlib.Common.util import get_exception_string, hadoopVersion
+
+class HadoopJobStatus:
+  """This class represents the status of a single Hadoop job"""
+  
+  def __init__(self, jobId, status):
+    self.__jobId = jobId
+    self.__status = status
+
+  def getJobId(self):
+    return self.__jobId
+
+  def getStatus(self):
+    return self.__status
 
 class JobTrackerMonitor:
   """This class monitors the JobTracker of an allocated cluster
@@ -39,9 +52,11 @@
     # The service info provider will be polled until we get the URL.
     self.__serviceInfoProvider = servInfoProvider
     self.__jobCountRegExp = re.compile("([0-9]+) jobs currently running.*")
+    self.__jobStatusRegExp = re.compile("(\S+)\s+(\d)\s+\d+\s+\S+$")
     self.__firstIdleTime = 0
+    self.__hadoop15Version = { 'major' : '0', 'minor' : '15' }
     #Assumption: we are not going to support versions older than 0.15 for Idle Job tracker.
-    if not self.__isCompatibleHadoopVersion():
+    if not self.__isCompatibleHadoopVersion(self.__hadoop15Version):
       raise Exception('Incompatible Hadoop Version: Cannot check status')
     self.__stopFlag = False
     self.__jtURLFinderThread = func(name='JTURLFinderThread', functionRef=self.getJobTrackerURL)
@@ -87,6 +102,36 @@
     except:
       self.__log.debug('Exception while monitoring job tracker. %s' % get_exception_string())
 
+  def getJobsStatus(self):
+    """This method should return the status of all jobs that are run on the HOD allocated
+       hadoop cluster"""
+    jobStatusList = []
+    try:
+      hadoop16Version = { 'major' : '0', 'minor' : '16' }
+      if self.__isCompatibleHadoopVersion(hadoop16Version):
+        jtStatusCommand = self.__initStatusCommand(option='-list all')
+        jtStatusCommand.start()
+        jtStatusCommand.wait()
+        jtStatusCommand.join()
+        if jtStatusCommand.exit_code() == 0:
+          for line in jtStatusCommand.output():
+            jobStatus = self.__extractJobStatus(line)
+            if jobStatus is not None:
+              jobStatusList.append(jobStatus)
+    except:
+      self.__log.debug('Exception while getting job statuses. %s' % get_exception_string())
+    return jobStatusList
+
+  def __extractJobStatus(self, line):
+    """This method parses an output line from the job status command and creates
+       the JobStatus object if there is a match"""
+    jobStatus = None
+    line = line.strip()
+    jsMatch = self.__jobStatusRegExp.match(line)
+    if jsMatch:
+      jobStatus = HadoopJobStatus(jsMatch.group(1), int(jsMatch.group(2)))
+    return jobStatus
+
   def __isIdle(self):
     """This method checks if the JobTracker is idle beyond a certain limit."""
     if self.__getJobCount() == 0:
@@ -121,47 +166,25 @@
           jobs = int(match.group(1))
     return jobs
 
-  def __findHadoopVersion(self):
-    """This method determines the version of hadoop being used by executing the 
-       hadoop version command"""
-    verMap = { 'major' : None, 'minor' : None }
-    hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
-    cmd = "%s version" % hadoopPath
-    self.__log.debug('Executing command %s to find hadoop version' % cmd)
-    env = os.environ
-    env['JAVA_HOME'] = self.__javaHome
-    hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
-    hadoopVerCmd.start()
-    hadoopVerCmd.wait()
-    hadoopVerCmd.join()
-    if hadoopVerCmd.exit_code() == 0:
-      verLine = hadoopVerCmd.output()[0]
-      self.__log.debug('Version from hadoop command: %s' % verLine)
-      hadoopVerRegExp = re.compile("Hadoop ([0-9]+)\.([0-9]+).*")
-      verMatch = hadoopVerRegExp.match(verLine)
-      if verMatch != None:
-        verMap['major'] = verMatch.group(1)
-        verMap['minor'] = verMatch.group(2)
-
-    return verMap
-
-  def __isCompatibleHadoopVersion(self):
+  def __isCompatibleHadoopVersion(self, expectedVersion):
     """This method determines whether the version of hadoop being used is one that 
-       provides the hadoop job -list command or not"""
-    ver = self.__findHadoopVersion()
+       is higher than the expectedVersion.
+       This can be used for checking if a particular feature is available or not"""
+    ver = hadoopVersion(self.__hadoopDir, self.__javaHome, self.__log)
     ret = False
   
-    if (ver['major']!=None) and (int(ver['major']) >= 0) \
-      and (ver['minor']!=None) and (int(ver['minor']) >= 15):
+    if (ver['major']!=None) and (int(ver['major']) >= int(expectedVersion['major'])) \
+      and (ver['minor']!=None) and (int(ver['minor']) >= int(expectedVersion['minor'])):
       ret = True
-
     return ret
 
-  def __initStatusCommand(self):
+  def __initStatusCommand(self, option="-list"):
     """This method initializes the command to run to check the JT status"""
     cmd = None
     hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
-    cmdStr = "%s job -jt %s -list" % (hadoopPath, self.__jobTrackerURL)
+    cmdStr = "%s job -jt %s" % (hadoopPath, self.__jobTrackerURL)
+    cmdStr = "%s %s" % (cmdStr, option)
+    self.__log.debug('cmd str %s' % cmdStr)
     env = os.environ
     env['JAVA_HOME'] = self.__javaHome
     cmd = simpleCommand('HadoopStatus', cmdStr, env)

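getJobsStatus above shells out to "hadoop job -jt <addr> -list all" (on Hadoop 0.16 or later) and matches each output line against the regular expression (\S+)\s+(\d)\s+\d+\s+\S+$ to recover a job id and an integer job state. A standalone sketch of that extraction in the same Python 2 style; the sample line is illustrative, not captured command output:

  import re

  jobStatusRegExp = re.compile(r"(\S+)\s+(\d)\s+\d+\s+\S+$")

  def extractJobStatus(line):
    """Return (jobId, state) if the line matches the 'hadoop job -list all'
       layout the patch assumes, or None otherwise."""
    match = jobStatusRegExp.match(line.strip())
    if match:
      return match.group(1), int(match.group(2))
    return None

  # Hypothetical line: job id, state, start time, user
  print extractJobStatus("job_200801281200_0001  2  1201500000000  hodtest")
  # -> ('job_200801281200_0001', 2)
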
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py Mon Jan 28 07:58:08 2008
@@ -28,7 +28,7 @@
 sys.path.append(libdir)
 
 import hodlib.Common.logger
-from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor
+from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor, HadoopJobStatus
 
 from hodlib.Common.threads import func 
 
@@ -484,7 +484,20 @@
     
     return addr
 
-  
+  def stopRM(self):
+    """An XMLRPC call which will spawn a thread to stop the Ringmaster program."""
+    # We spawn a thread here because we want the XMLRPC call to return. Calling
+    # stop directly from here will also stop the XMLRPC server.
+    try:
+      self.log.debug("inside xml-rpc call to stop ringmaster")
+      rmStopperThread = func('RMStopper', self.rm.stop)
+      rmStopperThread.start()
+      self.log.debug("returning from xml-rpc call to stop ringmaster")
+      return True
+    except:
+      self.log.debug("Exception in stop: %s" % get_exception_string())
+      return False
+
 class RingMaster:
   def __init__(self, cfg, log, **kwds):
     """starts nodepool and services"""
@@ -499,6 +512,8 @@
     self.__jtMonitor = None
     self.__idlenessDetected = False
     self.__stopInProgress = False
+    self.__isStopped = False # to let main exit
+    self.__exitCode = 0 # exit code with which the ringmaster main method should return
 
     self.__initialize_signal_handlers()
     
@@ -544,23 +559,33 @@
 
       hdfsDesc = sdl['hdfs']
       hdfs = None
+ 
+      # Determine hadoop Version
+      hadoopVers = hadoopVersion(self.__getHadoopDir(), \
+                                self.cfg['hodring']['java-home'], self.log)
+      
       if hdfsDesc.isExternal():
-        hdfs = HdfsExternal(hdfsDesc, workDirs)
+        hdfs = HdfsExternal(hdfsDesc, workDirs, version=int(hadoopVers['minor']))
+        hdfs.setMasterParams( self.cfg['gridservice-hdfs'] )
       else:
-        hdfs = Hdfs(hdfsDesc, workDirs, 0)
+        hdfs = Hdfs(hdfsDesc, workDirs, 0, version=int(hadoopVers['minor']))
 
       self.serviceDict[hdfs.getName()] = hdfs
       
       mrDesc = sdl['mapred']
       mr = None
       if mrDesc.isExternal():
-        mr = MapReduceExternal(mrDesc, workDirs)
+        mr = MapReduceExternal(mrDesc, workDirs, version=int(hadoopVers['minor']))
+        mr.setMasterParams( self.cfg['gridservice-mapred'] )
       else:
-        mr = MapReduce(mrDesc, workDirs,1)
+        mr = MapReduce(mrDesc, workDirs,1, version=int(hadoopVers['minor']))
 
       self.serviceDict[mr.getName()] = mr
     except:
-      self.log.debug(get_exception_string)
+      self.log.critical("Exception in creating Hdfs and Map/Reduce descriptor objects: \
+                            %s." % get_exception_error_string())
+      self.log.debug(get_exception_string())
+      raise
 
     # should not be starting these in a constructor
     ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self)
@@ -860,23 +885,74 @@
     
     self._finalize()
 
+  def __findExitCode(self):
+    """Determine the exit code based on the status of the cluster or jobs run on them"""
+    xmlrpcServer = ringMasterServer.instance.logMasterSources
+    if xmlrpcServer.getServiceAddr('hdfs') == 'not found':
+      self.__exitCode = 7
+    elif xmlrpcServer.getServiceAddr('mapred') == 'not found':
+      self.__exitCode = 8
+    else:
+      clusterStatus = get_cluster_status(xmlrpcServer.getServiceAddr('hdfs'),
+                                          xmlrpcServer.getServiceAddr('mapred'))
+      if clusterStatus != 0:
+        self.__exitCode = clusterStatus
+      else:
+        self.__exitCode = self.__findHadoopJobsExitCode()
+    self.log.debug('exit code %s' % self.__exitCode)
+
+  def __findHadoopJobsExitCode(self):
+    """Determine the consolidate exit code of hadoop jobs run on this cluster, provided
+       this information is available. Return 0 otherwise"""
+    ret = 0
+    failureStatus = 3
+    failureCount = 0
+    if self.__jtMonitor:
+      jobStatusList = self.__jtMonitor.getJobsStatus()
+      try:
+        if len(jobStatusList) > 0:
+          for jobStatus in jobStatusList:
+            self.log.debug('job status for %s: %s' % (jobStatus.getJobId(), 
+                                                      jobStatus.getStatus()))
+            if jobStatus.getStatus() == failureStatus:
+              failureCount = failureCount+1
+        if failureCount > 0:
+          if failureCount == len(jobStatusList): # all jobs failed
+            ret = 16
+          else:
+            ret = 17
+      except:
+        self.log.debug('exception in finding hadoop jobs exit code %s' % get_exception_string())
+    return ret
+
   def stop(self):
     self.log.debug("RingMaster stop method invoked.")
-    if self.__stopInProgress:
+    if self.__stopInProgress or self.__isStopped:
       return
     self.__stopInProgress = True
-    if self.__jtMonitor is not None:
-      self.__jtMonitor.stop()
     if ringMasterServer.instance is not None:
+      self.log.debug('finding exit code')
+      self.__findExitCode()
       self.log.debug('stopping ringmaster instance')
       ringMasterServer.stopService()
+    else:
+      self.__exitCode = 6
+    if self.__jtMonitor is not None:
+      self.__jtMonitor.stop()
     if self.httpServer:
       self.httpServer.stop()
       
     self.__clean_up()
+    self.__isStopped = True
 
-  def isClusterIdle(self):
-    return self.__idlenessDetected
+  def shouldStop(self):
+    """Indicates whether the main loop should exit, either due to idleness condition, 
+    or a stop signal was received"""
+    return self.__idlenessDetected or self.__isStopped
+
+  def getExitCode(self):
+    """return the exit code of the program"""
+    return self.__exitCode
 
 def main(cfg,log):
   try:
@@ -885,10 +961,11 @@
     cfg = dGen.initializeDesc()
     rm = RingMaster(cfg, log)
     rm.start()
-    while not rm.isClusterIdle():
+    while not rm.shouldStop():
       time.sleep(1)
     rm.stop()
     log.debug('returning from main')
+    return rm.getExitCode()
   except Exception, e:
     if log:
       log.critical(get_exception_string())

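With this change the ringmaster's main method returns a consolidated exit code: 6 if the ringmaster server was never started, 7 or 8 if the HDFS or Map/Reduce address is not found, the non-zero cluster status if the cluster is unhealthy, and otherwise 16 when every tracked Hadoop job failed or 17 when only some did. A minimal sketch of the job-failure consolidation in __findHadoopJobsExitCode, assuming (as the patch does) that state 3 means failure and taking a plain list of job states instead of the JobTrackerMonitor:

  FAILURE_STATUS = 3   # hadoop job state treated as 'failed'

  def consolidateJobExitCode(jobStates):
    """Map a list of hadoop job states to the exit code scheme used by
       __findHadoopJobsExitCode: 16 = all failed, 17 = some failed,
       0 = none failed or nothing to report."""
    if not jobStates:
      return 0
    failureCount = len([s for s in jobStates if s == FAILURE_STATUS])
    if failureCount == 0:
      return 0
    if failureCount == len(jobStates):
      return 16
    return 17

  assert consolidateJobExitCode([]) == 0
  assert consolidateJobExitCode([2, 2]) == 0
  assert consolidateJobExitCode([3, 3]) == 16
  assert consolidateJobExitCode([2, 3]) == 17
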
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Schedulers/torque.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Schedulers/torque.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Schedulers/torque.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Schedulers/torque.py Mon Jan 28 07:58:08 2008
@@ -28,6 +28,7 @@
     self.__qstat = os.path.join(torqueDir, 'bin', 'qstat')
     self.__pbsNodes = os.path.join(torqueDir, 'bin', 'pbsnodes')
     self.__pbsdsh = os.path.join(torqueDir, 'bin', 'pbsdsh')
+    self.__qalter = os.path.join(torqueDir, 'bin', 'qalter')
     self.__env = environment
     
     self.__log = log
@@ -48,11 +49,23 @@
     while qsubProcess.stdin == None:
       time.sleep(.2)
 
-    for line in stdinList:
-      self.__log.debug("qsub stdin: %s" % line)
-      print >>qsubProcess.stdin, line
+    try:
+      for line in stdinList:
+        self.__log.debug("qsub stdin: %s" % line)
+        print >>qsubProcess.stdin, line
+      qsubProcess.stdin.close()
+    except IOError, i:
+      # If torque's qsub is given invalid params, it fails & returns immediately
+      # Check for such errors here
+      # Wait for command execution to finish
+      qsubProcess.wait()
+      qsubProcess.join()
+      output = qsubProcess.output()
+      if output!=[]:
+        self.__log.critical("qsub Failure : %s " % output[0].strip())
+        self.__log.critical("qsub Command : %s" % qsubCommand)
+      return None, qsubProcess.exit_code()
 
-    qsubProcess.stdin.close()
     qsubProcess.wait()
     qsubProcess.join()
     
@@ -145,3 +158,18 @@
     if not status: status = 0
       
     return status  
+
+  def qalter(self, fieldName, fieldValue, jobId):
+    """Update the job field with fieldName with the fieldValue.
+       The fieldValue must be modifiable after the job is submitted."""
+
+    # E.g. to alter comment: qalter -W notes='value' jobId
+    qalterCmd = '%s -W %s=\"%s\" %s' % (self.__qalter, fieldName, fieldValue, jobId) 
+    self.__log.debug("qalter command: %s" % qalterCmd)
+    qalterProcess = simpleCommand('qalter', qalterCmd, env=self.__env)
+    qalterProcess.start()
+    qalterProcess.wait()
+    qalterProcess.join()
+    exitCode = qalterProcess.exit_code()
+
+    return exitCode

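The qalter wrapper added above builds a command of the form qalter -W notes="<value>" <jobId> and runs it through HOD's simpleCommand helper. The sketch below reproduces the command construction but executes it with the standard-library subprocess module instead of simpleCommand, purely for illustration; the qalter path and job id shown are assumptions.

  import subprocess

  def qalterNotes(qalterPath, fieldValue, jobId):
    """Run qalter -W notes="<fieldValue>" <jobId> and return its exit code."""
    qalterCmd = '%s -W notes="%s" %s' % (qalterPath, fieldValue, jobId)
    # The patch builds one command string; shell=True keeps that shape here
    return subprocess.call(qalterCmd, shell=True)

  # Illustrative call only; requires a Torque installation:
  # qalterNotes('/usr/local/torque/bin/qalter', 'mapred:host1:50020', '12345')
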
Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hod.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hod.xml?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hod.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hod.xml Mon Jan 28 07:58:08 2008
@@ -144,7 +144,7 @@
               <em>Twisted Python:</em> This can be used for improving the scalability of HOD. Twisted Python is available <a href="http://twistedmatrix.com/trac/">here</a>.
             </li>
             <li>
-            <em>Hadoop:</em> HOD can automatically distribute Hadoop to all nodes in the cluster. However, it can also use a pre-installed version of Hadoop, if it is available on all nodes in the cluster. HOD currently supports only Hadoop 0.16, which is under development.
+            <em>Hadoop:</em> HOD can automatically distribute Hadoop to all nodes in the cluster. However, it can also use a pre-installed version of Hadoop, if it is available on all nodes in the cluster. HOD currently supports Hadoop 0.15 and above.
             </li>
           </ul>
           <p>