Posted to common-commits@hadoop.apache.org by ni...@apache.org on 2008/01/28 16:58:15 UTC
svn commit: r615919 [1/2] - in /hadoop/core/trunk: ./ src/contrib/hod/
src/contrib/hod/bin/ src/contrib/hod/conf/ src/contrib/hod/hodlib/Common/
src/contrib/hod/hodlib/GridServices/ src/contrib/hod/hodlib/Hod/
src/contrib/hod/hodlib/HodRing/ src/contri...
Author: nigel
Date: Mon Jan 28 07:58:08 2008
New Revision: 615919
URL: http://svn.apache.org/viewvc?rev=615919&view=rev
Log:
HADOOP-2720. Jumbo bug fix patch to HOD. Final sync of Apache SVN with internal Yahoo SVN. Contributed by Hemanth Yamijala.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/hod/bin/VERSION
hadoop/core/trunk/src/contrib/hod/bin/hod
hadoop/core/trunk/src/contrib/hod/bin/hodring
hadoop/core/trunk/src/contrib/hod/bin/ringmaster
hadoop/core/trunk/src/contrib/hod/conf/hodrc
hadoop/core/trunk/src/contrib/hod/getting_started.txt
hadoop/core/trunk/src/contrib/hod/hodlib/Common/desc.py
hadoop/core/trunk/src/contrib/hod/hodlib/Common/hodsvc.py
hadoop/core/trunk/src/contrib/hod/hodlib/Common/setup.py
hadoop/core/trunk/src/contrib/hod/hodlib/Common/socketServers.py
hadoop/core/trunk/src/contrib/hod/hodlib/Common/tcp.py
hadoop/core/trunk/src/contrib/hod/hodlib/Common/threads.py
hadoop/core/trunk/src/contrib/hod/hodlib/Common/types.py
hadoop/core/trunk/src/contrib/hod/hodlib/Common/util.py
hadoop/core/trunk/src/contrib/hod/hodlib/Common/xmlrpc.py
hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/hdfs.py
hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/mapred.py
hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py
hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py
hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py
hadoop/core/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py
hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py
hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py
hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py
hadoop/core/trunk/src/contrib/hod/hodlib/Schedulers/torque.py
hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hod.xml
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jan 28 07:58:08 2008
@@ -591,6 +591,9 @@
HADOOP-2576. Namenode performance degradation over time triggered by
large heartbeat interval. (Raghu Angadi)
+ HADOOP-2720. Jumbo bug fix patch to HOD. Final sync of Apache SVN with
+ internal Yahoo SVN. (Hemanth Yamijala via nigel)
+
Release 0.15.3 - 2008-01-18
BUG FIXES
Modified: hadoop/core/trunk/src/contrib/hod/bin/VERSION
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/VERSION?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/bin/VERSION (original)
+++ hadoop/core/trunk/src/contrib/hod/bin/VERSION Mon Jan 28 07:58:08 2008
@@ -1,16 +1 @@
-#Licensed to the Apache Software Foundation (ASF) under one
-#or more contributor license agreements. See the NOTICE file
-#distributed with this work for additional information
-#regarding copyright ownership. The ASF licenses this file
-#to you under the Apache License, Version 2.0 (the
-#"License"); you may not use this file except in compliance
-#with the License. You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-#Unless required by applicable law or agreed to in writing, software
-#distributed under the License is distributed on an "AS IS" BASIS,
-#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#See the License for the specific language governing permissions and
-#limitations under the License.
-DEVELOPMENT
+0.4.0
Modified: hadoop/core/trunk/src/contrib/hod/bin/hod
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/hod?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/bin/hod (original)
+++ hadoop/core/trunk/src/contrib/hod/bin/hod Mon Jan 28 07:58:08 2008
@@ -45,7 +45,9 @@
from hodlib.Hod.hod import hodRunner
from hodlib.Common.setup import *
from hodlib.Common.descGenerator import *
-from hodlib.Common.util import local_fqdn, need_to_allocate, filter_warnings, get_exception_error_string
+from hodlib.Common.util import local_fqdn, need_to_allocate, filter_warnings,\
+ get_exception_error_string, hodInterrupt, \
+ HOD_INTERRUPTED_MESG, HOD_INTERRUPTED_CODE
from hodlib.Common.tcp import tcpError, tcpSocket
filter_warnings()
@@ -91,7 +93,8 @@
False, True, False, True, 's'),
('min-nodes', 'pos_int',
- 'Minimum number of nodes to allocate at startup.',
+ 'Minimum number of nodes to allocate at startup. ' + \
+ 'Used with hod.script option',
True, None, False, True, 'm'),
('script', 'file', 'Hadoop script to execute.',
@@ -124,10 +127,25 @@
False, None, True, True),
('client-params', 'keyval', 'Hadoop client xml key/value list',
- False, None, False, True, 'C'),
+ True, None, False, True, 'C'),
('hadoop-ui-log-dir', 'directory', 'Directory to store Web UI Logs of Hadoop',
- False, None, False, True)),
+ True, None, False, True),
+
+ ('temp-dir', 'directory', 'HOD temporary directories.',
+ False, None, True, False),
+
+ ('update-worker-info', 'bool', 'Specifies whether to update Worker Info after allocation',
+ False, False, False, True),
+
+ ('title', 'string', 'Title for the current HOD allocation.',
+ True, "HOD", False, True, 'N'),
+
+ ('walltime', 'pos_int', 'Walltime in seconds for the current HOD allocation',
+ True, None, False, True),
+
+ ('script-wait-time', 'pos_int', 'Specifies the time to wait before running the script. Used with the hod.script option.',
+ True, 10, False, True, 'W')),
'resource_manager' : (
('id', 'string', 'Batch scheduler ID: torque|condor.',
@@ -137,7 +155,7 @@
False, None, False, True),
('pbs-account', 'string', 'User Account jobs are submitted under.',
- True, pwd.getpwuid(os.getuid())[0], False, False, 'A'),
+ True, None, False, False, 'A'),
('queue', 'string', 'Queue of the batch scheduler to query.',
True, 'batch', False, True, 'Q'),
@@ -215,7 +233,7 @@
False, None, False, False),
('server-params', 'keyval', 'Hadoop xml key/value list',
- False, None, False, True, 'M'),
+ True, None, False, True, 'M'),
('envs', 'keyval', 'environment to run this package in',
False, None, False, False),
@@ -344,140 +362,151 @@
return config['hod'].has_key('script')
if __name__ == '__main__':
- confDef = definition()
- confDef.add_defs(defList, defOrder)
- hodOptions = options(confDef, "./%s -c <CONFIG_FILE> [OPTIONS]" % myName,
- VERSION, withConfig=True, defaultConfig=DEFAULT_CONFIG)
-
- # hodConfig is a dict like object, hodConfig[section][name]
try:
- hodConfig = config(hodOptions['config'], configDef=confDef,
- originalDir=hodOptions['hod']['original-dir'],
- options=hodOptions)
- except IOError, e:
- print >>sys.stderr,"error: %s not found. Specify the path to the HOD configuration file, or define the environment variable %s under which a file named hodrc can be found." % (hodOptions['config'], 'HOD_CONF_DIR')
- sys.exit(1)
-
- status = True
- statusMsgs = []
-
- (status,statusMsgs) = hodConfig.verify()
- if not status:
- print >>sys.stderr,"error: bin/hod failed to start."
- for msg in statusMsgs:
- print >>sys.stderr,"%s" % (msg)
- sys.exit(1)
-
- ## TODO : should move the dependency verification to hodConfig.verify
- if hodConfig['hod'].has_key('script') \
- and not hodConfig['hod'].has_key('min-nodes'):
- printErrors(hodConfig.var_error('hod', 'min-nodes',
- "hod.min-nodes must be specified when using hod.script option."))
- sys.exit(1)
-
- if hodConfig['hod'].has_key('min-nodes'):
- if hodConfig['hod']['min-nodes'] < 3:
- printErrors(hodConfig.var_error('hod', 'min-nodes',
- "hod.min-nodes must be >= 3 nodes: %s." %
- hodConfig['hod']['min-nodes']))
- sys.exit(1)
+ confDef = definition()
+ confDef.add_defs(defList, defOrder)
+ hodOptions = options(confDef, "./%s -c <CONFIG_FILE> [OPTIONS]" % myName,
+ VERSION, withConfig=True, defaultConfig=DEFAULT_CONFIG)
- if hodConfig['hod'].has_key('operation') and \
- hodConfig['hod'].has_key('script'):
- print "Script execution and hod operations are mutually exclusive."
- hodOptions.print_help(sys.stderr)
- sys.exit(1)
-
- if 'operation' not in hodConfig['hod'] and 'script' not in hodConfig['hod']:
- print "HOD requires at least a script or operation be specified."
- hodOptions.print_help(sys.stderr)
- sys.exit(1)
-
- if hodConfig['gridservice-hdfs']['external']:
- hdfsAddress = "%s:%s" % (hodConfig['gridservice-hdfs']['host'],
- hodConfig['gridservice-hdfs']['fs_port'])
-
- hdfsSocket = tcpSocket(hdfsAddress)
-
+ # hodConfig is a dict like object, hodConfig[section][name]
try:
- hdfsSocket.open()
- hdfsSocket.close()
- except tcpError:
- printErrors(hodConfig.var_error('hod', 'gridservice-hdfs',
- "Failed to open a connection to external hdfs address: %s." %
- hdfsAddress))
+ hodConfig = config(hodOptions['config'], configDef=confDef,
+ originalDir=hodOptions['hod']['original-dir'],
+ options=hodOptions)
+ except IOError, e:
+ print >>sys.stderr,"error: %s not found. Specify the path to the HOD configuration file, or define the environment variable %s under which a file named hodrc can be found." % (hodOptions['config'], 'HOD_CONF_DIR')
sys.exit(1)
- else:
- hodConfig['gridservice-hdfs']['host'] = 'localhost'
-
- if hodConfig['gridservice-mapred']['external']:
- mapredAddress = "%s:%s" % (hodConfig['gridservice-mapred']['host'],
- hodConfig['gridservice-mapred']['tracker_port'])
-
- mapredSocket = tcpSocket(mapredAddress)
-
- try:
- mapredSocket.open()
- mapredSocket.close()
- except tcpError:
- printErrors(hodConfig.var_error('hod', 'gridservice-mapred',
- "Failed to open a connection to external mapred address: %s." %
- mapredAddress))
+
+ status = True
+ statusMsgs = []
+
+ (status,statusMsgs) = hodConfig.verify()
+ if not status:
+ print >>sys.stderr,"error: bin/hod failed to start."
+ for msg in statusMsgs:
+ print >>sys.stderr,"%s" % (msg)
sys.exit(1)
- else:
- hodConfig['gridservice-mapred']['host'] = 'localhost'
-
- if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
- not hodConfig['gridservice-hdfs'].has_key('pkgs') and \
- op_requires_pkgs(hodConfig):
- printErrors(hodConfig.var_error('gridservice-hdfs', 'pkgs',
- "gridservice-hdfs.pkgs must be defined if ringmaster.hadoop-tar-ball "
- + "is not defined."))
- sys.exit(1)
-
- if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
- not hodConfig['gridservice-mapred'].has_key('pkgs') and \
- op_requires_pkgs(hodConfig):
- printErrors(hodConfig.var_error('gridservice-mapred', 'pkgs',
- "gridservice-mapred.pkgs must be defined if ringmaster.hadoop-tar-ball "
- + "is not defined."))
- sys.exit(1)
-
- if hodConfig['hodring'].has_key('log-destination-uri'):
- if hodConfig['hodring']['log-destination-uri'].startswith('file://'):
- pass
- elif hodConfig['hodring']['log-destination-uri'].startswith('hdfs://'):
- hostPort = hodConfig['hodring']['log-destination-uri'][7:].split("/")
- hostPort = hostPort[0]
- socket = tcpSocket(hostPort)
+
+ ## TODO : should move the dependency verification to hodConfig.verify
+ if hodConfig['hod'].has_key('script') \
+ and not hodConfig['hod'].has_key('min-nodes'):
+ printErrors(hodConfig.var_error('hod', 'min-nodes',
+ "hod.min-nodes must be specified when using hod.script option."))
+ sys.exit(1)
+
+ if hodConfig['hod'].has_key('min-nodes'):
+ if hodConfig['hod']['min-nodes'] < 3:
+ printErrors(hodConfig.var_error('hod', 'min-nodes',
+ "hod.min-nodes must be >= 3 nodes: %s." %
+ hodConfig['hod']['min-nodes']))
+ sys.exit(1)
+
+ if hodConfig['hod'].has_key('operation') and \
+ hodConfig['hod'].has_key('script'):
+ print "Script execution and hod operations are mutually exclusive."
+ hodOptions.print_help(sys.stderr)
+ sys.exit(1)
+
+ if 'operation' not in hodConfig['hod'] and 'script' not in hodConfig['hod']:
+ print "HOD requires at least a script or operation be specified."
+ hodOptions.print_help(sys.stderr)
+ sys.exit(1)
+
+ if hodConfig['gridservice-hdfs']['external']:
+ hdfsAddress = "%s:%s" % (hodConfig['gridservice-hdfs']['host'],
+ hodConfig['gridservice-hdfs']['fs_port'])
+
+ hdfsSocket = tcpSocket(hdfsAddress)
+
try:
- socket.open()
- socket.close()
- except:
- printErrors(hodConfig.var_error('hodring', 'log-destination-uri',
- "Unable to contact host/port specified in log destination uri: %s" %
- hodConfig['hodring']['log-destination-uri']))
+ hdfsSocket.open()
+ hdfsSocket.close()
+ except tcpError:
+ printErrors(hodConfig.var_error('hod', 'gridservice-hdfs',
+ "Failed to open a connection to external hdfs address: %s." %
+ hdfsAddress))
+ sys.exit(1)
+ else:
+ hodConfig['gridservice-hdfs']['host'] = 'localhost'
+
+ if hodConfig['gridservice-mapred']['external']:
+ mapredAddress = "%s:%s" % (hodConfig['gridservice-mapred']['host'],
+ hodConfig['gridservice-mapred']['tracker_port'])
+
+ mapredSocket = tcpSocket(mapredAddress)
+
+ try:
+ mapredSocket.open()
+ mapredSocket.close()
+ except tcpError:
+ printErrors(hodConfig.var_error('hod', 'gridservice-mapred',
+ "Failed to open a connection to external mapred address: %s." %
+ mapredAddress))
sys.exit(1)
else:
- printErrors(hodConfig.var_error('hodring', 'log-destination-uri',
- "The log destiniation uri must be of type local:// or hdfs://."))
+ hodConfig['gridservice-mapred']['host'] = 'localhost'
+
+ if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
+ not hodConfig['gridservice-hdfs'].has_key('pkgs') and \
+ op_requires_pkgs(hodConfig):
+ printErrors(hodConfig.var_error('gridservice-hdfs', 'pkgs',
+ "gridservice-hdfs.pkgs must be defined if ringmaster.hadoop-tar-ball "
+ + "is not defined."))
sys.exit(1)
- ## TODO : end of should move the dependency verification to hodConfig.verif
-
- hodConfig['hod']['base-dir'] = rootDirectory
- hodConfig['hod']['user_state'] = DEFAULT_HOD_DIR
-
- dGen = DescGenerator(hodConfig)
- hodConfig = dGen.initializeDesc()
- os.environ['JAVA_HOME'] = hodConfig['hod']['java-home']
+ if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
+ not hodConfig['gridservice-mapred'].has_key('pkgs') and \
+ op_requires_pkgs(hodConfig):
+ printErrors(hodConfig.var_error('gridservice-mapred', 'pkgs',
+ "gridservice-mapred.pkgs must be defined if ringmaster.hadoop-tar-ball "
+ + "is not defined."))
+ sys.exit(1)
+
+ if hodConfig['hodring'].has_key('log-destination-uri'):
+ if hodConfig['hodring']['log-destination-uri'].startswith('file://'):
+ pass
+ elif hodConfig['hodring']['log-destination-uri'].startswith('hdfs://'):
+ hostPort = hodConfig['hodring']['log-destination-uri'][7:].split("/")
+ hostPort = hostPort[0]
+ socket = tcpSocket(hostPort)
+ try:
+ socket.open()
+ socket.close()
+ except:
+ printErrors(hodConfig.var_error('hodring', 'log-destination-uri',
+ "Unable to contact host/port specified in log destination uri: %s" %
+ hodConfig['hodring']['log-destination-uri']))
+ sys.exit(1)
+ else:
+ printErrors(hodConfig.var_error('hodring', 'log-destination-uri',
+ "The log destiniation uri must be of type local:// or hdfs://."))
+ sys.exit(1)
+
+ ## TODO : end of should move the dependency verification to hodConfig.verify
+
+ hodConfig['hod']['base-dir'] = rootDirectory
+ hodConfig['hod']['user_state'] = DEFAULT_HOD_DIR
+
+ dGen = DescGenerator(hodConfig)
+ hodConfig = dGen.initializeDesc()
+
+ os.environ['JAVA_HOME'] = hodConfig['hod']['java-home']
+
+ if hodConfig['hod']['debug'] == 4:
+ print ""
+ print "Using Python: %s" % sys.version
+ print ""
+
+ hod = hodRunner(hodConfig)
- if hodConfig['hod']['debug'] == 4:
- print ""
- print "Using Python: %s" % sys.version
- print ""
+ # Initiate signal handling
+ hodInterrupt.set_log(hod.get_logger())
+ hodInterrupt.init_signals()
+ # Interrupts set up. Now on we handle signals only when we wish to.
+ except KeyboardInterrupt:
+ print HOD_INTERRUPTED_MESG
+ sys.exit(HOD_INTERRUPTED_CODE)
- hod = hodRunner(hodConfig)
if hodConfig['hod'].has_key('script'):
sys.exit(hod.script())
else:
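
The main change in bin/hod above is that the whole startup sequence now runs inside one try block: hodInterrupt installs handlers for SIGTERM/SIGQUIT/SIGINT once a logger exists, and a KeyboardInterrupt before that point exits with HOD_INTERRUPTED_CODE. A minimal standalone sketch of the same flag-based pattern (the class and names here are illustrative, not HOD's API):

    import signal, sys, time

    class InterruptFlag:
        """Record that a termination signal arrived; long loops poll is_set()."""
        def __init__(self):
            self._flag = False

        def install(self):
            for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):
                signal.signal(sig, self._handler)

        def _handler(self, signum, frame):
            self._flag = True          # defer real work to the main loop

        def is_set(self):
            return self._flag

    flag = InterruptFlag()
    flag.install()
    while not flag.is_set():
        time.sleep(1)                  # stand-in for allocation/polling work
    print("Interrupted. Cleaning up and exiting")
    sys.exit(127)                      # HOD_INTERRUPTED_CODE in this patch
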
Modified: hadoop/core/trunk/src/contrib/hod/bin/hodring
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/hodring?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/bin/hodring (original)
+++ hadoop/core/trunk/src/contrib/hod/bin/hodring Mon Jan 28 07:58:08 2008
@@ -188,6 +188,102 @@
service = HodRing(hodRingOptions)
service.start()
service.wait()
+
+ if service.log:
+ log = service.log
+ else:
+ log = getLogger(hodRingOptions)
+
+ list = []
+
+ runningHadoops = service.getRunningValues()
+
+ for cmd in runningHadoops:
+ log.debug("addding %s to cleanup list..." % cmd)
+ cmd.addCleanup(list)
+
+ list.append(service.getTempDir())
+ log.debug(list)
+
+ # archive_logs now
+ cmdString = os.path.join(rootDirectory, "bin", "hodcleanup") # same python
+
+ if (len(runningHadoops) == 0):
+ log.info("len(runningHadoops) == 0, No running cluster?")
+ log.info("Skipping __copy_archive_to_dfs")
+ hadoopString = ""
+ else: hadoopString=runningHadoops[0].path
+
+ #construct the arguments
+ if hodRingOptions['hodring'].has_key('log-destination-uri'):
+ cmdString = cmdString + " --log-destination-uri " \
+ + hodRingOptions['hodring']['log-destination-uri']
+
+ hadoopLogDirs = service.getHadoopLogDirs()
+ if hadoopLogDirs:
+ cmdString = cmdString \
+ + " --hadoop-log-dirs " \
+ + ",".join(hadoopLogDirs)
+
+ cmdString = cmdString \
+ + " --temp-dir " \
+ + service._cfg['temp-dir'] \
+ + " --hadoop-command-string " \
+ + hadoopString \
+ + " --user-id " \
+ + service._cfg['userid'] \
+ + " --service-id " \
+ + service._cfg['service-id'] \
+ + " --hodring-debug " \
+ + str(hodRingOptions['hodring']['debug']) \
+ + " --hodring-log-dir " \
+ + hodRingOptions['hodring']['log-dir'] \
+ + " --hodring-cleanup-list " \
+ + ",".join(list)
+
+ if hodRingOptions['hodring'].has_key('syslog-address'):
+ cmdString = cmdString + " --hodring-syslog-address " \
+ + hodRingOptions['hodring']['syslog-address']
+ if service._cfg.has_key('pkgs'):
+ cmdString = cmdString + " --pkgs " + service._cfg['pkgs']
+
+ log.info("cleanup commandstring : ")
+ log.info(cmdString)
+
+ # clean up
+ cmd = ['/bin/sh', '-c', cmdString]
+
+ mswindows = (sys.platform == "win32")
+ originalcwd = os.getcwd()
+
+ if not mswindows:
+ try:
+ pid = os.fork()
+ if pid > 0:
+ # exit first parent
+ log.info("child(pid: %s) is now doing cleanup" % pid)
+ sys.exit(0)
+ except OSError, e:
+ log.error("fork failed: %d (%s)" % (e.errno, e.strerror))
+ sys.exit(1)
+
+ # decouple from parent environment
+ os.chdir("/")
+ os.setsid()
+ os.umask(0)
+
+ MAXFD = 128 # more than enough file descriptors to close. Just in case.
+ for i in xrange(0, MAXFD):
+ try:
+ os.close(i)
+ except OSError:
+ pass
+
+ try:
+ os.execvp(cmd[0], cmd)
+ finally:
+ log.critical("exec failed")
+ os._exit(1)
except Exception:
if service:
@@ -195,4 +291,4 @@
log = service.log
else:
log = getLogger(hodRingOptions)
- log.error("bin/hodring failed to start. %s. \nStack trace:\n%s" %(get_exception_error_string(),get_exception_string()))
+ log.error("Error in bin/hodring %s. \nStack trace:\n%s" %(get_exception_error_string(),get_exception_string()))
Modified: hadoop/core/trunk/src/contrib/hod/bin/ringmaster
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/ringmaster?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/bin/ringmaster (original)
+++ hadoop/core/trunk/src/contrib/hod/bin/ringmaster Mon Jan 28 07:58:08 2008
@@ -122,7 +122,7 @@
False, None, False, True),
('pbs-account', 'string', 'User Account jobs are submitted under.',
- False, None, True, False),
+ False, None, False, False),
('queue', 'string', 'Queue of the batch scheduler to query.',
False, None, False, False),
@@ -317,14 +317,19 @@
confDef.add_defs(defList, defOrder)
ringMasterOptions = options(confDef, "./%s [OPTIONS]" % myName, VERSION)
ensureLogDir(ringMasterOptions['ringmaster']['log-dir'])
- log = getLogger(ringMasterOptions['ringmaster'])
+ log = None
try:
+ log = getLogger(ringMasterOptions['ringmaster'])
(status, statusMsgs) = ringMasterOptions.verify()
if not status:
raise Exception("%s" % statusMsgs)
+ ringMasterOptions.replace_escape_seqs()
ringMasterOptions['ringmaster']['base-dir'] = rootDirectory
- main(ringMasterOptions,log)
- sys.exit(0)
+ ret = main(ringMasterOptions,log)
+ sys.exit(ret)
except Exception, e:
- log.error("bin/ringmaster failed to start.%s. Stack trace follows:\n%s" % (get_exception_error_string(),get_exception_string()))
+ if log:
+ log.error("bin/ringmaster failed to start.%s. Stack trace follows:\n%s" % (get_exception_error_string(),get_exception_string()))
+ # Ringmaster failing to start is a ringmaster error. Exit with the appropriate exit code.
+ sys.exit(6)
Modified: hadoop/core/trunk/src/contrib/hod/conf/hodrc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/conf/hodrc?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/conf/hodrc (original)
+++ hadoop/core/trunk/src/contrib/hod/conf/hodrc Mon Jan 28 07:58:08 2008
@@ -6,6 +6,7 @@
xrs-port-range = 32768-65536
debug = 3
allocate-wait-time = 3600
+temp-dir = /tmp/hod
[ringmaster]
register = True
Modified: hadoop/core/trunk/src/contrib/hod/getting_started.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/getting_started.txt?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/getting_started.txt (original)
+++ hadoop/core/trunk/src/contrib/hod/getting_started.txt Mon Jan 28 07:58:08 2008
@@ -26,7 +26,7 @@
cluster. However, it can also use a pre-installed version of Hadoop,
if it is available on all nodes in the cluster.
(http://lucene.apache.org/hadoop)
- HOD currently supports only Hadoop 0.16, which is under development.
+ HOD currently supports Hadoop 0.15 and above.
NOTE: HOD configuration requires the location of installs of these
components to be the same on all nodes in the cluster. It will also
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/desc.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/desc.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/desc.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/desc.py Mon Jan 28 07:58:08 2008
@@ -125,38 +125,9 @@
self.dict.setdefault('pkgs', '')
self.dict.setdefault('final-attrs', {})
self._checkRequired()
- self.__dict_update()
-
- def __dict_update(self):
- getattr(self, "_%s" % self.dict['id'])()
-
- def _mapred(self):
- if self.isExternal():
- self.dict['final-attrs']['mapred.job.tracker'] = "%s:%s" % (self.dict['host'],
- self.dict['tracker_port'])
-
- # self.dict['final-attrs']['mapred.job.tracker.info.port'] = \
- # str(self.dict['info_port'])
- # After Hadoop-2185
- self.dict['final-attrs']['mapred.job.tracker.http.bindAddress'] = \
- "%s:%s" %(self.dict['host'], self.dict['info_port'])
-
if self.dict.has_key('hadoop-tar-ball'):
self.dict['tar'] = self.dict['hadoop-tar-ball']
-
- def _hdfs(self):
- if self.isExternal():
- self.dict['final-attrs']['fs.default.name'] = "%s:%s" % (self.dict['host'],
- self.dict['fs_port'])
-
- # self.dict['final-attrs']['dfs.info.port'] = str(self.dict['info_port'])
- # After Hadoop-2185
- self.dict['final-attrs']['dfs.http.bindAddress'] = "%s:%s" % \
- (self.dict['host'], self.dict['info_port'])
-
- if self.dict.has_key('hadoop-tar-ball'):
- self.dict['tar'] = self.dict['hadoop-tar-ball']
-
+
def _checkRequired(self):
if not 'id' in self.dict:
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/hodsvc.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/hodsvc.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/hodsvc.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/hodsvc.py Mon Jan 28 07:58:08 2008
@@ -15,7 +15,6 @@
#limitations under the License.
# $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
#
-# Christopher Zimmerman - zim@yahoo-inc.com - 04/13/2007
#------------------------------------------------------------------------------
import os, time, shutil, xmlrpclib, socket, pprint
@@ -51,7 +50,7 @@
self._init_logging()
- self._init_signals()
+ if name != 'serviceRegistry': self._init_signals()
self._init_xrc_server()
def __set_logging_level(self, level):
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/setup.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/setup.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/setup.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/setup.py Mon Jan 28 07:58:08 2008
@@ -16,7 +16,6 @@
# $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
# $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
#
-# Christopher Zimmerman - zim@yahoo-inc.com - 04/07/2007
#------------------------------------------------------------------------------
"""'setup' provides for reading and verifing configuration files based on
@@ -26,7 +25,7 @@
from ConfigParser import SafeConfigParser
from optparse import OptionParser, IndentedHelpFormatter, OptionGroup
-from util import get_perms
+from util import get_perms, replace_escapes
from types import typeValidator, is_valid_type, typeToString
reEmailAddress = re.compile("^.*@.*$")
@@ -37,6 +36,8 @@
reCommentNewline = re.compile("\W$")
reKeyVal = r"(?<!\\)="
reKeyVal = re.compile(reKeyVal)
+reKeyValList = r"(?<!\\),"
+reKeyValList = re.compile(reKeyValList)
errorPrefix = 'error'
requiredPerms = '0660'
@@ -485,7 +486,7 @@
# Append to the current list of values in self._dict
if not self._dict[section].has_key(option):
self._dict[section][option] = ""
- dictOpts = self._dict[section][option].split(",")
+ dictOpts = reKeyValList.split(self._dict[section][option])
dictOptsKeyVals = {}
for opt in dictOpts:
if opt != '':
@@ -495,13 +496,16 @@
# we only consider the first '=' for splitting
# we do this to support passing params like
# mapred.child.java.opts=-Djava.library.path=some_dir
+ # Even in case of invalid input like an unescaped '=',
+ # we don't want to fail right here. We leave such errors
+ # to be caught during validation, which happens after this
dictOptsKeyVals[key] = val
else:
# this means an invalid option. Leaving it
#for config.verify to catch
dictOptsKeyVals[opt] = None
- cmdLineOpts = self._options[section][option].split(",")
+ cmdLineOpts = reKeyValList.split(self._options[section][option])
for opt in cmdLineOpts:
if reKeyVal.search(opt):
@@ -573,6 +577,10 @@
raise Exception( error)
sys.exit(1)
+ def replace_escape_seqs(self):
+ """ replace any escaped characters """
+ replace_escapes(self)
+
class formatter(IndentedHelpFormatter):
def format_option_strings(self, option):
"""Return a comma-separated list of option strings & metavariables."""
@@ -667,11 +675,21 @@
self.config = self.__parsedOptions.config
if not self.config:
self.error("configuration file must be specified")
+ if not os.path.isabs(self.config):
+ # A relative path. Append the original directory which would be the
+ # current directory at the time of launch
+ try:
+ origDir = getattr(self.__parsedOptions, 'hod.original-dir')
+ if origDir is not None:
+ self.config = os.path.join(origDir, self.config)
+ self.__parsedOptions.config = self.config
+ except AttributeError, e:
+ self.error("hod.original-dir is not defined.\
+ Cannot get current directory")
if not os.path.exists(self.config):
if self.__defaultLoc and not re.search("/", self.config):
self.__parsedOptions.config = os.path.join(
self.__defaultLoc, self.config)
-
self.__build_dict()
@@ -910,3 +928,6 @@
def verify(self):
return baseConfig.verify(self)
+
+ def replace_escape_seqs(self):
+ replace_escapes(self)
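
The reKeyValList pattern added in setup.py above follows the existing reKeyVal one: a negative lookbehind makes the comma (or '=') a separator only when it is not escaped with a backslash, which is what lets values such as mapred.child.java.opts carry their own '=' and ',' characters. A quick standalone illustration:

    import re

    re_list = re.compile(r"(?<!\\),")   # split on ',' unless written as '\,'
    re_kv   = re.compile(r"(?<!\\)=")   # split on '=' unless written as '\='

    s = r"mapred.child.java.opts=-Dfoo\=bar,io.sort.mb=100"
    pairs = re_list.split(s)
    print([re_kv.split(p, 1) for p in pairs])
    # [['mapred.child.java.opts', '-Dfoo\\=bar'], ['io.sort.mb', '100']]
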
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/socketServers.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/socketServers.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/socketServers.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/socketServers.py Mon Jan 28 07:58:08 2008
@@ -15,7 +15,6 @@
#limitations under the License.
# Various socket server and helper classes.
#
-# Christopher Zimmerman - zim@yahoo-inc.com - 03/07/2007
#
import os, sys, socket, threading, pprint, re, xmlrpclib, time
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/tcp.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/tcp.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/tcp.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/tcp.py Mon Jan 28 07:58:08 2008
@@ -15,7 +15,6 @@
#limitations under the License.
# $Id:tcp.py 6172 2007-05-22 20:26:54Z zim $
#
-# Christopher Zimmerman - zim@yahoo-inc.com - 04/07/2007
#------------------------------------------------------------------------------
""" TCP related classes. """
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/threads.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/threads.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/threads.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/threads.py Mon Jan 28 07:58:08 2008
@@ -132,13 +132,16 @@
output = cmd.fromchild.readline()
elif self.__wait == False:
- for output in cmd.fromchild.readlines():
+ output = cmd.fromchild.readline()
+ while output != '':
while not self.running.isSet():
if self.stopFlag.isSet():
break
time.sleep(1)
-
print output,
+ if self.stopFlag.isSet():
+ break
+ output = cmd.fromchild.readline()
else:
self.stdout = cmd.fromchild
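
The threads.py change above swaps readlines(), which blocks until the child closes its output, for a readline() loop, so output is relayed (and the stop flag honoured) while the command is still running. The same streaming idea expressed with the subprocess module (a sketch, not HOD's simpleCommand):

    import subprocess, sys

    proc = subprocess.Popen(['sh', '-c', 'echo one; sleep 1; echo two'],
                            stdout=subprocess.PIPE, universal_newlines=True)
    line = proc.stdout.readline()
    while line != '':              # readline() returns '' only at EOF
        sys.stdout.write(line)     # relay each line as it arrives
        line = proc.stdout.readline()
    proc.wait()
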
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/types.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/types.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/types.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/types.py Mon Jan 28 07:58:08 2008
@@ -15,7 +15,6 @@
#limitations under the License.
# $Id:types.py 6172 2007-05-22 20:26:54Z zim $
#
-# Christopher Zimmerman - zim@yahoo-inc.com - 04/07/2007
#------------------------------------------------------------------------------
""" Higher level data types and type related classes.
@@ -325,12 +324,17 @@
return value
def __tostring_keyval(self, value):
- string = ''
+ string = '"' # to protect from shell escapes
for key in value:
- for item in value[key]:
- string = "%s%s=%s," % (string, key, item)
-
- return string[:-1]
+ # for item in value[key]:
+ # string = "%s%s=%s," % (string, key, item)
+ # Quotes still cannot protect double-backslashes.
+ # Dealing with them separately
+ val = re.sub(r"\\\\",r"\\\\\\\\",value[key])
+
+ string = "%s%s=%s," % (string, key, val)
+
+ return string[:-1] + '"'
def __tostring_list(self, value):
string = ''
@@ -678,13 +682,11 @@
list = self.__norm_list(value)
keyValue = {}
for item in list:
- # we only consider the first '=' for splitting
- # we do this to support passing params like
- # mapred.child.java.opts=-Djava.library.path=some_dir
- (key, value) = reKeyVal.split(item,1)
- if not keyValue.has_key(key):
- keyValue[key] = []
- keyValue[key].append(value)
+ (key, value) = reKeyVal.split(item)
+ #if not keyValue.has_key(key):
+ # keyValue[key] = []
+ #keyValue[key].append(value)
+ keyValue[key] = value
return keyValue
def __verify_list(self, type, value):
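
In __tostring_keyval above, the keyval dict is now rendered as one double-quoted token so the shell does not split it, and double-backslashes are doubled again because the quotes alone do not protect them. What that re.sub does, in isolation:

    import re

    value = r"dir=C:\\tmp"                      # holds a literal '\\'
    doubled = re.sub(r"\\\\", r"\\\\\\\\", value)
    print(doubled)                              # dir=C:\\\\tmp
    print('"%s"' % doubled)                     # the quoted form handed to the shell
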
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/util.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/util.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/util.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/util.py Mon Jan 28 07:58:08 2008
@@ -13,12 +13,17 @@
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
-import sys, os, traceback, stat, socket, re, warnings
+import sys, os, traceback, stat, socket, re, warnings, signal
from hodlib.Common.tcp import tcpSocket, tcpError
from hodlib.Common.threads import simpleCommand
setUGV = { 'S_ISUID' : 2, 'S_ISGID' : 1, 'S_ISVTX' : 0 }
+reEscapeSeq = r"\\(.)?"
+reEscapeSeq = re.compile(reEscapeSeq)
+
+HOD_INTERRUPTED_CODE = 127
+HOD_INTERRUPTED_MESG = "Hod Interrupted. Cleaning up and exiting"
class AlarmException(Exception):
def __init__(self, msg=''):
@@ -170,3 +175,117 @@
for item in list:
arg = "%s%s " % (arg, item)
return arg[:-1]
+
+def replace_escapes(object):
+ """ replace any escaped character. e.g \, with , \= with = and so on """
+ # here object is either a config object or a options object
+ for section in object._mySections:
+ for option in object._configDef[section].keys():
+ if object[section].has_key(option):
+ if object._configDef[section][option]['type'] == 'keyval':
+ keyValDict = object[section][option]
+ object[section][option] = {}
+ for (key,value) in keyValDict.iteritems():
+ match = reEscapeSeq.search(value)
+ if match:
+ value = reEscapeSeq.sub(r"\1", value)
+ object[section][option][key] = value
+
+def hadoopVersion(hadoopDir, java_home, log):
+ # Determine the version of hadoop being used by executing the
+ # hadoop version command. This code was earlier in idleTracker.py
+ hadoopVersion = { 'major' : None, 'minor' : None }
+ hadoopPath = os.path.join(hadoopDir, 'bin', 'hadoop')
+ cmd = "%s version" % hadoopPath
+ log.debug('Executing command %s to find hadoop version' % cmd)
+ env = os.environ
+ env['JAVA_HOME'] = java_home
+ hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
+ hadoopVerCmd.start()
+ hadoopVerCmd.wait()
+ hadoopVerCmd.join()
+ if hadoopVerCmd.exit_code() == 0:
+ verLine = hadoopVerCmd.output()[0]
+ log.debug('Version from hadoop command: %s' % verLine)
+ hadoopVerRegExp = re.compile("Hadoop ([0-9]+)\.([0-9]+).*")
+ verMatch = hadoopVerRegExp.match(verLine)
+ if verMatch != None:
+ hadoopVersion['major'] = verMatch.group(1)
+ hadoopVersion['minor'] = verMatch.group(2)
+ return hadoopVersion
+
+
+def get_cluster_status(hdfsAddress, mapredAddress):
+ """Determine the status of the cluster based on socket availability
+ of HDFS and Map/Reduce."""
+ status = 0
+
+ mapredSocket = tcpSocket(mapredAddress)
+ try:
+ mapredSocket.open()
+ mapredSocket.close()
+ except tcpError:
+ status = 14
+
+ hdfsSocket = tcpSocket(hdfsAddress)
+ try:
+ hdfsSocket.open()
+ hdfsSocket.close()
+ except tcpError:
+ if status > 0:
+ status = 10
+ else:
+ status = 13
+
+ return status
+
+def parseEquals(list):
+ # takes in a list of keyval pairs e.g ['a=b','c=d'] and returns a
+ # dict e.g {'a':'b','c':'d'}. Used in GridService/{mapred.py/hdfs.py} and
+ # HodRing/hodring.py. No need for specially treating escaped =. as in \=,
+ # since all keys are generated by hod and don't contain such anomalies
+ dict = {}
+ for elems in list:
+ splits = elems.split('=')
+ dict[splits[0]] = splits[1]
+ return dict
+
+class HodInterrupt:
+ def __init__(self):
+ self.HodInterruptFlag = False
+ self.log = None
+
+ def set_log(self, log):
+ self.log = log
+
+ def init_signals(self):
+
+ def sigStop(sigNum, handler):
+ sig_wrapper(sigNum, self.setFlag)
+
+ signal.signal(signal.SIGTERM, sigStop) # 15 : software termination signal
+ signal.signal(signal.SIGQUIT, sigStop) # 3 : Quit program
+ signal.signal(signal.SIGINT, sigStop) # 2 ^C : Interrupt program
+
+ def sig_wrapper(sigNum, handler, *args):
+ self.log.critical("Caught signal %s." % sigNum )
+
+ if args:
+ handler(args)
+ else:
+ handler()
+
+ def setFlag(self, val = True):
+ self.HodInterruptFlag = val
+
+ def isSet(self):
+ return self.HodInterruptFlag
+
+class HodInterruptException(Exception):
+ def __init__(self, value = ""):
+ self.value = value
+
+ def __str__(self):
+ return repr(self.value)
+
+hodInterrupt = HodInterrupt()
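
hadoopVersion() above shells out to bin/hadoop version and scrapes the major and minor numbers from the first output line; the self.version < 16 branches elsewhere in this patch presumably compare against that minor number for 0.x releases. The parsing step in isolation, with a sample output line:

    import re

    ver_re = re.compile(r"Hadoop ([0-9]+)\.([0-9]+).*")
    m = ver_re.match("Hadoop 0.16.0-dev")   # sample first line of `hadoop version`
    if m is not None:
        print({'major': m.group(1), 'minor': m.group(2)})
        # {'major': '0', 'minor': '16'}
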
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/xmlrpc.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/xmlrpc.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/xmlrpc.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/xmlrpc.py Mon Jan 28 07:58:08 2008
@@ -14,6 +14,7 @@
#See the License for the specific language governing permissions and
#limitations under the License.
import xmlrpclib, time, random, signal
+from hodlib.Common.util import hodInterrupt, HodInterruptException
class hodXRClient(xmlrpclib.ServerProxy):
def __init__(self, uri, transport=None, encoding=None, verbose=0,
@@ -42,6 +43,8 @@
break
except Exception:
if self.__retryRequests:
+ if hodInterrupt.isSet():
+ raise HodInterruptException()
time.sleep(retryWaitTime)
else:
raise Exception("hodXRClientTimeout")
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/hdfs.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/hdfs.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/hdfs.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/hdfs.py Mon Jan 28 07:58:08 2008
@@ -22,15 +22,16 @@
from service import *
from hodlib.Hod.nodePool import *
from hodlib.Common.desc import CommandDesc
-from hodlib.Common.util import get_exception_string
+from hodlib.Common.util import get_exception_string, parseEquals
class HdfsExternal(MasterSlave):
"""dummy proxy to external HDFS instance"""
- def __init__(self, serviceDesc, workDirs):
+ def __init__(self, serviceDesc, workDirs, version):
MasterSlave.__init__(self, serviceDesc, workDirs,None)
self.launchedMaster = True
self.masterInitialized = True
+ self.version = version
def getMasterRequest(self):
return None
@@ -49,21 +50,33 @@
addr = attrs['fs.default.name']
return [addr]
- def setMasterParams(self, list):
- raise NotImplementedError
+ def setMasterParams(self, dict):
+ self.serviceDesc.dict['final-attrs']['fs.default.name'] = "%s:%s" % \
+ (dict['host'], dict['fs_port'])
+
+ if self.version < 16:
+ self.serviceDesc.dict['final-attrs']['dfs.info.port'] = \
+ str(self.serviceDesc.dict['info_port'])
+ else:
+ # After Hadoop-2185
+ self.serviceDesc.dict['final-attrs']['dfs.http.bindAddress'] = "%s:%s" % \
+ (dict['host'], dict['info_port'])
def getInfoAddrs(self):
attrs = self.serviceDesc.getfinalAttrs()
- addr = attrs['fs.default.name']
- k,v = addr.split( ":")
- # infoaddr = k + ':' + attrs['dfs.info.port']
- # After Hadoop-2185
- infoaddr = attrs['dfs.http.bindAddress']
+ if self.version < 16:
+ addr = attrs['fs.default.name']
+ k,v = addr.split( ":")
+ infoaddr = k + ':' + attrs['dfs.info.port']
+ else:
+ # After Hadoop-2185
+ infoaddr = attrs['dfs.http.bindAddress']
return [infoaddr]
class Hdfs(MasterSlave):
- def __init__(self, serviceDesc, nodePool, required_node, format=True, upgrade=False):
+ def __init__(self, serviceDesc, nodePool, required_node, version, \
+ format=True, upgrade=False):
MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
self.masterNode = None
self.masterAddr = None
@@ -73,6 +86,7 @@
self.format = format
self.upgrade = upgrade
self.workers = []
+ self.version = version
def getMasterRequest(self):
req = NodeRequest(1, [], False)
@@ -124,16 +138,14 @@
self.masterAddr = dict['fs.default.name']
k,v = self.masterAddr.split( ":")
self.masterNode = k
- # self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
- # After Hadoop-2185
- self.infoAddr = dict['dfs.http.bindAddress']
+ if self.version < 16:
+ self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
+ else:
+ # After Hadoop-2185
+ self.infoAddr = dict['dfs.http.bindAddress']
def _parseEquals(self, list):
- dict = {}
- for elems in list:
- splits = elems.split('=')
- dict[splits[0]] = splits[1]
- return dict
+ return parseEquals(list)
def _getNameNodePort(self):
sd = self.serviceDesc
@@ -152,16 +164,25 @@
def _getNameNodeInfoPort(self):
sd = self.serviceDesc
attrs = sd.getfinalAttrs()
- if 'dfs.http.bindAddress' not in attrs:
- return ServiceUtil.getUniqPort()
+ if self.version < 16:
+ if 'dfs.info.bindAddress' not in attrs:
+ return ServiceUtil.getUniqPort()
+ else:
+ if 'dfs.http.bindAddress' not in attrs:
+ return ServiceUtil.getUniqPort()
- # p = attrs['dfs.info.port']
- p = attrs['dfs.http.bindAddress'].split(':')[1]
+ if self.version < 16:
+ p = attrs['dfs.info.port']
+ else:
+ p = attrs['dfs.http.bindAddress'].split(':')[1]
try:
return int(p)
except:
print get_exception_string()
- raise ValueError, "Can't find port from attr dfs.info.port: %s" % (p)
+ if self.version < 16:
+ raise ValueError, "Can't find port from attr dfs.info.port: %s" % (p)
+ else:
+ raise ValueError, "Can't find port from attr dfs.http.bindAddress: %s" % (p)
def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
namedir = None
@@ -183,7 +204,7 @@
attrs['dfs.name.dir'] = namedir
attrs['dfs.data.dir'] = ','.join(datadir)
# FIXME -- change dfs.client.buffer.dir
- envs['HADOOP_ROOT_LOGGER'] = ["INFO,DRFA",]
+ envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"
def _getNameNodeCommand(self, format=False, upgrade=False):
@@ -199,13 +220,14 @@
attrs['fs.default.name'] = 'fillinhostport'
#self.infoPort = port = self._getNameNodeInfoPort()
- # if 'dfs.info.port' not in attrs:
- # attrs['dfs.info.port'] = 'fillinport'
-
- # Addressing Hadoop-2815, added the following. Earlier version don't
- # care about this
- if 'dfs.http.bindAddress' not in attrs:
- attrs['dfs.http.bindAddress'] = 'fillinhostport'
+ if self.version < 16:
+ if 'dfs.info.port' not in attrs:
+ attrs['dfs.info.port'] = 'fillinport'
+ else:
+ # Addressing Hadoop-2815, added the following. Earlier versions don't
+ # care about this
+ if 'dfs.http.bindAddress' not in attrs:
+ attrs['dfs.http.bindAddress'] = 'fillinhostport'
self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')
@@ -277,11 +299,18 @@
attrs['fs.default.name'] = nn
- # Adding the following. Hadoop-2815
- if 'dfs.datanode.bindAddress' not in attrs:
- attrs['dfs.datanode.bindAddress'] = 'fillinhostport'
- if 'dfs.datanode.http.bindAddress' not in attrs:
- attrs['dfs.datanode.http.bindAddress'] = 'fillinhostport'
+ if self.version < 16:
+ if 'dfs.datanode.port' not in attrs:
+ attrs['dfs.datanode.port'] = 'fillinport'
+ if 'dfs.datanode.info.port' not in attrs:
+ attrs['dfs.datanode.info.port'] = 'fillinport'
+ else:
+ # Adding the following. Hadoop-2815
+ if 'dfs.datanode.bindAddress' not in attrs:
+ attrs['dfs.datanode.bindAddress'] = 'fillinhostport'
+ if 'dfs.datanode.http.bindAddress' not in attrs:
+ attrs['dfs.datanode.http.bindAddress'] = 'fillinhostport'
+
self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')
dict = { 'name' : 'datanode' }
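
The recurring pattern in hdfs.py (and in mapred.py below) is choosing configuration key names by Hadoop version: pre-0.16 uses the old dfs.info.port style, while 0.16 and later use the HADOOP-2185 bindAddress style that folds host and port into one value. Reduced to its essence (a sketch; version is assumed to be the minor version as an int):

    def hdfs_info_addr(version, attrs):
        """Return the HDFS web-UI address for a given Hadoop minor version."""
        if version < 16:
            host = attrs['fs.default.name'].split(':')[0]
            return host + ':' + attrs['dfs.info.port']
        # After HADOOP-2185 the host and port live in one attribute
        return attrs['dfs.http.bindAddress']

    print(hdfs_info_addr(15, {'fs.default.name': 'nn:8020',
                              'dfs.info.port': '50070'}))     # nn:50070
    print(hdfs_info_addr(16, {'dfs.http.bindAddress': 'nn:50070'}))
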
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/mapred.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/mapred.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/mapred.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/mapred.py Mon Jan 28 07:58:08 2008
@@ -22,15 +22,16 @@
from service import *
from hodlib.Hod.nodePool import *
from hodlib.Common.desc import CommandDesc
-from hodlib.Common.util import get_exception_string
+from hodlib.Common.util import get_exception_string, parseEquals
class MapReduceExternal(MasterSlave):
"""dummy proxy to external MapReduce instance"""
- def __init__(self, serviceDesc, workDirs):
+ def __init__(self, serviceDesc, workDirs, version):
MasterSlave.__init__(self, serviceDesc, workDirs,None)
self.launchedMaster = True
self.masterInitialized = True
+ self.version = version
def getMasterRequest(self):
return None
@@ -55,22 +56,33 @@
def needsLess(self):
return 0
- def setMasterParams(self, list):
- raise NotImplementedError
-
+ def setMasterParams(self, dict):
+ self.serviceDesc['final-attrs']['mapred.job.tracker'] = "%s:%s" % (dict['host'],
+ dict['tracker_port'])
+
+ if self.version < 16:
+ self.serviceDesc.dict['final-attrs']['mapred.job.tracker.info.port'] = \
+ str(self.serviceDesc.dict['info_port'])
+ else:
+ # After Hadoop-2185
+ self.serviceDesc['final-attrs']['mapred.job.tracker.http.bindAddress'] = \
+ "%s:%s" %(dict['host'], dict['info_port'])
+
def getInfoAddrs(self):
attrs = self.serviceDesc.getfinalAttrs()
- addr = attrs['mapred.job.tracker']
- k,v = addr.split( ":")
- # infoaddr = k + ':' + attrs['mapred.job.tracker.info.port']
- # After Hadoop-2185
- # Note: earlier,we never respected mapred.job.tracker.http.bindAddress
- infoaddr = attrs['mapred.job.tracker.http.bindAddress']
+ if self.version < 16:
+ addr = attrs['mapred.job.tracker']
+ k,v = addr.split( ":")
+ infoaddr = k + ':' + attrs['mapred.job.tracker.info.port']
+ else:
+ # After Hadoop-2185
+ # Note: earlier, we never respected mapred.job.tracker.http.bindAddress
+ infoaddr = attrs['mapred.job.tracker.http.bindAddress']
return [infoaddr]
class MapReduce(MasterSlave):
- def __init__(self, serviceDesc, workDirs,required_node):
+ def __init__(self, serviceDesc, workDirs,required_node, version):
MasterSlave.__init__(self, serviceDesc, workDirs,required_node)
self.masterNode = None
@@ -78,6 +90,7 @@
self.infoAddr = None
self.workers = []
self.required_node = required_node
+ self.version = version
def isLaunchable(self, serviceDict):
hdfs = serviceDict['hdfs']
@@ -127,16 +140,14 @@
self.masterAddr = dict['mapred.job.tracker']
k,v = self.masterAddr.split(":")
self.masterNode = k
- # self.infoAddr = self.masterNode + ':' + dict['mapred.job.tracker.info.port']
- # After Hadoop-2185
- self.infoAddr = dict['mapred.job.tracker.http.bindAddress']
+ if self.version < 16:
+ self.infoAddr = self.masterNode + ':' + dict['mapred.job.tracker.info.port']
+ else:
+ # After Hadoop-2185
+ self.infoAddr = dict['mapred.job.tracker.http.bindAddress']
def _parseEquals(self, list):
- dict = {}
- for elems in list:
- splits = elems.split('=')
- dict[splits[0]] = splits[1]
- return dict
+ return parseEquals(list)
def _getJobTrackerPort(self):
sd = self.serviceDesc
@@ -152,21 +163,29 @@
print get_exception_string()
raise ValueError, "Can't find port from attr mapred.job.tracker: %s" % (v)
+ # UNUSED METHOD
def _getJobTrackerInfoPort(self):
sd = self.serviceDesc
attrs = sd.getfinalAttrs()
- # if not 'mapred.job.tracker.info.port' in attrs:
- if 'mapred.job.tracker.http.bindAddress' not in attrs:
- return ServiceUtil.getUniqPort()
-
- # p = attrs['mapred.job.tracker.info.port']
- p = attrs['mapred.job.tracker.http.bindAddress']
+ if self.version < 16:
+ if not 'mapred.job.tracker.info.port' in attrs:
+ return ServiceUtil.getUniqPort()
+ else:
+ if 'mapred.job.tracker.http.bindAddress' not in attrs:
+ return ServiceUtil.getUniqPort()
+
+ if self.version < 16:
+ p = attrs['mapred.job.tracker.info.port']
+ else:
+ p = attrs['mapred.job.tracker.http.bindAddress'].split(':')[1]
try:
return int(p)
except:
print get_exception_string()
- # raise ValueError, "Can't find port from attr mapred.job.tracker.info.port: %s" % (p)
- raise ValueError, "Can't find port from attr mapred.job.tracker.http.bindAddress: %s" % (p)
+ if self.version < 16:
+ raise ValueError, "Can't find port from attr mapred.job.tracker.info.port: %s" % (p)
+ else:
+ raise ValueError, "Can't find port from attr mapred.job.tracker.http.bindAddress: %s" % (p)
def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
local = []
@@ -193,7 +212,7 @@
attrs['dfs.client.buffer.dir'] = ','.join(dfsclient)
- envs['HADOOP_ROOT_LOGGER'] = ["INFO,DRFA",]
+ envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"
def _getJobTrackerCommand(self, hdfs):
@@ -201,25 +220,28 @@
parentDirs = self.workDirs
workDirs = []
- attrs = sd.getfinalAttrs()
- envs = sd.getEnvs()
+ attrs = sd.getfinalAttrs().copy()
+ envs = sd.getEnvs().copy()
#self.masterPort = port = self._getJobTrackerPort()
if 'mapred.job.tracker' not in attrs:
attrs['mapred.job.tracker'] = 'fillinhostport'
#self.infoPort = port = self._getJobTrackerInfoPort()
- # if 'mapred.job.tracker.info.port' not in attrs:
- # attrs['mapred.job.tracker.info.port'] = 'fillinport'
+ if self.version < 16:
+ if 'mapred.job.tracker.info.port' not in attrs:
+ attrs['mapred.job.tracker.info.port'] = 'fillinport'
+ else:
+ # Addressing Hadoop-2815,
+ if 'mapred.job.tracker.http.bindAddress' not in attrs:
+ attrs['mapred.job.tracker.http.bindAddress'] = 'fillinhostport'
attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]
- # Addressing Hadoop-2815,
- if 'mapred.job.tracker.http.bindAddress' not in attrs:
- attrs['mapred.job.tracker.http.bindAddress'] = 'fillinhostport'
self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-jt')
dict = { 'name' : 'jobtracker' }
+ dict['version'] = self.version
dict['program'] = os.path.join('bin', 'hadoop')
dict['argv'] = ['jobtracker']
dict['envs'] = envs
@@ -236,8 +258,8 @@
parentDirs = self.workDirs
workDirs = []
- attrs = sd.getfinalAttrs()
- envs = sd.getEnvs()
+ attrs = sd.getfinalAttrs().copy()
+ envs = sd.getEnvs().copy()
jt = self.masterAddr
if jt == None:
@@ -246,11 +268,17 @@
attrs['mapred.job.tracker'] = jt
attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]
- # Adding the following. Hadoop-2815
- if 'mapred.task.tracker.report.bindAddress' not in attrs:
- attrs['mapred.task.tracker.report.bindAddress'] = 'fillinhostport'
- if 'mapred.task.tracker.http.bindAddress' not in attrs:
- attrs['mapred.task.tracker.http.bindAddress'] = 'fillinhostport'
+ if self.version < 16:
+ if 'tasktracker.http.port' not in attrs:
+ attrs['tasktracker.http.port'] = 'fillinport'
+ # prior to 16, tasktrackers always took ephemeral port 0 for
+ # tasktracker.report.bindAddress
+ else:
+ # Adding the following. Hadoop-2815
+ if 'mapred.task.tracker.report.bindAddress' not in attrs:
+ attrs['mapred.task.tracker.report.bindAddress'] = 'fillinhostport'
+ if 'mapred.task.tracker.http.bindAddress' not in attrs:
+ attrs['mapred.task.tracker.http.bindAddress'] = 'fillinhostport'
self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-tt')
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py Mon Jan 28 07:58:08 2008
@@ -57,8 +57,8 @@
return prop
- def gen_site_conf(self, confDir, numNodes, hdfsAddr, mapredAddr=None,\
- clientParams=None, serverParams=None,\
+ def gen_site_conf(self, confDir, tempDir, numNodes, hdfsAddr,\
+ mapredAddr=None, clientParams=None, serverParams=None,\
finalServerParams=None, clusterFactor=None):
if not mapredAddr:
mapredAddr = "dummy:8181"
@@ -69,51 +69,58 @@
"This is an auto generated hadoop-site.xml, do not modify")
topElement = doc.documentElement
topElement.appendChild(comment)
- prop = self.__create_xml_element(doc, 'mapred.job.tracker',
- mapredAddr, "description")
- topElement.appendChild(prop)
- prop = self.__create_xml_element(doc, 'fs.default.name', hdfsAddr,
- "description")
- topElement.appendChild(prop)
- mapredAddrSplit = mapredAddr.split(":")
- mapredsystem = os.path.join('/mapredsystem', mapredAddrSplit[0])
- prop = self.__create_xml_element(doc, 'mapred.system.dir', mapredsystem,
- "description", True )
- topElement.appendChild(prop)
- prop = self.__create_xml_element(doc, 'hadoop.tmp.dir', confDir,
- "description")
- topElement.appendChild(prop)
- prop = self.__create_xml_element(doc, 'dfs.client.buffer.dir',
- confDir, "description")
- topElement.appendChild(prop)
- # clientParams aer enabled now
- if clientParams:
- for k, v in clientParams.iteritems():
- prop = self.__create_xml_element(doc, k, v[0], "client param")
- topElement.appendChild(prop)
+ description = {}
+ paramsDict = { 'mapred.job.tracker' : mapredAddr , \
+ 'fs.default.name' : hdfsAddr, \
+ 'hadoop.tmp.dir' : confDir, \
+ 'dfs.client.buffer.dir' : tempDir, }
+ mapredAddrSplit = mapredAddr.split(":")
+ mapredsystem = os.path.join('/mapredsystem', mapredAddrSplit[0])
+ paramsDict['mapred.system.dir'] = mapredsystem
+
+ # mapred-default.xml is no longer used.
+ numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
+ paramsDict['mapred.reduce.tasks'] = str(numred)
# end
- # servelParams
- if serverParams:
- for k, v in serverParams.iteritems():
- prop = self.__create_xml_element(doc, k, v[0], "server param")
- topElement.appendChild(prop)
+ # for all the above vars generated, set the description
+ for k, v in paramsDict.iteritems():
+ description[k] = 'Hod generated parameter'
# finalservelParams
if finalServerParams:
for k, v in finalServerParams.iteritems():
- prop = self.__create_xml_element(doc, k, v[0], "server param", True)
- topElement.appendChild(prop)
+ if not description.has_key(k):
+ description[k] = "final server parameter"
+ paramsDict[k] = v
-
- # mapred-default.xml is no longer used now.
- numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
- prop = self.__create_xml_element(doc, "mapred.reduce.tasks", str(numred),
- "description")
- topElement.appendChild(prop)
- # end
+ # serverParams
+ if serverParams:
+ for k, v in serverParams.iteritems():
+ if not description.has_key(k):
+ # if no final value for same param is mentioned
+ description[k] = "server parameter"
+ paramsDict[k] = v
+
+ # clientParams
+ if clientParams:
+ for k, v in clientParams.iteritems():
+ if not description.has_key(k) or description[k] == "server parameter":
+ # Just add, if no final value for same param is mentioned.
+ # Replace even if server param is mentioned for same config variable
+ description[k] = "client-side parameter"
+ paramsDict[k] = v
+
+ # generate the xml elements
+ for k,v in paramsDict.iteritems():
+ if ( description[k] == "final server parameter" or \
+ description[k] == "Hod generated parameter" ):
+ final = True
+ else: final = False
+ prop = self.__create_xml_element(doc, k, v, description[k], final)
+ topElement.appendChild(prop)
siteName = os.path.join(confDir, "hadoop-site.xml")
sitefile = file(siteName, 'w')
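
gen_site_conf above now builds paramsDict first and uses the description map to encode precedence before emitting any XML: HOD-generated and final server parameters are marked final and are never overridden, server parameters fill the remaining keys, and client parameters replace plain server parameters for the same key. That merge order, condensed (the dictionaries are illustrative):

    generated = {'fs.default.name': 'nn:8020'}      # HOD generated, final
    final_srv = {'dfs.replication': '3'}            # final server params
    server    = {'io.sort.mb': '100', 'x.y': 'srv'}
    client    = {'x.y': 'cli'}                      # beats plain server params

    params, final = {}, {}
    for d, is_final in ((generated, True), (final_srv, True),
                        (server, False), (client, False)):
        for k, v in d.items():
            if final.get(k):          # a final value is never overridden
                continue
            params[k] = v
            final[k] = is_final
    print("%s %s" % (params['x.y'], final['dfs.replication']))   # cli True
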
@@ -174,44 +181,15 @@
return serviceData
- def __check_allocation_manager(self):
- userValid = True
- try:
- self.serviceProxyClient = hodXRClient(
- to_http_url(self.__cfg['hod']['proxy-xrs-address']), None, None, 0,
- 0, 1, False, 15)
-
- userValid = self.serviceProxyClient.isProjectUserValid(
- self.__setup.cfg['hod']['userid'],
- self.__setup.cfg['resource_manager']['pbs-account'],True)
-
- if userValid:
- self.__log.debug("Validated that user %s is part of project %s." %
- (self.__cfg['hod']['userid'],
- self.__cfg['resource_manager']['pbs-account']))
- else:
- self.__log.debug("User %s is not part of project: %s." % (
- self.__cfg['hod']['userid'],
- self.__cfg['resource_manager']['pbs-account']))
- self.__log.error("Please specify a valid project in "
- + "resource_manager.pbs-account. If you still have "
- + "issues, please contact operations")
- userValidd = False
- # ignore invalid project for now - TODO
- except Exception:
- # ignore failures - non critical for now
- self.__log.debug(
- "Unable to contact Allocation Manager Proxy - ignoring...")
- #userValid = False
-
- return userValid
-
def __check_job_status(self):
initWaitCount = 20
count = 0
status = False
state = 'Q'
while state == 'Q':
+ if hodInterrupt.isSet():
+ raise HodInterruptException()
+
state = self.__nodePool.getJobState()
if (state==False) or (state!='Q'):
break
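
The interrupt checks added to this loop follow a single pattern used
throughout the patch: test a process-wide flag at the top of every
potentially long wait and abort with HodInterruptException. A simplified
stand-alone version (threading.Event stands in for the real hodInterrupt
object from hodlib.Common.util):

    import time, threading

    hodInterrupt = threading.Event()     # stand-in for the real flag object

    class HodInterruptException(Exception):
        pass

    def wait_while_queued(getJobState, interval=10):
        # poll until the job leaves the queued ('Q') state, bailing out
        # promptly if the user interrupted the process
        state = 'Q'
        while state == 'Q':
            if hodInterrupt.isSet():
                raise HodInterruptException()
            state = getJobState()
            if state is False or state != 'Q':
                break
            time.sleep(interval)
        return state
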
@@ -241,6 +219,9 @@
waitTime = self.__cfg['hod']['allocate-wait-time']
while count < waitTime:
+ if hodInterrupt.isSet():
+ raise HodInterruptException()
+
ringList = self.__svcrgyClient.getServiceInfo(
self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(),
'ringmaster',
@@ -267,8 +248,11 @@
serviceAddress = None
serviceInfo = None
- for i in range(0, 250):
+ for i in range(0, 250):
try:
+ if hodInterrupt.isSet():
+ raise HodInterruptException()
+
serviceAddress = xmlrpcClient.getServiceAddr(serviceName)
if serviceAddress:
if serviceAddress == 'not found':
@@ -280,6 +264,8 @@
else:
serviceInfo = xmlrpcClient.getURLs(serviceName)
break
+ except HodInterruptException,h :
+ raise h
except:
self.__log.critical("'%s': ringmaster xmlrpc error." % serviceName)
self.__log.debug(get_exception_string())
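
Note that the retry loop re-raises HodInterruptException ahead of the
catch-all handler, so an interrupt is never swallowed and logged as an
ordinary ringmaster error. Reduced to its essentials (hodInterrupt and the
xmlrpc client are stand-ins, as in the sketch above):

    import threading

    hodInterrupt = threading.Event()

    class HodInterruptException(Exception):
        pass

    def poll_service_addr(xmlrpcClient, serviceName, log, attempts=250):
        for i in range(0, attempts):
            try:
                if hodInterrupt.isSet():
                    raise HodInterruptException()
                addr = xmlrpcClient.getServiceAddr(serviceName)
                if addr and addr != 'not found':
                    return addr
            except HodInterruptException:
                raise                    # interrupts must propagate
            except:
                log.critical("'%s': ringmaster xmlrpc error." % serviceName)
        return None
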
@@ -296,6 +282,8 @@
self.jobId, self.__hostname,
serviceName, 'grid', serviceInfo)
+ except HodInterruptException, h:
+ raise h
except:
self.__log.critical("'%s': registry xmlrpc error." % serviceName)
self.__log.debug(get_exception_string())
@@ -326,6 +314,8 @@
link):
for i in range(1,5):
+ if hodInterrupt.isSet():
+ raise HodInterruptException()
try:
input = urllib.urlopen(link)
break
@@ -385,6 +375,8 @@
self.__log.debug("Finished grabbing: %s" % link)
except AlarmException:
+ if hodInterrupt.isSet():
+ raise HodInterruptException()
if out: out.close()
if input: input.close()
@@ -403,31 +395,12 @@
if 'mapred' in clusterInfo:
mapredAddress = clusterInfo['mapred'][7:]
hdfsAddress = clusterInfo['hdfs'][7:]
-
- mapredSocket = tcpSocket(mapredAddress)
-
- try:
- mapredSocket.open()
- mapredSocket.close()
- except tcpError:
- status = 14
-
- hdfsSocket = tcpSocket(hdfsAddress)
-
- try:
- hdfsSocket.open()
- hdfsSocket.close()
- except tcpError:
- if status > 0:
- status = 10
- else:
- status = 13
-
+ status = get_cluster_status(hdfsAddress, mapredAddress)
if status == 0:
status = 12
else:
status = 15
-
+
return status
def cleanup(self):
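
check_cluster now delegates the two TCP probes to get_cluster_status from
hodlib.Common.util; the caller maps its 0 ("all up") to 12 and reports 15
when the cluster was never allocated. Judging from the inline code it
replaces, the helper's contract is roughly the following (the socket probe
is a simplified stand-in for HOD's tcpSocket):

    import socket

    def can_connect(addr):
        # addr is 'host:port'; True if a TCP connect succeeds
        host, port = addr.split(':')
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            try:
                s.settimeout(5)
                s.connect((host, int(port)))
                return True
            except socket.error:
                return False
        finally:
            s.close()

    def get_cluster_status(hdfsAddress, mapredAddress):
        status = 0
        if not can_connect(mapredAddress):
            status = 14                  # mapred is dead
        if not can_connect(hdfsAddress):
            if status > 0:
                status = 10              # both dead: cluster is dead
            else:
                status = 13              # only hdfs is dead
        return status
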
@@ -455,37 +428,67 @@
self.__log.critical("Minimum nodes must be greater than 2.")
status = 2
else:
- if self.__check_allocation_manager():
- nodeSet = self.__nodePool.newNodeSet(min)
- self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet)
- if self.jobId:
- if self.__check_job_status():
+ nodeSet = self.__nodePool.newNodeSet(min)
+ walltime = None
+ if self.__cfg['hod'].has_key('walltime'):
+ walltime = self.__cfg['hod']['walltime']
+ self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
+ if self.jobId:
+ try:
+ jobStatus = self.__check_job_status()
+ except HodInterruptException, h:
+ self.__log.info(HOD_INTERRUPTED_MESG)
+ self.delete_job(self.jobId)
+ self.__log.info("Job %s qdelled." % self.jobId)
+ raise h
+
+ if jobStatus:
+ self.__log.info("Hod Job successfully submitted. JobId : %s." \
+ % self.jobId)
+ try:
self.ringmasterXRS = self.__get_ringmaster_client()
+
+ self.__log.info("Ringmaster at : %s." % self.ringmasterXRS )
+ ringClient = None
if self.ringmasterXRS:
ringClient = hodXRClient(self.ringmasterXRS)
-
+
hdfsStatus, hdfsAddr, self.hdfsInfo = \
self.__init_hadoop_service('hdfs', ringClient)
-
+
if hdfsStatus:
+ self.__log.info("HDFS UI on http://%s" % self.hdfsInfo)
+
mapredStatus, mapredAddr, self.mapredInfo = \
self.__init_hadoop_service('mapred', ringClient)
-
+
if mapredStatus:
- self.__log.info("HDFS UI on http://%s" % self.hdfsInfo)
self.__log.info("Mapred UI on http://%s" % self.mapredInfo)
-
+
+ if self.__cfg['hod'].has_key('update-worker-info') \
+ and self.__cfg['hod']['update-worker-info']:
+ workerInfoMap = {}
+ workerInfoMap['HDFS UI'] = 'http://%s' % self.hdfsInfo
+ workerInfoMap['Mapred UI'] = 'http://%s' % self.mapredInfo
+ ret = self.__nodePool.updateWorkerInfo(workerInfoMap, self.jobId)
+ if ret != 0:
+ self.__log.warn('Could not update HDFS and Mapred information. ' \
+ 'User Portal may not show relevant information. ' \
+ 'Error code=%s' % ret)
+
+ self.__cfg.replace_escape_seqs()
+
# Go generate the client side hadoop-site.xml now
# adding final-params as well, just so that conf on
# client-side and server-side are (almost) the same
clientParams = None
serverParams = {}
finalServerParams = {}
-
+
# client-params
if self.__cfg['hod'].has_key('client-params'):
clientParams = self.__cfg['hod']['client-params']
-
+
# server-params
if self.__cfg['gridservice-mapred'].has_key('server-params'):
serverParams.update(\
@@ -494,8 +497,8 @@
# note that if there are params in both mapred and hdfs
# sections, the ones in hdfs overwrite the ones in mapred
serverParams.update(\
- self.__cfg['gridservice-mapred']['server-params'])
-
+ self.__cfg['gridservice-hdfs']['server-params'])
+
# final-server-params
if self.__cfg['gridservice-mapred'].has_key(\
'final-server-params'):
@@ -505,9 +508,14 @@
'final-server-params'):
finalServerParams.update(\
self.__cfg['gridservice-hdfs']['final-server-params'])
-
+
clusterFactor = self.__cfg['hod']['cluster-factor']
- self.__hadoopCfg.gen_site_conf(clusterDir, min,
+ tempDir = self.__cfg['hod']['temp-dir']
+ if not os.path.exists(tempDir):
+ os.makedirs(tempDir)
+ tempDir = os.path.join( tempDir, self.__cfg['hod']['userid']\
+ + "." + self.jobId )
+ self.__hadoopCfg.gen_site_conf(clusterDir, tempDir, min,\
hdfsAddr, mapredAddr, clientParams,\
serverParams, finalServerParams,\
clusterFactor)
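
gen_site_conf now also receives a per-job temporary directory derived from
hod.temp-dir, the userid and the job id, so that dfs.client.buffer.dir and
friends do not collide across jobs. Illustratively (the user and job id are
made-up values):

    import os

    tempDir = '/tmp/hod'                               # hod.temp-dir
    userid, jobId = 'alice', '4215.torque.example.com' # illustrative
    if not os.path.exists(tempDir):
        os.makedirs(tempDir)
    tempDir = os.path.join(tempDir, userid + "." + jobId)
    # -> /tmp/hod/alice.4215.torque.example.com, removed again by deallocate()
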
@@ -520,25 +528,52 @@
status = 6
if status != 0:
self.__log.info("Cleaning up job id %s, as cluster could not be allocated." % self.jobId)
+ if ringClient is None:
+ self.delete_job(self.jobId)
+ else:
+ self.__log.debug("Calling rm.stop()")
+ ringClient.stopRM()
+ self.__log.debug("Returning from rm.stop()")
+ except HodInterruptException, h:
+ self.__log.info(HOD_INTERRUPTED_MESG)
+ if self.ringmasterXRS:
+ if ringClient is None:
+ ringClient = hodXRClient(self.ringmasterXRS)
+ self.__log.debug("Calling rm.stop()")
+ ringClient.stopRM()
+ self.__log.debug("Returning from rm.stop()")
+ self.__log.info("Job Shutdown by informing ringmaster.")
+ else:
self.delete_job(self.jobId)
- else:
- self.__log.critical("No job found, ringmaster failed to run.")
- status = 5
-
- elif self.jobId == False:
- if exitCode == 188:
- self.__log.critical("Request execeeded maximum resource allocation.")
- else:
- self.__log.critical("Insufficient resources available.")
- status = 4
- else:
- self.__log.critical("Scheduler failure, allocation failed.\n\n")
- status = 4
- else:
- status = 9
+ self.__log.info("Job %s qdelled directly." % self.jobId)
+ raise h
+ else:
+ self.__log.critical("No job found, ringmaster failed to run.")
+ status = 5
+
+ elif self.jobId == False:
+ if exitCode == 188:
+ self.__log.critical("Request execeeded maximum resource allocation.")
+ else:
+ self.__log.critical("Insufficient resources available.")
+ status = 4
+ else:
+ self.__log.critical("Scheduler failure, allocation failed.\n\n")
+ status = 4
return status
+ def __isRingMasterAlive(self, rmAddr):
+ ret = True
+ rmSocket = tcpSocket(rmAddr)
+ try:
+ rmSocket.open()
+ rmSocket.close()
+ except tcpError:
+ ret = False
+
+ return ret
+
def deallocate(self, clusterDir, clusterInfo):
status = 0
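
The failure and interrupt paths above choose between two teardown
mechanisms: ask the ringmaster to stop the cluster when a client for it
exists (or can still be built), and fall back to deleting the
resource-manager job directly otherwise. Folded into one sketch (the
wrapper function and injected client factory are illustrative):

    def cleanup_failed_allocation(ringClient, ringmasterXRS, jobId,
                                  nodePool, log, makeClient):
        # makeClient: hodXRClient in the patch, injected here to keep
        # the sketch self-contained
        if ringClient is None and ringmasterXRS:
            ringClient = makeClient(ringmasterXRS)
        if ringClient is not None:
            log.debug("Calling rm.stop()")
            ringClient.stopRM()          # ringmaster tears the cluster down
            log.debug("Returning from rm.stop()")
        else:
            nodePool.deleteJob(jobId)    # no ringmaster yet: qdel the job
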
@@ -546,6 +581,7 @@
id=clusterInfo['jobid'])
self.mapredInfo = clusterInfo['mapred']
self.hdfsInfo = clusterInfo['hdfs']
+
try:
if self.__cfg['hod'].has_key('hadoop-ui-log-dir'):
clusterStatus = self.check_cluster(clusterInfo)
@@ -554,9 +590,35 @@
self.__collect_jobtracker_ui(self.__cfg['hod']['hadoop-ui-log-dir'])
else:
self.__log.debug('hadoop-ui-log-dir not specified. Skipping Hadoop UI log collection.')
+ except HodInterruptException, h:
+ # got an interrupt. just pass and proceed to qdel
+ pass
except:
self.__log.info("Exception in collecting Job tracker logs. Ignoring.")
- status = self.__nodePool.finalize()
+
+ rmAddr = None
+ if clusterInfo.has_key('ring'):
+ # format is http://host:port/; we need host:port
+ rmAddr = clusterInfo['ring'][7:]
+ if rmAddr.endswith('/'):
+ rmAddr = rmAddr[:-1]
+
+ if (rmAddr is None) or (not self.__isRingMasterAlive(rmAddr)):
+ # Cluster is already dead, don't try to contact ringmaster.
+ self.__nodePool.finalize()
+ status = 10 # As cluster is dead, we just set the status to 'cluster dead'.
+ else:
+ xrsAddr = clusterInfo['ring']
+ rmClient = hodXRClient(xrsAddr)
+ self.__log.debug('calling rm.stop')
+ rmClient.stopRM()
+ self.__log.debug('completed rm.stop')
+
+ # cleanup hod temp dirs
+ tempDir = os.path.join( self.__cfg['hod']['temp-dir'], \
+ self.__cfg['hod']['userid'] + "." + clusterInfo['jobid'] )
+ if os.path.exists(tempDir):
+ shutil.rmtree(tempDir)
return status
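
deallocate derives the ringmaster's host:port from the stored 'ring' URL
before probing it, for example (the stored value is illustrative):

    clusterInfo = {'ring': 'http://node17.example.com:45678/'}
    rmAddr = clusterInfo['ring'][7:]     # strip the leading 'http://'
    if rmAddr.endswith('/'):
        rmAddr = rmAddr[:-1]
    # rmAddr == 'node17.example.com:45678', ready for the liveness probe
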
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py Mon Jan 28 07:58:08 2008
@@ -15,7 +15,7 @@
#limitations under the License.
# -*- python -*-
-import sys, os, getpass, pprint, re, cPickle, random, shutil
+import sys, os, getpass, pprint, re, cPickle, random, shutil, time
import hodlib.Common.logger
@@ -23,6 +23,9 @@
from hodlib.Common.xmlrpc import hodXRClient
from hodlib.Common.util import to_http_url, get_exception_string
from hodlib.Common.util import get_exception_error_string
+from hodlib.Common.util import hodInterrupt, HodInterruptException
+from hodlib.Common.util import HOD_INTERRUPTED_CODE
+
from hodlib.Common.nodepoolutil import NodePoolUtil
from hodlib.Hod.hadoop import hadoopCluster, hadoopScript
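
The new imports pull in the process-wide interrupt machinery shared by both
files. A minimal stand-in for what hodlib.Common.util provides (the real
module also wires the flag to signal handlers, and the exit-code value here
is made up):

    class HodInterrupt:
        def __init__(self):
            self._flag = False
        def setFlag(self, val):          # script() flips this back to False
            self._flag = val
        def isSet(self):
            return self._flag

    class HodInterruptException(Exception):
        pass

    HOD_INTERRUPTED_CODE = 127           # illustrative value only
    hodInterrupt = HodInterrupt()        # one shared instance
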
@@ -115,6 +118,9 @@
level=self.__cfg['hod']['debug'],
addToLoggerNames=(self.__user ,))
+ def get_logger(self):
+ return self.__log
+
def __setup_cluster_logger(self, directory):
self.__baseLogger.add_file(logDirectory=directory, level=4,
addToLoggerNames=(self.__user ,))
@@ -124,6 +130,8 @@
def __norm_cluster_dir(self, directory):
directory = os.path.expanduser(directory)
+ if not os.path.isabs(directory):
+ directory = os.path.join(self.__cfg['hod']['original-dir'], directory)
directory = os.path.abspath(directory)
return directory
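
With this change a relative cluster directory is resolved against the
directory hod was invoked from (hod.original-dir) instead of whatever the
current working directory happens to be. A self-contained version (the
original-dir value is illustrative):

    import os

    def norm_cluster_dir(directory, originalDir='/home/alice'):
        directory = os.path.expanduser(directory)
        if not os.path.isabs(directory):
            directory = os.path.join(originalDir, directory)
        return os.path.abspath(directory)

    # norm_cluster_dir('mycluster')   -> '/home/alice/mycluster'
    # norm_cluster_dir('/clusters/a') -> '/clusters/a' (absolute unchanged)
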
@@ -202,7 +210,18 @@
self.__opCode = self.__cluster.check_cluster(clusterInfo)
if self.__opCode == 0 or self.__opCode == 15:
self.__setup_service_registry()
- allocateStatus = self.__cluster.allocate(clusterDir, min, max)
+ if hodInterrupt.isSet():
+ self.__cleanup()
+ raise HodInterruptException()
+ self.__log.info("Service Registry Started.")
+ try:
+ allocateStatus = self.__cluster.allocate(clusterDir, min, max)
+ except HodInterruptException, h:
+ self.__cleanup()
+ raise h
+ # Allocation has gone through.
+ # Don't care about interrupts any more
+
if allocateStatus == 0:
self.__set_cluster_state_info(os.environ,
self.__cluster.hdfsInfo,
@@ -213,6 +232,8 @@
self.__setup_cluster_state(clusterDir)
self.__clusterState.write(self.__cluster.jobId,
self.__clusterStateInfo)
+ # Do we need to check for interrupts here ??
+
self.__set_user_state_info(
{ clusterDir : self.__cluster.jobId, } )
self.__opCode = allocateStatus
@@ -239,7 +260,15 @@
self.__log.critical("%s operation requires two arguments. " % operation
+ "A cluster path and n nodes, or min-max nodes.")
self.__opCode = 3
-
+
+ def _is_cluster_allocated(self, clusterDir):
+ if os.path.isdir(clusterDir):
+ self.__setup_cluster_state(clusterDir)
+ clusterInfo = self.__clusterState.read()
+ if clusterInfo != {}:
+ return True
+ return False
+
def _op_deallocate(self, args):
operation = "deallocate"
argLength = len(args)
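
_is_cluster_allocated gives the script path a cheap way to decide whether a
deallocate is needed at all. Its logic as a stand-alone sketch (readState
stands in for clusterState.read(), which returns the unpickled per-cluster
info dict, or {} when nothing was written):

    import os

    def is_cluster_allocated(clusterDir, readState):
        if os.path.isdir(clusterDir):
            if readState(clusterDir) != {}:
                return True
        return False
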
@@ -293,25 +322,19 @@
clusterStatus = self.__cluster.check_cluster(clusterInfo)
if clusterStatus == 12:
self.__log.info(clusterDir)
- keys = clusterInfo.keys()
- keys.sort()
- for key in keys:
- if key != 'env':
- self.__log.info("%s\t%s" % (key, clusterInfo[key]))
-
- if self.__cfg['hod']['debug'] == 4:
- for var in clusterInfo['env'].keys():
- self.__log.debug("%s = %s" % (var, clusterInfo['env'][var]))
+ self.__print_cluster_info(clusterInfo)
elif clusterStatus == 10:
self.__log.critical("%s cluster is dead" % clusterDir)
elif clusterStatus == 13:
self.__log.warn("%s cluster hdfs is dead" % clusterDir)
elif clusterStatus == 14:
self.__log.warn("%s cluster mapred is dead" % clusterDir)
-
+
if clusterStatus != 12:
if clusterStatus == 15:
self.__log.critical("Cluster %s not allocated." % clusterDir)
+ else:
+ self.__print_cluster_info(clusterInfo)
self.__opCode = clusterStatus
else:
@@ -321,7 +344,19 @@
self.__log.critical("%s operation requires one argument. " % operation
+ "A cluster path.")
self.__opCode = 3
-
+
+ def __print_cluster_info(self, clusterInfo):
+ keys = clusterInfo.keys()
+ keys.sort()
+ for key in keys:
+ if key != 'env':
+ self.__log.info("%s\t%s" % (key, clusterInfo[key]))
+
+ if self.__cfg['hod']['debug'] == 4:
+ for var in clusterInfo['env'].keys():
+ self.__log.debug("%s = %s" % (var, clusterInfo['env'][var]))
+
+
def _op_help(self, args):
print "hod operations:\n"
print " allocate <directory> <nodes> - Allocates a cluster of n nodes using the specified cluster"
@@ -342,6 +377,10 @@
opList = self.__check_operation(operation)
if self.__opCode == 0:
getattr(self, "_op_%s" % opList[0])(opList)
+ except HodInterruptException, h:
+ self.__log.critical("op: %s failed because of an process interrupt." \
+ % operation)
+ self.__opCode = HOD_INTERRUPTED_CODE
except:
self.__log.critical("op: %s failed: %s" % (operation,
get_exception_error_string()))
@@ -356,16 +395,41 @@
def script(self):
script = self.__cfg['hod']['script']
nodes = self.__cfg['hod']['min-nodes']
+ isExecutable = os.access(script, os.X_OK)
+ if not isExecutable:
+ self.__log.critical('Script %s is not executable.' % script)
+ return 1
+
clusterDir = "/tmp/%s.%s" % (self.__cfg['hod']['userid'],
random.randint(0, 20000))
os.mkdir(clusterDir)
+ ret = 0
try:
self._op_allocate(('allocate', clusterDir, str(nodes)))
- scriptRunner = hadoopScript(clusterDir,
+ if self.__opCode == 0:
+ if self.__cfg['hod'].has_key('script-wait-time'):
+ time.sleep(self.__cfg['hod']['script-wait-time'])
+ self.__log.debug('Slept for %d seconds. Now going to run the script' % self.__cfg['hod']['script-wait-time'])
+ if hodInterrupt.isSet():
+ self.__log.debug('Interrupt set - not executing script')
+ else:
+ scriptRunner = hadoopScript(clusterDir,
self.__cfg['hod']['original-dir'])
- self.__opCode = scriptRunner.run(script)
- self._op_deallocate(('deallocate', clusterDir))
+ self.__opCode = scriptRunner.run(script)
+ ret = self.__opCode
+ self.__log.debug("Exit code from running the script: %d" % self.__opCode)
+ else:
+ self.__log.critical("Error %d in allocating the cluster. Cannot run the script." % self.__opCode)
+
+ if hodInterrupt.isSet():
+ # Got interrupt while executing script. Unsetting it for deallocating
+ hodInterrupt.setFlag(False)
+ if self._is_cluster_allocated(clusterDir):
+ self._op_deallocate(('deallocate', clusterDir))
shutil.rmtree(clusterDir, True)
+ except HodInterruptException, h:
+ self.__log.critical("Script failed because of an process interrupt.")
+ self.__opCode = HOD_INTERRUPTED_CODE
except:
self.__log.critical("script: %s failed: %s" % (script,
get_exception_error_string()))
@@ -373,4 +437,8 @@
self.__cleanup()
+ # A failed script's exit code takes precedence over the cleanup status.
+ if ret != 0:
+ self.__opCode = ret
+
return self.__opCode
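
The reworked script flow amounts to: allocate, optionally sleep, run the
script only on successful allocation, deallocate only if something was
actually allocated, and let a failing script's exit code win over any later
cleanup status. Schematically (all four callables are stand-ins for the
hodRunner/hadoopScript methods above):

    def run_script_flow(allocate, run_script, deallocate, is_allocated):
        ret = 0
        opCode = allocate()
        if opCode == 0:
            ret = run_script()           # the user script's own exit code
        if is_allocated():
            opCode = deallocate()
        if ret != 0:
            opCode = ret                 # a failed script's code wins
        return opCode
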
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py Mon Jan 28 07:58:08 2008
@@ -108,6 +108,10 @@
"""Delete a job, given it's id"""
raise NotImplementedError
+ def updateWorkerInfo(self, workerInfoMap, jobId):
+ """Update information about the workers started by this NodePool."""
+ raise NotImplementedError
+
def getNextNodeSetId(self):
id = self.nextNodeSetId
self.nextNodeSetId += 1
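
A concrete NodePool is expected to override updateWorkerInfo and return 0 on
success, since the caller in hadoop.py warns on any non-zero result. A
purely hypothetical Torque-flavoured implementation, only to show the
expected shape (the qalter invocation is illustrative, not what
hodlib/NodePools/torque.py actually runs):

    import os

    class TorquePoolSketch:
        def updateWorkerInfo(self, workerInfoMap, jobId):
            info = ','.join(['%s:%s' % (k, v)
                             for k, v in workerInfoMap.iteritems()])
            return self._qalter(jobId, info)

        def _qalter(self, jobId, info):
            # hypothetical wrapper: attach the info to the job and return
            # the command's exit code (0 on success)
            return os.system("qalter -N '%s' %s" % (info, jobId)) >> 8
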