You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ni...@apache.org on 2008/01/28 16:58:15 UTC
svn commit: r615919 [2/2] - in /hadoop/core/trunk: ./ src/contrib/hod/
src/contrib/hod/bin/ src/contrib/hod/conf/ src/contrib/hod/hodlib/Common/
src/contrib/hod/hodlib/GridServices/ src/contrib/hod/hodlib/Hod/
src/contrib/hod/hodlib/HodRing/ src/contri...
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py Mon Jan 28 07:58:08 2008
@@ -19,13 +19,14 @@
"""
# -*- python -*-
import os, sys, time, shutil, getpass, xml.dom.minidom, xml.dom.pulldom
-import socket, sets, urllib, csv, signal, pprint, random, re
+import socket, sets, urllib, csv, signal, pprint, random, re, httplib
from xml.dom import getDOMImplementation
from pprint import pformat
from optparse import OptionParser
from urlparse import urlparse
-from hodlib.Common.util import local_fqdn
+from hodlib.Common.util import local_fqdn, parseEquals
+from hodlib.Common.tcp import tcpSocket, tcpError
binfile = sys.path[0]
libdir = os.path.dirname(binfile)
@@ -53,6 +54,7 @@
self.log.debug("In command desc")
self.log.debug("Done in command desc")
dict.setdefault('argv', [])
+ dict.setdefault('version', None)
dict.setdefault('envs', {})
dict.setdefault('java-opts', [])
dict.setdefault('workdirs', [])
@@ -83,6 +85,9 @@
def getArgv(self):
return self.dict['argv']
+ def getVersion(self):
+ return self.dict['version']
+
def getEnvs(self):
return self.dict['envs']
@@ -243,9 +248,13 @@
topElement = doc.documentElement
topElement.appendChild(comment)
- attr = self.desc.getfinalAttrs()
- self.createXML(doc, attr, topElement, True)
- attr = self.desc.getAttrs()
+ finalAttr = self.desc.getfinalAttrs()
+ self.createXML(doc, finalAttr, topElement, True)
+ attr = {}
+ attr1 = self.desc.getAttrs()
+ for k,v in attr1.iteritems():
+ if not finalAttr.has_key(k):
+ attr[k] = v
self.createXML(doc, attr, topElement, False)
@@ -306,7 +315,7 @@
fenvs = os.environ
for k, v in envs.iteritems():
- fenvs[k] = v[0]
+ fenvs[k] = v
self.log.debug(javaOpts)
fenvs['HADOOP_OPTS'] = ''
@@ -440,6 +449,15 @@
self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))
return hadoopPackage
+ def getRunningValues(self):
+ return self.__running.values()
+
+ def getTempDir(self):
+ return self.__tempDir
+
+ def getHadoopLogDirs(self):
+ return self.__hadoopLogDirs
+
def __download_package(self, ringClient):
self.log.debug("Found download address: %s" %
self._cfg['download-addr'])
@@ -523,6 +541,75 @@
continue
self.__running[id-1] = cmd
+ # OK, the command is now running. If this HodRing got the jobtracker,
+ # check that it is ready to accept jobs before returning.
+ self.__check_jobtracker(desc, id-1)
+
+ def __check_jobtracker(self, desc, id):
+ # Check jobtracker status. Return normally if it is ready to accept jobs;
+ # raise otherwise. Currently this checks for Jetty to come up, the last thing
+ # that can be checked before the JT completes initialisation. To be perfectly
+ # reliable, we need Hadoop support.
+ name = desc.getName()
+ if name == 'jobtracker':
+ # Yes I am the Jobtracker
+ self.log.debug("Waiting for jobtracker to initialise")
+ version = desc.getVersion()
+ self.log.debug("jobtracker version : %s" % version)
+ attrs = self.getRunningValues()[id].getFilledInKeyValues()
+ attrs = parseEquals(attrs)
+ jobTrackerAddr = attrs['mapred.job.tracker']
+ self.log.debug("jobtracker rpc server : %s" % jobTrackerAddr)
+ if version < 16:
+ jettyAddr = jobTrackerAddr.split(':')[0] + ':' + \
+ attrs['mapred.job.tracker.info.port']
+ else:
+ jettyAddr = attrs['mapred.job.tracker.http.bindAddress']
+ self.log.debug("Jobtracker jetty : %s" % jettyAddr)
+
+ # Check for Jetty to come up
+ # For this do a http head, and then look at the status
+ defaultTimeout = socket.getdefaulttimeout()
+ # socket timeout isn't exposed at httplib level. Setting explicitly.
+ socket.setdefaulttimeout(1)
+ sleepTime = 0.5
+ jettyStatus = False
+ jettyStatusmsg = ""
+ while sleepTime <= 32:
+ try:
+ jettyConn = httplib.HTTPConnection(jettyAddr)
+ jettyConn.request("HEAD", "/jobtracker.jsp")
+ # httplib inherently retries the following till socket timeout
+ resp = jettyConn.getresponse()
+ if resp.status != 200:
+ # Some problem?
+ jettyStatus = False
+ jettyStatusmsg = "Jetty gave a non-200 response to a HTTP-HEAD" +\
+ " request. HTTP Status (Code, Msg): (%s, %s)" % \
+ ( resp.status, resp.reason )
+ break
+ else:
+ self.log.info("Jetty returned a 200 status (%s)" % resp.reason)
+ self.log.info("JobTracker successfully initialised")
+ return
+ except socket.error:
+ self.log.debug("Jetty gave a socket error. Sleeping for %s" \
+ % sleepTime)
+ time.sleep(sleepTime)
+ sleepTime = sleepTime * 2
+ except Exception, e:
+ jettyStatus = False
+ jettyStatusmsg = ("Process(possibly other than jetty) running on" + \
+ " port assigned to jetty is returning invalid http response")
+ break
+ socket.setdefaulttimeout(defaultTimeout)
+ if not jettyStatus:
+ self.log.critical("Jobtracker failed to initialise.")
+ if jettyStatusmsg:
+ self.log.critical( "Reason: %s" % jettyStatusmsg )
+ else: self.log.critical( "Reason: Jetty failed to give response")
+ raise Exception("JobTracker failed to initialise")
+
def stop(self):
self.log.debug("Entered hodring stop.")
if self._http:
@@ -532,153 +619,12 @@
self.log.debug("call hodsvcrgy stop...")
hodBaseService.stop(self)
- self.clean_up()
-
- def clean_up(self):
- os.chdir(originalcwd)
- if not mswindows:
- # do the UNIX double-fork magic, see Stevens' "Advanced
- # Programming in the UNIX Environment" for details (ISBN 0201563177)
- try:
- pid = os.fork()
- if pid > 0:
- # exit first parent
- sys.exit(0)
- except OSError, e:
- self.log.error("fork #1 failed: %d (%s)" % (e.errno, e.strerror))
- sys.exit(1)
-
- # decouple from parent environment
- os.chdir("/")
- os.setsid()
- os.umask(0)
-
- # do second fork
- try:
- pid = os.fork()
- if pid > 0:
- # exit from second parent, print eventual PID before
- sys.exit(0)
- except OSError, e:
- self.log.error("fork #2 failed: %d (%s)" % (e.errno, e.strerror))
- sys.exit(1)
-
- try:
-# for cmd in self.__running.values():
-# self.log.debug("killing %s..." % cmd)
-# cmd.kill()
-
- list = []
-
- for cmd in self.__running.values():
- self.log.debug("addding %s to cleanup list..." % cmd)
- cmd.addCleanup(list)
-
- list.append(self.__tempDir)
-
- self.__archive_logs()
-
- for dir in list:
- if os.path.exists(dir):
- self.log.debug('removing %s' % (dir))
- shutil.rmtree(dir, True)
- except:
- self.log.error(get_exception_string())
- sys.exit(0)
-
def _xr_method_clusterStart(self, initialize=True):
return self.clusterStart(initialize)
def _xr_method_clusterStop(self):
return self.clusterStop()
- def __copy_archive_to_dfs(self, archiveFile):
- hdfsURIMatch = reHdfsURI.match(self._cfg['log-destination-uri'])
-
- # FIXME this is a complete and utter hack. Currently hadoop is broken
- # and does not understand hdfs:// syntax on the command line :(
-
- pid = os.getpid()
- tempConfDir = '/tmp/%s' % pid
- os.mkdir(tempConfDir)
- tempConfFileName = '%s/hadoop-site.xml' % tempConfDir
- tempHadoopConfig = open(tempConfFileName, 'w')
- print >>tempHadoopConfig, "<configuration>"
- print >>tempHadoopConfig, " <property>"
- print >>tempHadoopConfig, " <name>fs.default.name</name>"
- print >>tempHadoopConfig, " <value>%s</value>" % hdfsURIMatch.group(1)
- print >>tempHadoopConfig, " <description>No description</description>"
- print >>tempHadoopConfig, " </property>"
- print >>tempHadoopConfig, "</configuration>"
- tempHadoopConfig.close()
-
- # END LAME HACK
-
- (head, tail) = os.path.split(archiveFile)
- destFile = os.path.join(hdfsURIMatch.group(2), self._cfg['userid'],
- self._cfg['service-id'], tail)
-
- self.log.info("copying archive %s to DFS %s ..." % (archiveFile, destFile))
-
- runningHadoops = self.__running.values()
- if (len(runningHadoops) == 0):
- self.log.info("len(runningHadoops) == 0, No running cluster?")
- self.log.info("Skipping __copy_archive_to_dfs")
- return
-
- run = runningHadoops[0]
- hadoopCmd = run.path
- if self._cfg.has_key('pkgs'):
- hadoopCmd = os.path.join(self._cfg['pkgs'], 'bin', 'hadoop')
-
- # LAME HACK AGAIN, using config generated above :(
- copyCommand = "%s --config %s dfs -copyFromLocal %s %s" % (hadoopCmd,
- tempConfDir, archiveFile, destFile)
-
- self.log.debug(copyCommand)
-
- copyThread = simpleCommand('hadoop', copyCommand)
- copyThread.start()
- copyThread.wait()
- copyThread.join()
- self.log.debug(pprint.pformat(copyThread.output()))
-
- # LAME HACK AGAIN, deleting config generated above :(
- os.unlink(tempConfFileName)
- os.rmdir(tempConfDir)
- os.unlink(archiveFile)
-
- def __archive_logs(self):
- status = True
- if self._cfg.has_key("log-destination-uri"):
- try:
- if self.__hadoopLogDirs:
- date = time.localtime()
- for logDir in self.__hadoopLogDirs:
- (head, tail) = os.path.split(logDir)
- (head, logType) = os.path.split(head)
- tarBallFile = "%s-%s-%04d%02d%02d%02d%02d%02d-%s.tar.gz" % (
- logType, local_fqdn(), date[0], date[1], date[2], date[3],
- date[4], date[5], random.randint(0,1000))
-
- if self._cfg["log-destination-uri"].startswith('file://'):
- tarBallFile = os.path.join(self._cfg["log-destination-uri"][7:],
- tarBallFile)
- else:
- tarBallFile = os.path.join(self._cfg['temp-dir'], tarBallFile)
-
- self.log.info('archiving log files to: %s' % tarBallFile)
- status = tar(tarBallFile, logDir, ['*',])
- self.log.info('archive %s status: %s' % (tarBallFile, status))
- if status and \
- self._cfg["log-destination-uri"].startswith('hdfs://'):
- self.__copy_archive_to_dfs(tarBallFile)
- dict = {}
- except:
- self.log.error(get_exception_string())
-
- return status
-
def start(self):
"""Run and maintain hodring commands"""
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py Mon Jan 28 07:58:08 2008
@@ -150,7 +150,8 @@
break
argList.extend(process_qsub_attributes())
- argList.extend(('-N', 'HOD'))
+
+ argList.extend(('-N', '"' + self._cfg['hod']['title'] + '"'))
argList.extend(('-r','n'))
if 'pbs-user' in self._cfg['resource_manager']:
@@ -161,9 +162,11 @@
queue = self._cfg['resource_manager']['queue']
argList.extend(('-q',queue))
- # accounting should recognize userid:pbs-account as being "charged"
- argList.extend(('-A', (self._cfg['hod']['userid'] + ':' +
- self._cfg['resource_manager']['pbs-account'])))
+ # In HOD 0.4, we pass in an account string only if it is mentioned.
+ # Also, we don't append userid to the account string, as HOD jobs run as the
+ # user running them, not as 'HOD' user.
+ if self._cfg['resource_manager'].has_key('pbs-account'):
+ argList.extend(('-A', (self._cfg['resource_manager']['pbs-account'])))
if 'env-vars' in self._cfg['resource_manager']:
qsub_envs = self._cfg['resource_manager']['env-vars']
@@ -177,7 +180,7 @@
def __keyValToString(self, keyValList):
ret = ""
for key in keyValList:
- ret = "%s%s=%s," % (ret, key, keyValList[key][0])
+ ret = "%s%s=%s," % (ret, key, keyValList[key])
return ret[:-1]
def newNodeSet(self, numNodes, preferred=[], isPreemptee=True, id=None):
@@ -288,5 +291,10 @@
def runWorkers(self, args):
return self.__torque.pbsdsh(args)
-
+ def updateWorkerInfo(self, workerInfoMap, jobId):
+ workerInfoStr = ''
+ for key in workerInfoMap.keys():
+ workerInfoStr = '%s,%s:%s' % (workerInfoStr, key, workerInfoMap[key])
+ exitCode = self.__torque.qalter("notes", workerInfoStr[1:], jobId)
+ return exitCode
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py Mon Jan 28 07:58:08 2008
@@ -16,7 +16,20 @@
import os, re, time
from hodlib.Common.threads import loop, func
from hodlib.Common.threads import simpleCommand
-from hodlib.Common.util import get_exception_string
+from hodlib.Common.util import get_exception_string, hadoopVersion
+
+class HadoopJobStatus:
+ """This class represents the status of a single Hadoop job"""
+
+ def __init__(self, jobId, status):
+ self.__jobId = jobId
+ self.__status = status
+
+ def getJobId(self):
+ return self.__jobId
+
+ def getStatus(self):
+ return self.__status
class JobTrackerMonitor:
"""This class monitors the JobTracker of an allocated cluster
@@ -39,9 +52,11 @@
# The service info provider will be polled until we get the URL.
self.__serviceInfoProvider = servInfoProvider
self.__jobCountRegExp = re.compile("([0-9]+) jobs currently running.*")
+ self.__jobStatusRegExp = re.compile("(\S+)\s+(\d)\s+\d+\s+\S+$")
self.__firstIdleTime = 0
+ self.__hadoop15Version = { 'major' : '0', 'minor' : '15' }
#Assumption: we are not going to support versions older than 0.15 for Idle Job tracker.
- if not self.__isCompatibleHadoopVersion():
+ if not self.__isCompatibleHadoopVersion(self.__hadoop15Version):
raise Exception('Incompatible Hadoop Version: Cannot check status')
self.__stopFlag = False
self.__jtURLFinderThread = func(name='JTURLFinderThread', functionRef=self.getJobTrackerURL)
@@ -87,6 +102,36 @@
except:
self.__log.debug('Exception while monitoring job tracker. %s' % get_exception_string())
+ def getJobsStatus(self):
+ """This method should return the status of all jobs that are run on the HOD allocated
+ hadoop cluster"""
+ jobStatusList = []
+ try:
+ hadoop16Version = { 'major' : '0', 'minor' : '16' }
+ if self.__isCompatibleHadoopVersion(hadoop16Version):
+ jtStatusCommand = self.__initStatusCommand(option='-list all')
+ jtStatusCommand.start()
+ jtStatusCommand.wait()
+ jtStatusCommand.join()
+ if jtStatusCommand.exit_code() == 0:
+ for line in jtStatusCommand.output():
+ jobStatus = self.__extractJobStatus(line)
+ if jobStatus is not None:
+ jobStatusList.append(jobStatus)
+ except:
+ self.__log.debug('Exception while getting job statuses. %s' % get_exception_string())
+ return jobStatusList
+
+ def __extractJobStatus(self, line):
+ """This method parses an output line from the job status command and creates
+ a HadoopJobStatus object if there is a match; otherwise it returns None"""
+ jobStatus = None
+ line = line.strip()
+ jsMatch = self.__jobStatusRegExp.match(line)
+ if jsMatch:
+ jobStatus = HadoopJobStatus(jsMatch.group(1), int(jsMatch.group(2)))
+ return jobStatus
+
def __isIdle(self):
"""This method checks if the JobTracker is idle beyond a certain limit."""
if self.__getJobCount() == 0:
@@ -121,47 +166,25 @@
jobs = int(match.group(1))
return jobs
- def __findHadoopVersion(self):
- """This method determines the version of hadoop being used by executing the
- hadoop version command"""
- verMap = { 'major' : None, 'minor' : None }
- hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
- cmd = "%s version" % hadoopPath
- self.__log.debug('Executing command %s to find hadoop version' % cmd)
- env = os.environ
- env['JAVA_HOME'] = self.__javaHome
- hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
- hadoopVerCmd.start()
- hadoopVerCmd.wait()
- hadoopVerCmd.join()
- if hadoopVerCmd.exit_code() == 0:
- verLine = hadoopVerCmd.output()[0]
- self.__log.debug('Version from hadoop command: %s' % verLine)
- hadoopVerRegExp = re.compile("Hadoop ([0-9]+)\.([0-9]+).*")
- verMatch = hadoopVerRegExp.match(verLine)
- if verMatch != None:
- verMap['major'] = verMatch.group(1)
- verMap['minor'] = verMatch.group(2)
-
- return verMap
-
- def __isCompatibleHadoopVersion(self):
+ def __isCompatibleHadoopVersion(self, expectedVersion):
"""This method determines whether the version of hadoop being used is one that
- provides the hadoop job -list command or not"""
- ver = self.__findHadoopVersion()
+ is equal to or higher than the expectedVersion.
+ This can be used for checking if a particular feature is available or not"""
+ ver = hadoopVersion(self.__hadoopDir, self.__javaHome, self.__log)
ret = False
- if (ver['major']!=None) and (int(ver['major']) >= 0) \
- and (ver['minor']!=None) and (int(ver['minor']) >= 15):
+ if (ver['major']!=None) and (int(ver['major']) >= int(expectedVersion['major'])) \
+ and (ver['minor']!=None) and (int(ver['minor']) >= int(expectedVersion['minor'])):
ret = True
-
return ret
- def __initStatusCommand(self):
+ def __initStatusCommand(self, option="-list"):
"""This method initializes the command to run to check the JT status"""
cmd = None
hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
- cmdStr = "%s job -jt %s -list" % (hadoopPath, self.__jobTrackerURL)
+ cmdStr = "%s job -jt %s" % (hadoopPath, self.__jobTrackerURL)
+ cmdStr = "%s %s" % (cmdStr, option)
+ self.__log.debug('cmd str %s' % cmdStr)
env = os.environ
env['JAVA_HOME'] = self.__javaHome
cmd = simpleCommand('HadoopStatus', cmdStr, env)
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py Mon Jan 28 07:58:08 2008
@@ -28,7 +28,7 @@
sys.path.append(libdir)
import hodlib.Common.logger
-from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor
+from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor, HadoopJobStatus
from hodlib.Common.threads import func
@@ -484,7 +484,20 @@
return addr
-
+ def stopRM(self):
+ """An XMLRPC call which will spawn a thread to stop the Ringmaster program."""
+ # We spawn a thread here because we want the XMLRPC call to return. Calling
+ # stop directly from here will also stop the XMLRPC server.
+ try:
+ self.log.debug("inside xml-rpc call to stop ringmaster")
+ rmStopperThread = func('RMStopper', self.rm.stop)
+ rmStopperThread.start()
+ self.log.debug("returning from xml-rpc call to stop ringmaster")
+ return True
+ except:
+ self.log.debug("Exception in stop: %s" % get_exception_string())
+ return False
+
class RingMaster:
def __init__(self, cfg, log, **kwds):
"""starts nodepool and services"""
@@ -499,6 +512,8 @@
self.__jtMonitor = None
self.__idlenessDetected = False
self.__stopInProgress = False
+ self.__isStopped = False # to let main exit
+ self.__exitCode = 0 # exit code with which the ringmaster main method should return
self.__initialize_signal_handlers()
@@ -544,23 +559,33 @@
hdfsDesc = sdl['hdfs']
hdfs = None
+
+ # Determine hadoop Version
+ hadoopVers = hadoopVersion(self.__getHadoopDir(), \
+ self.cfg['hodring']['java-home'], self.log)
+
if hdfsDesc.isExternal():
- hdfs = HdfsExternal(hdfsDesc, workDirs)
+ hdfs = HdfsExternal(hdfsDesc, workDirs, version=int(hadoopVers['minor']))
+ hdfs.setMasterParams( self.cfg['gridservice-hdfs'] )
else:
- hdfs = Hdfs(hdfsDesc, workDirs, 0)
+ hdfs = Hdfs(hdfsDesc, workDirs, 0, version=int(hadoopVers['minor']))
self.serviceDict[hdfs.getName()] = hdfs
mrDesc = sdl['mapred']
mr = None
if mrDesc.isExternal():
- mr = MapReduceExternal(mrDesc, workDirs)
+ mr = MapReduceExternal(mrDesc, workDirs, version=int(hadoopVers['minor']))
+ mr.setMasterParams( self.cfg['gridservice-mapred'] )
else:
- mr = MapReduce(mrDesc, workDirs,1)
+ mr = MapReduce(mrDesc, workDirs,1, version=int(hadoopVers['minor']))
self.serviceDict[mr.getName()] = mr
except:
- self.log.debug(get_exception_string)
+ self.log.critical("Exception in creating Hdfs and Map/Reduce descriptor objects: \
+ %s." % get_exception_error_string())
+ self.log.debug(get_exception_string())
+ raise
# should not be starting these in a constructor
ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self)
@@ -860,23 +885,74 @@
self._finalize()
+ def __findExitCode(self):
+ """Determine the exit code based on the status of the cluster or jobs run on them"""
+ xmlrpcServer = ringMasterServer.instance.logMasterSources
+ if xmlrpcServer.getServiceAddr('hdfs') == 'not found':
+ self.__exitCode = 7
+ elif xmlrpcServer.getServiceAddr('mapred') == 'not found':
+ self.__exitCode = 8
+ else:
+ clusterStatus = get_cluster_status(xmlrpcServer.getServiceAddr('hdfs'),
+ xmlrpcServer.getServiceAddr('mapred'))
+ if clusterStatus != 0:
+ self.__exitCode = clusterStatus
+ else:
+ self.__exitCode = self.__findHadoopJobsExitCode()
+ self.log.debug('exit code %s' % self.__exitCode)
+
+ def __findHadoopJobsExitCode(self):
+ """Determine the consolidated exit code of hadoop jobs run on this cluster, provided
+ this information is available. Return 0 otherwise"""
+ ret = 0
+ failureStatus = 3
+ failureCount = 0
+ if self.__jtMonitor:
+ jobStatusList = self.__jtMonitor.getJobsStatus()
+ try:
+ if len(jobStatusList) > 0:
+ for jobStatus in jobStatusList:
+ self.log.debug('job status for %s: %s' % (jobStatus.getJobId(),
+ jobStatus.getStatus()))
+ if jobStatus.getStatus() == failureStatus:
+ failureCount = failureCount+1
+ if failureCount > 0:
+ if failureCount == len(jobStatusList): # all jobs failed
+ ret = 16
+ else:
+ ret = 17
+ except:
+ self.log.debug('exception in finding hadoop jobs exit code' % get_exception_string())
+ return ret
+
def stop(self):
self.log.debug("RingMaster stop method invoked.")
- if self.__stopInProgress:
+ if self.__stopInProgress or self.__isStopped:
return
self.__stopInProgress = True
- if self.__jtMonitor is not None:
- self.__jtMonitor.stop()
if ringMasterServer.instance is not None:
+ self.log.debug('finding exit code')
+ self.__findExitCode()
self.log.debug('stopping ringmaster instance')
ringMasterServer.stopService()
+ else:
+ self.__exitCode = 6
+ if self.__jtMonitor is not None:
+ self.__jtMonitor.stop()
if self.httpServer:
self.httpServer.stop()
self.__clean_up()
+ self.__isStopped = True
- def isClusterIdle(self):
- return self.__idlenessDetected
+ def shouldStop(self):
+ """Indicates whether the main loop should exit, either because idleness was detected
+ or because the ringmaster has been stopped"""
+ return self.__idlenessDetected or self.__isStopped
+
+ def getExitCode(self):
+ """return the exit code of the program"""
+ return self.__exitCode
def main(cfg,log):
try:
@@ -885,10 +961,11 @@
cfg = dGen.initializeDesc()
rm = RingMaster(cfg, log)
rm.start()
- while not rm.isClusterIdle():
+ while not rm.shouldStop():
time.sleep(1)
rm.stop()
log.debug('returning from main')
+ return rm.getExitCode()
except Exception, e:
if log:
log.critical(get_exception_string())
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Schedulers/torque.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Schedulers/torque.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Schedulers/torque.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Schedulers/torque.py Mon Jan 28 07:58:08 2008
@@ -28,6 +28,7 @@
self.__qstat = os.path.join(torqueDir, 'bin', 'qstat')
self.__pbsNodes = os.path.join(torqueDir, 'bin', 'pbsnodes')
self.__pbsdsh = os.path.join(torqueDir, 'bin', 'pbsdsh')
+ self.__qalter = os.path.join(torqueDir, 'bin', 'qalter')
self.__env = environment
self.__log = log
@@ -48,11 +49,23 @@
while qsubProcess.stdin == None:
time.sleep(.2)
- for line in stdinList:
- self.__log.debug("qsub stdin: %s" % line)
- print >>qsubProcess.stdin, line
+ try:
+ for line in stdinList:
+ self.__log.debug("qsub stdin: %s" % line)
+ print >>qsubProcess.stdin, line
+ qsubProcess.stdin.close()
+ except IOError, i:
+ # If torque's qsub is given invalid params, it fails & returns immediately
+ # Check for such errors here
+ # Wait for command execution to finish
+ qsubProcess.wait()
+ qsubProcess.join()
+ output = qsubProcess.output()
+ if output!=[]:
+ self.__log.critical("qsub Failure : %s " % output[0].strip())
+ self.__log.critical("qsub Command : %s" % qsubCommand)
+ return None, qsubProcess.exit_code()
- qsubProcess.stdin.close()
qsubProcess.wait()
qsubProcess.join()
@@ -145,3 +158,18 @@
if not status: status = 0
return status
+
+ def qalter(self, fieldName, fieldValue, jobId):
+ """Update the job field with fieldName with the fieldValue.
+ The fieldValue must be modifiable after the job is submitted."""
+
+ # E.g. to alter notes: qalter -W notes="value" jobId
+ qalterCmd = '%s -W %s=\"%s\" %s' % (self.__qalter, fieldName, fieldValue, jobId)
+ self.__log.debug("qalter command: %s" % qalterCmd)
+ qalterProcess = simpleCommand('qalter', qalterCmd, env=self.__env)
+ qalterProcess.start()
+ qalterProcess.wait()
+ qalterProcess.join()
+ exitCode = qalterProcess.exit_code()
+
+ return exitCode
Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hod.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hod.xml?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hod.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hod.xml Mon Jan 28 07:58:08 2008
@@ -144,7 +144,7 @@
<em>Twisted Python:</em> This can be used for improving the scalability of HOD. Twisted Python is available <a href="http://twistedmatrix.com/trac/">here</a>.
</li>
<li>
- <em>Hadoop:</em> HOD can automatically distribute Hadoop to all nodes in the cluster. However, it can also use a pre-installed version of Hadoop, if it is available on all nodes in the cluster. HOD currently supports only Hadoop 0.16, which is under development.
+ <em>Hadoop:</em> HOD can automatically distribute Hadoop to all nodes in the cluster. However, it can also use a pre-installed version of Hadoop, if it is available on all nodes in the cluster. HOD currently supports Hadoop 0.15 and above.
</li>
</ul>
<p>