You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ni...@apache.org on 2008/01/28 16:58:15 UTC

svn commit: r615919 [1/2] - in /hadoop/core/trunk: ./ src/contrib/hod/ src/contrib/hod/bin/ src/contrib/hod/conf/ src/contrib/hod/hodlib/Common/ src/contrib/hod/hodlib/GridServices/ src/contrib/hod/hodlib/Hod/ src/contrib/hod/hodlib/HodRing/ src/contri...

Author: nigel
Date: Mon Jan 28 07:58:08 2008
New Revision: 615919

URL: http://svn.apache.org/viewvc?rev=615919&view=rev
Log:
HADOOP-2720. Jumbo bug fix patch to HOD.  Final sync of Apache SVN with internal Yahoo SVN. Contributed by Hemanth Yamijala.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/hod/bin/VERSION
    hadoop/core/trunk/src/contrib/hod/bin/hod
    hadoop/core/trunk/src/contrib/hod/bin/hodring
    hadoop/core/trunk/src/contrib/hod/bin/ringmaster
    hadoop/core/trunk/src/contrib/hod/conf/hodrc
    hadoop/core/trunk/src/contrib/hod/getting_started.txt
    hadoop/core/trunk/src/contrib/hod/hodlib/Common/desc.py
    hadoop/core/trunk/src/contrib/hod/hodlib/Common/hodsvc.py
    hadoop/core/trunk/src/contrib/hod/hodlib/Common/setup.py
    hadoop/core/trunk/src/contrib/hod/hodlib/Common/socketServers.py
    hadoop/core/trunk/src/contrib/hod/hodlib/Common/tcp.py
    hadoop/core/trunk/src/contrib/hod/hodlib/Common/threads.py
    hadoop/core/trunk/src/contrib/hod/hodlib/Common/types.py
    hadoop/core/trunk/src/contrib/hod/hodlib/Common/util.py
    hadoop/core/trunk/src/contrib/hod/hodlib/Common/xmlrpc.py
    hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/hdfs.py
    hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/mapred.py
    hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py
    hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py
    hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py
    hadoop/core/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py
    hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py
    hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py
    hadoop/core/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py
    hadoop/core/trunk/src/contrib/hod/hodlib/Schedulers/torque.py
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hod.xml

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jan 28 07:58:08 2008
@@ -591,6 +591,9 @@
     HADOOP-2576. Namenode performance degradation over time triggered by
     large heartbeat interval. (Raghu Angadi)
 
+    HADOOP-2720. Jumbo bug fix patch to HOD.  Final sync of Apache SVN with
+    internal Yahoo SVN.  (Hemanth Yamijala via nigel)
+
 Release 0.15.3 - 2008-01-18
 
   BUG FIXES

Modified: hadoop/core/trunk/src/contrib/hod/bin/VERSION
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/VERSION?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/bin/VERSION (original)
+++ hadoop/core/trunk/src/contrib/hod/bin/VERSION Mon Jan 28 07:58:08 2008
@@ -1,16 +1 @@
-#Licensed to the Apache Software Foundation (ASF) under one
-#or more contributor license agreements.  See the NOTICE file
-#distributed with this work for additional information
-#regarding copyright ownership.  The ASF licenses this file
-#to you under the Apache License, Version 2.0 (the
-#"License"); you may not use this file except in compliance
-#with the License.  You may obtain a copy of the License at
-
-#     http://www.apache.org/licenses/LICENSE-2.0
-
-#Unless required by applicable law or agreed to in writing, software
-#distributed under the License is distributed on an "AS IS" BASIS,
-#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#See the License for the specific language governing permissions and
-#limitations under the License.
-DEVELOPMENT
+0.4.0

Modified: hadoop/core/trunk/src/contrib/hod/bin/hod
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/hod?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/bin/hod (original)
+++ hadoop/core/trunk/src/contrib/hod/bin/hod Mon Jan 28 07:58:08 2008
@@ -45,7 +45,9 @@
 from hodlib.Hod.hod import hodRunner
 from hodlib.Common.setup import *
 from hodlib.Common.descGenerator import *
-from hodlib.Common.util import local_fqdn, need_to_allocate, filter_warnings, get_exception_error_string
+from hodlib.Common.util import local_fqdn, need_to_allocate, filter_warnings,\
+    get_exception_error_string, hodInterrupt, \
+    HOD_INTERRUPTED_MESG, HOD_INTERRUPTED_CODE
 from hodlib.Common.tcp import tcpError, tcpSocket
 
 filter_warnings()
@@ -91,7 +93,8 @@
               False, True, False, True, 's'),
 
              ('min-nodes', 'pos_int', 
-              'Minimum number of nodes to allocate at startup.',
+              'Minimum number of nodes to allocate at startup. ' + \
+              'Used with hod.script option',
               True, None, False, True, 'm'),
 
              ('script', 'file', 'Hadoop script to execute.',
@@ -124,10 +127,25 @@
               False, None, True, True),
 
              ('client-params', 'keyval', 'Hadoop client xml key/value list',
-              False, None, False, True, 'C'), 
+              True, None, False, True, 'C'), 
 
              ('hadoop-ui-log-dir', 'directory', 'Directory to store Web UI Logs of Hadoop',
-              False, None, False, True)),
+              True, None, False, True),
+
+             ('temp-dir', 'directory', 'HOD temporary directories.',
+              False, None, True, False),
+
+             ('update-worker-info', 'bool', 'Specifies whether to update Worker Info after allocation',
+              False, False, False, True),
+
+             ('title', 'string', 'Title for the current HOD allocation.',
+               True, "HOD", False, True, 'N'),
+
+             ('walltime', 'pos_int', 'Walltime in seconds for the current HOD allocation',
+              True, None, False, True),
+
+             ('script-wait-time', 'pos_int', 'Specifies the time to wait before running the script. Used with the hod.script option.',
+              True, 10, False, True, 'W')),
 
             'resource_manager' : (
              ('id', 'string', 'Batch scheduler ID: torque|condor.',
@@ -137,7 +155,7 @@
               False, None, False, True),
               
              ('pbs-account', 'string', 'User Account jobs are submitted under.',
-              True, pwd.getpwuid(os.getuid())[0], False, False, 'A'),
+              True, None, False, False, 'A'),
               
              ('queue', 'string', 'Queue of the batch scheduler to query.',
               True, 'batch', False, True, 'Q'),
@@ -215,7 +233,7 @@
               False, None, False, False),
 
              ('server-params', 'keyval', 'Hadoop xml key/value list',
-              False, None, False, True, 'M'),
+              True, None, False, True, 'M'),
                
              ('envs', 'keyval', 'environment to run this package in',
               False, None, False, False),
@@ -344,140 +362,151 @@
     return config['hod'].has_key('script')
 
 if __name__ == '__main__':  
-  confDef = definition()
-  confDef.add_defs(defList, defOrder)
-  hodOptions = options(confDef, "./%s -c <CONFIG_FILE> [OPTIONS]" % myName,
-                       VERSION, withConfig=True, defaultConfig=DEFAULT_CONFIG)
-
-  # hodConfig is a dict like object, hodConfig[section][name]
   try:
-    hodConfig = config(hodOptions['config'], configDef=confDef, 
-                     originalDir=hodOptions['hod']['original-dir'],
-                     options=hodOptions) 
-  except IOError, e:
-    print >>sys.stderr,"error: %s not found. Specify the path to the HOD configuration file, or define the environment variable %s under which a file named hodrc can be found." % (hodOptions['config'], 'HOD_CONF_DIR')
-    sys.exit(1)
-
-  status = True
-  statusMsgs = []
-
-  (status,statusMsgs) = hodConfig.verify()
-  if not status:
-    print >>sys.stderr,"error: bin/hod failed to start."
-    for msg in statusMsgs:
-      print >>sys.stderr,"%s" % (msg)
-    sys.exit(1)
-
-  ## TODO : should move the dependency verification to hodConfig.verify
-  if hodConfig['hod'].has_key('script') \
-    and not hodConfig['hod'].has_key('min-nodes'):
-    printErrors(hodConfig.var_error('hod', 'min-nodes',
-        "hod.min-nodes must be specified when using hod.script option."))
-    sys.exit(1)
-
-  if hodConfig['hod'].has_key('min-nodes'):
-    if hodConfig['hod']['min-nodes'] < 3:
-      printErrors(hodConfig.var_error('hod', 'min-nodes',
-        "hod.min-nodes must be >= 3 nodes: %s." % 
-        hodConfig['hod']['min-nodes']))
-      sys.exit(1)
+    confDef = definition()
+    confDef.add_defs(defList, defOrder)
+    hodOptions = options(confDef, "./%s -c <CONFIG_FILE> [OPTIONS]" % myName,
+                         VERSION, withConfig=True, defaultConfig=DEFAULT_CONFIG)
   
-  if hodConfig['hod'].has_key('operation') and \
-    hodConfig['hod'].has_key('script'):
-    print "Script execution and hod operations are mutually exclusive."
-    hodOptions.print_help(sys.stderr)
-    sys.exit(1)
-  
-  if 'operation' not in hodConfig['hod'] and 'script' not in hodConfig['hod']:
-    print "HOD requires at least a script or operation be specified."
-    hodOptions.print_help(sys.stderr)
-    sys.exit(1)    
-  
-  if hodConfig['gridservice-hdfs']['external']:
-    hdfsAddress = "%s:%s" % (hodConfig['gridservice-hdfs']['host'], 
-                             hodConfig['gridservice-hdfs']['fs_port'])
-
-    hdfsSocket = tcpSocket(hdfsAddress)
-      
+    # hodConfig is a dict like object, hodConfig[section][name]
     try:
-      hdfsSocket.open()
-      hdfsSocket.close()
-    except tcpError:
-      printErrors(hodConfig.var_error('hod', 'gridservice-hdfs', 
-        "Failed to open a connection to external hdfs address: %s." % 
-        hdfsAddress))
+      hodConfig = config(hodOptions['config'], configDef=confDef, 
+                       originalDir=hodOptions['hod']['original-dir'],
+                       options=hodOptions) 
+    except IOError, e:
+      print >>sys.stderr,"error: %s not found. Specify the path to the HOD configuration file, or define the environment variable %s under which a file named hodrc can be found." % (hodOptions['config'], 'HOD_CONF_DIR')
       sys.exit(1)
-  else:
-    hodConfig['gridservice-hdfs']['host'] = 'localhost'
-
-  if hodConfig['gridservice-mapred']['external']:
-    mapredAddress = "%s:%s" % (hodConfig['gridservice-mapred']['host'], 
-                               hodConfig['gridservice-mapred']['tracker_port'])
-
-    mapredSocket = tcpSocket(mapredAddress)
-      
-    try:
-      mapredSocket.open()
-      mapredSocket.close()
-    except tcpError:
-      printErrors(hodConfig.var_error('hod', 'gridservice-mapred', 
-        "Failed to open a connection to external mapred address: %s." % 
-        mapredAddress))
+  
+    status = True
+    statusMsgs = []
+  
+    (status,statusMsgs) = hodConfig.verify()
+    if not status:
+      print >>sys.stderr,"error: bin/hod failed to start."
+      for msg in statusMsgs:
+        print >>sys.stderr,"%s" % (msg)
       sys.exit(1)
-  else:
-    hodConfig['gridservice-mapred']['host'] = 'localhost'
-
-  if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
-    not hodConfig['gridservice-hdfs'].has_key('pkgs') and \
-    op_requires_pkgs(hodConfig):
-    printErrors(hodConfig.var_error('gridservice-hdfs', 'pkgs', 
-      "gridservice-hdfs.pkgs must be defined if ringmaster.hadoop-tar-ball "
-      + "is not defined."))
-    sys.exit(1)
-
-  if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
-    not hodConfig['gridservice-mapred'].has_key('pkgs') and \
-    op_requires_pkgs(hodConfig):
-    printErrors(hodConfig.var_error('gridservice-mapred', 'pkgs', 
-      "gridservice-mapred.pkgs must be defined if ringmaster.hadoop-tar-ball "
-      + "is not defined."))
-    sys.exit(1)
-
-  if hodConfig['hodring'].has_key('log-destination-uri'):
-    if hodConfig['hodring']['log-destination-uri'].startswith('file://'):
-      pass
-    elif hodConfig['hodring']['log-destination-uri'].startswith('hdfs://'):
-      hostPort = hodConfig['hodring']['log-destination-uri'][7:].split("/")
-      hostPort = hostPort[0]
-      socket = tcpSocket(hostPort)
+  
+    ## TODO : should move the dependency verification to hodConfig.verify
+    if hodConfig['hod'].has_key('script') \
+      and not hodConfig['hod'].has_key('min-nodes'):
+      printErrors(hodConfig.var_error('hod', 'min-nodes',
+          "hod.min-nodes must be specified when using hod.script option."))
+      sys.exit(1)
+  
+    if hodConfig['hod'].has_key('min-nodes'):
+      if hodConfig['hod']['min-nodes'] < 3:
+        printErrors(hodConfig.var_error('hod', 'min-nodes',
+          "hod.min-nodes must be >= 3 nodes: %s." % 
+          hodConfig['hod']['min-nodes']))
+        sys.exit(1)
+    
+    if hodConfig['hod'].has_key('operation') and \
+      hodConfig['hod'].has_key('script'):
+      print "Script execution and hod operations are mutually exclusive."
+      hodOptions.print_help(sys.stderr)
+      sys.exit(1)
+    
+    if 'operation' not in hodConfig['hod'] and 'script' not in hodConfig['hod']:
+      print "HOD requires at least a script or operation be specified."
+      hodOptions.print_help(sys.stderr)
+      sys.exit(1)    
+    
+    if hodConfig['gridservice-hdfs']['external']:
+      hdfsAddress = "%s:%s" % (hodConfig['gridservice-hdfs']['host'], 
+                               hodConfig['gridservice-hdfs']['fs_port'])
+  
+      hdfsSocket = tcpSocket(hdfsAddress)
+        
       try:
-        socket.open()
-        socket.close()
-      except:
-        printErrors(hodConfig.var_error('hodring', 'log-destination-uri', 
-        "Unable to contact host/port specified in log destination uri: %s" % 
-        hodConfig['hodring']['log-destination-uri']))
+        hdfsSocket.open()
+        hdfsSocket.close()
+      except tcpError:
+        printErrors(hodConfig.var_error('hod', 'gridservice-hdfs', 
+          "Failed to open a connection to external hdfs address: %s." % 
+          hdfsAddress))
+        sys.exit(1)
+    else:
+      hodConfig['gridservice-hdfs']['host'] = 'localhost'
+  
+    if hodConfig['gridservice-mapred']['external']:
+      mapredAddress = "%s:%s" % (hodConfig['gridservice-mapred']['host'], 
+                                 hodConfig['gridservice-mapred']['tracker_port'])
+  
+      mapredSocket = tcpSocket(mapredAddress)
+        
+      try:
+        mapredSocket.open()
+        mapredSocket.close()
+      except tcpError:
+        printErrors(hodConfig.var_error('hod', 'gridservice-mapred', 
+          "Failed to open a connection to external mapred address: %s." % 
+          mapredAddress))
         sys.exit(1)
     else:
-      printErrors(hodConfig.var_error('hodring', 'log-destination-uri', 
-        "The log destiniation uri must be of type local:// or hdfs://."))
+      hodConfig['gridservice-mapred']['host'] = 'localhost'
+  
+    if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
+      not hodConfig['gridservice-hdfs'].has_key('pkgs') and \
+      op_requires_pkgs(hodConfig):
+      printErrors(hodConfig.var_error('gridservice-hdfs', 'pkgs', 
+        "gridservice-hdfs.pkgs must be defined if ringmaster.hadoop-tar-ball "
+        + "is not defined."))
       sys.exit(1)
-  ## TODO : end of should move the dependency verification to hodConfig.verif
-    
-  hodConfig['hod']['base-dir'] = rootDirectory
-  hodConfig['hod']['user_state'] = DEFAULT_HOD_DIR
-
-  dGen = DescGenerator(hodConfig)
-  hodConfig = dGen.initializeDesc()
   
-  os.environ['JAVA_HOME'] = hodConfig['hod']['java-home']
+    if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
+      not hodConfig['gridservice-mapred'].has_key('pkgs') and \
+      op_requires_pkgs(hodConfig):
+      printErrors(hodConfig.var_error('gridservice-mapred', 'pkgs', 
+        "gridservice-mapred.pkgs must be defined if ringmaster.hadoop-tar-ball "
+        + "is not defined."))
+      sys.exit(1)
+  
+    if hodConfig['hodring'].has_key('log-destination-uri'):
+      if hodConfig['hodring']['log-destination-uri'].startswith('file://'):
+        pass
+      elif hodConfig['hodring']['log-destination-uri'].startswith('hdfs://'):
+        hostPort = hodConfig['hodring']['log-destination-uri'][7:].split("/")
+        hostPort = hostPort[0]
+        socket = tcpSocket(hostPort)
+        try:
+          socket.open()
+          socket.close()
+        except:
+          printErrors(hodConfig.var_error('hodring', 'log-destination-uri', 
+          "Unable to contact host/port specified in log destination uri: %s" % 
+          hodConfig['hodring']['log-destination-uri']))
+          sys.exit(1)
+      else:
+        printErrors(hodConfig.var_error('hodring', 'log-destination-uri', 
+          "The log destiniation uri must be of type local:// or hdfs://."))
+        sys.exit(1)
+  
+    ## TODO : end of should move the dependency verification to hodConfig.verif
+      
+    hodConfig['hod']['base-dir'] = rootDirectory
+    hodConfig['hod']['user_state'] = DEFAULT_HOD_DIR
+  
+    dGen = DescGenerator(hodConfig)
+    hodConfig = dGen.initializeDesc()
+    
+    os.environ['JAVA_HOME'] = hodConfig['hod']['java-home']
+    
+    if hodConfig['hod']['debug'] == 4:
+      print ""
+      print "Using Python: %s" % sys.version
+      print ""
+   
+    hod = hodRunner(hodConfig)
   
-  if hodConfig['hod']['debug'] == 4:
-    print ""
-    print "Using Python: %s" % sys.version
-    print ""
+    # Initiate signal handling
+    hodInterrupt.set_log(hod.get_logger())
+    hodInterrupt.init_signals()
+    # Interrupts set up. Now on we handle signals only when we wish to.
+  except KeyboardInterrupt:
+    print HOD_INTERRUPTED_MESG
+    sys.exit(HOD_INTERRUPTED_CODE)
   
-  hod = hodRunner(hodConfig)
   if hodConfig['hod'].has_key('script'):
     sys.exit(hod.script())
   else:  

Modified: hadoop/core/trunk/src/contrib/hod/bin/hodring
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/hodring?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/bin/hodring (original)
+++ hadoop/core/trunk/src/contrib/hod/bin/hodring Mon Jan 28 07:58:08 2008
@@ -188,6 +188,102 @@
     service = HodRing(hodRingOptions)
     service.start()
     service.wait()
+   
+    if service.log:
+      log = service.log
+    else: 
+      log = getLogger(hodRingOptions)
+
+    list = []
+    
+    runningHadoops = service.getRunningValues()
+      
+    for cmd in runningHadoops:
+      log.debug("addding %s to cleanup list..." % cmd)
+      cmd.addCleanup(list)
+    
+    list.append(service.getTempDir())
+    log.debug(list)
+       
+    # archive_logs now
+    cmdString = os.path.join(rootDirectory, "bin", "hodcleanup") # same python
+
+    if (len(runningHadoops) == 0):
+      log.info("len(runningHadoops) == 0, No running cluster?")
+      log.info("Skipping __copy_archive_to_dfs")
+      hadoopString = ""
+    else: hadoopString=runningHadoops[0].path
+
+    #construct the arguments
+    if hodRingOptions['hodring'].has_key('log-destination-uri'):
+      cmdString = cmdString + " --log-destination-uri " \
+                    + hodRingOptions['hodring']['log-destination-uri']
+
+    hadoopLogDirs = service.getHadoopLogDirs()
+    if hadoopLogDirs:
+      cmdString = cmdString \
+                    + " --hadoop-log-dirs " \
+                    + ",".join(hadoopLogDirs)
+
+    cmdString = cmdString \
+                  + " --temp-dir " \
+                  + service._cfg['temp-dir'] \
+                  + " --hadoop-command-string " \
+                  + hadoopString \
+                  + " --user-id " \
+                  + service._cfg['userid'] \
+                  + " --service-id " \
+                  + service._cfg['service-id'] \
+                  + " --hodring-debug " \
+                  + str(hodRingOptions['hodring']['debug']) \
+                  + " --hodring-log-dir " \
+                  + hodRingOptions['hodring']['log-dir'] \
+                  + " --hodring-cleanup-list " \
+                  + ",".join(list)
+
+    if hodRingOptions['hodring'].has_key('syslog-address'):
+      cmdString = cmdString + " --hodring-syslog-address " \
+                + hodRingOptions['hodring']['syslog-address']
+    if service._cfg.has_key('pkgs'):
+      cmdString = cmdString + " --pkgs " + service._cfg['pkgs']
+
+    log.info("cleanup commandstring : ")
+    log.info(cmdString)
+
+    # clean up
+    cmd = ['/bin/sh', '-c', cmdString]
+
+    mswindows = (sys.platform == "win32")
+    originalcwd = os.getcwd()
+
+    if not mswindows:
+      try: 
+        pid = os.fork() 
+        if pid > 0:
+          # exit first parent
+          log.info("child(pid: %s) is now doing cleanup" % pid)
+          sys.exit(0) 
+      except OSError, e: 
+        log.error("fork failed: %d (%s)" % (e.errno, e.strerror)) 
+        sys.exit(1)
+
+      # decouple from parent environment
+      os.chdir("/") 
+      os.setsid() 
+      os.umask(0) 
+ 
+    MAXFD = 128 # more than enough file descriptors to close. Just in case.
+    for i in xrange(0, MAXFD):
+      try:
+        os.close(i)
+      except OSError:
+        pass
+  
+    try:
+      os.execvp(cmd[0], cmd)
+    finally:
+      log.critical("exec failed")
+      os._exit(1)
 
   except Exception:
     if service:
@@ -195,4 +291,4 @@
         log = service.log
     else:
       log = getLogger(hodRingOptions)
-    log.error("bin/hodring failed to start. %s. \nStack trace:\n%s" %(get_exception_error_string(),get_exception_string()))
+    log.error("Error in bin/hodring %s. \nStack trace:\n%s" %(get_exception_error_string(),get_exception_string()))

Modified: hadoop/core/trunk/src/contrib/hod/bin/ringmaster
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/ringmaster?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/bin/ringmaster (original)
+++ hadoop/core/trunk/src/contrib/hod/bin/ringmaster Mon Jan 28 07:58:08 2008
@@ -122,7 +122,7 @@
               False, None, False, True),    
 
              ('pbs-account', 'string', 'User Account jobs are submitted under.',
-              False, None, True, False),
+              False, None, False, False),
 
              ('queue', 'string', 'Queue of the batch scheduler to query.',
               False, None, False, False),
@@ -317,14 +317,19 @@
   confDef.add_defs(defList, defOrder)
   ringMasterOptions = options(confDef, "./%s [OPTIONS]" % myName, VERSION)
   ensureLogDir(ringMasterOptions['ringmaster']['log-dir'])
-  log = getLogger(ringMasterOptions['ringmaster'])
+  log = None
 
   try:
+    log = getLogger(ringMasterOptions['ringmaster'])
     (status, statusMsgs) = ringMasterOptions.verify()
     if not status:
       raise Exception("%s" % statusMsgs)
+    ringMasterOptions.replace_escape_seqs()
     ringMasterOptions['ringmaster']['base-dir'] = rootDirectory 
-    main(ringMasterOptions,log)
-    sys.exit(0)
+    ret = main(ringMasterOptions,log)
+    sys.exit(ret)
   except Exception, e:
-    log.error("bin/ringmaster failed to start.%s. Stack trace follows:\n%s" % (get_exception_error_string(),get_exception_string()))
+    if log:
+      log.error("bin/ringmaster failed to start.%s. Stack trace follows:\n%s" % (get_exception_error_string(),get_exception_string()))
+    # Ringmaster failing to start is a ringmaster error. Exit with the appropriate exit code.
+    sys.exit(6)

Modified: hadoop/core/trunk/src/contrib/hod/conf/hodrc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/conf/hodrc?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/conf/hodrc (original)
+++ hadoop/core/trunk/src/contrib/hod/conf/hodrc Mon Jan 28 07:58:08 2008
@@ -6,6 +6,7 @@
 xrs-port-range                  = 32768-65536
 debug                           = 3
 allocate-wait-time              = 3600
+temp-dir                        = /tmp/hod
 
 [ringmaster]
 register                        = True

Modified: hadoop/core/trunk/src/contrib/hod/getting_started.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/getting_started.txt?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/getting_started.txt (original)
+++ hadoop/core/trunk/src/contrib/hod/getting_started.txt Mon Jan 28 07:58:08 2008
@@ -26,7 +26,7 @@
   cluster. However, it can also use a pre-installed version of Hadoop,
   if it is available on all nodes in the cluster.
   (http://lucene.apache.org/hadoop)
-  HOD currently supports only Hadoop 0.16, which is under development.
+  HOD currently supports Hadoop 0.15 and above.
 
 NOTE: HOD configuration requires the location of installs of these 
 components to be the same on all nodes in the cluster. It will also 

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/desc.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/desc.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/desc.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/desc.py Mon Jan 28 07:58:08 2008
@@ -125,38 +125,9 @@
     self.dict.setdefault('pkgs', '')
     self.dict.setdefault('final-attrs', {})
     self._checkRequired()
-    self.__dict_update()
-
-  def __dict_update(self):
-    getattr(self, "_%s" % self.dict['id'])()
-
-  def _mapred(self):
-    if self.isExternal():
-      self.dict['final-attrs']['mapred.job.tracker'] = "%s:%s" % (self.dict['host'], 
-        self.dict['tracker_port'])
-      
-      # self.dict['final-attrs']['mapred.job.tracker.info.port'] = \
-      #   str(self.dict['info_port'])
-      # After Hadoop-2185
-      self.dict['final-attrs']['mapred.job.tracker.http.bindAddress'] = \
-        "%s:%s" %(self.dict['host'], self.dict['info_port'])
-      
     if self.dict.has_key('hadoop-tar-ball'):
       self.dict['tar'] = self.dict['hadoop-tar-ball']  
-  
-  def _hdfs(self):
-    if self.isExternal():
-      self.dict['final-attrs']['fs.default.name'] = "%s:%s" % (self.dict['host'], 
-        self.dict['fs_port'])
-      
-      # self.dict['final-attrs']['dfs.info.port'] = str(self.dict['info_port'])
-      # After Hadoop-2185
-      self.dict['final-attrs']['dfs.http.bindAddress'] = "%s:%s" % \
-        (self.dict['host'], self.dict['info_port'])
-      
-    if self.dict.has_key('hadoop-tar-ball'):
-      self.dict['tar'] = self.dict['hadoop-tar-ball']
-  
+
   def _checkRequired(self):
 
     if not 'id' in self.dict:

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/hodsvc.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/hodsvc.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/hodsvc.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/hodsvc.py Mon Jan 28 07:58:08 2008
@@ -15,7 +15,6 @@
 #limitations under the License.
 # $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
 #
-# Christopher Zimmerman - zim@yahoo-inc.com - 04/13/2007
 #------------------------------------------------------------------------------
 import os, time, shutil, xmlrpclib, socket, pprint
 
@@ -51,7 +50,7 @@
     
     self._init_logging()
         
-    self._init_signals()
+    if name != 'serviceRegistry': self._init_signals()
     self._init_xrc_server()
     
   def __set_logging_level(self, level):

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/setup.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/setup.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/setup.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/setup.py Mon Jan 28 07:58:08 2008
@@ -16,7 +16,6 @@
 # $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
 # $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
 #
-# Christopher Zimmerman - zim@yahoo-inc.com - 04/07/2007
 #------------------------------------------------------------------------------
 
 """'setup' provides for reading and verifing configuration files based on
@@ -26,7 +25,7 @@
 
 from ConfigParser import SafeConfigParser
 from optparse import OptionParser, IndentedHelpFormatter, OptionGroup
-from util import get_perms
+from util import get_perms, replace_escapes
 from types import typeValidator, is_valid_type, typeToString
 
 reEmailAddress = re.compile("^.*@.*$")
@@ -37,6 +36,8 @@
 reCommentNewline = re.compile("\W$")
 reKeyVal = r"(?<!\\)="
 reKeyVal = re.compile(reKeyVal)
+reKeyValList = r"(?<!\\),"
+reKeyValList = re.compile(reKeyValList)
 
 errorPrefix = 'error'
 requiredPerms = '0660'
@@ -485,7 +486,7 @@
                            # Append to the current list of values in self._dict
                            if not self._dict[section].has_key(option):
                              self._dict[section][option] = ""
-                           dictOpts = self._dict[section][option].split(",")
+                           dictOpts = reKeyValList.split(self._dict[section][option])
                            dictOptsKeyVals = {}
                            for opt in dictOpts:
                               if opt != '':
@@ -495,13 +496,16 @@
                                   # we only consider the first '=' for splitting
                                   # we do this to support passing params like
                                   # mapred.child.java.opts=-Djava.library.path=some_dir
+                                  # Even in case of an invalid error like unescaped '=',
+                                  # we don't want to fail here itself. We leave such errors 
+                                  # to be caught during validation which happens after this
                                   dictOptsKeyVals[key] = val
                                 else: 
                                   # this means an invalid option. Leaving it
                                   #for config.verify to catch
                                   dictOptsKeyVals[opt] = None
                                 
-                           cmdLineOpts = self._options[section][option].split(",")
+                           cmdLineOpts = reKeyValList.split(self._options[section][option])
 
                            for opt in cmdLineOpts:
                               if reKeyVal.search(opt):
@@ -573,6 +577,10 @@
             raise Exception( error)
             sys.exit(1)
 
+    def replace_escape_seqs(self):
+      """ replace any escaped characters """
+      replace_escapes(self)
+
 class formatter(IndentedHelpFormatter):
     def format_option_strings(self, option):
         """Return a comma-separated list of option strings & metavariables."""
@@ -667,11 +675,21 @@
             self.config = self.__parsedOptions.config
             if not self.config:
                 self.error("configuration file must be specified")
+            if not os.path.isabs(self.config):
+                # A relative path. Append the original directory which would be the
+                # current directory at the time of launch
+                try:  
+                    origDir = getattr(self.__parsedOptions, 'hod.original-dir')
+                    if origDir is not None:
+                        self.config = os.path.join(origDir, self.config)
+                        self.__parsedOptions.config = self.config
+                except AttributeError, e:
+                    self.error("hod.original-dir is not defined.\
+                                   Cannot get current directory")
             if not os.path.exists(self.config):
                 if self.__defaultLoc and not re.search("/", self.config):
                     self.__parsedOptions.config = os.path.join(
                         self.__defaultLoc, self.config)
-    
         self.__build_dict()   
 
     
@@ -910,3 +928,6 @@
                         
     def verify(self):
         return baseConfig.verify(self)
+
+    def replace_escape_seqs(self):
+      replace_escapes(self)

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/socketServers.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/socketServers.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/socketServers.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/socketServers.py Mon Jan 28 07:58:08 2008
@@ -15,7 +15,6 @@
 #limitations under the License.
 # Various socket server and helper classes.
 #
-# Christopher Zimmerman - zim@yahoo-inc.com - 03/07/2007
 #
 import os, sys, socket, threading, pprint, re, xmlrpclib, time
   

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/tcp.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/tcp.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/tcp.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/tcp.py Mon Jan 28 07:58:08 2008
@@ -15,7 +15,6 @@
 #limitations under the License.
 # $Id:tcp.py 6172 2007-05-22 20:26:54Z zim $
 #
-# Christopher Zimmerman - zim@yahoo-inc.com - 04/07/2007
 #------------------------------------------------------------------------------
 
 """ TCP related classes. """

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/threads.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/threads.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/threads.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/threads.py Mon Jan 28 07:58:08 2008
@@ -132,13 +132,16 @@
                 output = cmd.fromchild.readline()
 
         elif self.__wait == False:
-            for output in cmd.fromchild.readlines():
+            output = cmd.fromchild.readline()
+            while output != '':
                 while not self.running.isSet():
                     if self.stopFlag.isSet():
                         break
                     time.sleep(1)
-                
                 print output,
+                if self.stopFlag.isSet():
+                    break
+                output = cmd.fromchild.readline()
         else:
             self.stdout = cmd.fromchild
 

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/types.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/types.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/types.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/types.py Mon Jan 28 07:58:08 2008
@@ -15,7 +15,6 @@
 #limitations under the License.
 # $Id:types.py 6172 2007-05-22 20:26:54Z zim $
 #
-# Christopher Zimmerman - zim@yahoo-inc.com - 04/07/2007
 #------------------------------------------------------------------------------
 
 """ Higher level data types and type related classes.
@@ -325,12 +324,17 @@
         return value
 
     def __tostring_keyval(self, value):
-        string = ''
+        string = '"' # to protect from shell escapes
         for key in value:
-            for item in value[key]:
-                string = "%s%s=%s," % (string, key, item)
-                
-        return string[:-1]  
+          # for item in value[key]:
+          #      string = "%s%s=%s," % (string, key, item)
+          # Quotes still cannot protect Double-slashes.
+          # Dealing with them separately
+          val = re.sub(r"\\\\",r"\\\\\\\\",value[key])
+
+          string = "%s%s=%s," % (string, key, val)
+
+        return string[:-1] + '"'
 
     def __tostring_list(self, value):
         string = ''
@@ -678,13 +682,11 @@
         list = self.__norm_list(value)
         keyValue = {}
         for item in list:
-            # we only consider the first '=' for splitting
-            # we do this to support passing params like 
-            # mapred.child.java.opts=-Djava.library.path=some_dir
-            (key, value) = reKeyVal.split(item,1)
-            if not keyValue.has_key(key):
-                keyValue[key] = []
-            keyValue[key].append(value)
+            (key, value) = reKeyVal.split(item)
+            #if not keyValue.has_key(key):
+            #    keyValue[key] = []
+            #keyValue[key].append(value)
+            keyValue[key] = value
         return keyValue     
 
     def __verify_list(self, type, value):

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/util.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/util.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/util.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/util.py Mon Jan 28 07:58:08 2008
@@ -13,12 +13,17 @@
 #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #See the License for the specific language governing permissions and
 #limitations under the License.
-import sys, os, traceback, stat, socket, re, warnings
+import sys, os, traceback, stat, socket, re, warnings, signal
 
 from hodlib.Common.tcp import tcpSocket, tcpError 
 from hodlib.Common.threads import simpleCommand
 
 setUGV   = { 'S_ISUID' : 2, 'S_ISGID' : 1, 'S_ISVTX' : 0 }
+reEscapeSeq = r"\\(.)?"
+reEscapeSeq = re.compile(reEscapeSeq)
+
+HOD_INTERRUPTED_CODE = 127
+HOD_INTERRUPTED_MESG = "Hod Interrupted. Cleaning up and exitting"
 
 class AlarmException(Exception):
     def __init__(self, msg=''):
@@ -170,3 +175,117 @@
   for item in list:
     arg = "%s%s " % (arg, item)
   return arg[:-1]
+
+def replace_escapes(object):
+  """ replace any escaped character. e.g \, with , \= with = and so on """
+  # here object is either a config object or a options object
+  for section in object._mySections:
+    for option in object._configDef[section].keys():
+      if object[section].has_key(option):
+        if object._configDef[section][option]['type'] == 'keyval':
+          keyValDict = object[section][option]
+          object[section][option] = {}
+          for (key,value) in keyValDict.iteritems():
+            match = reEscapeSeq.search(value)
+            if match:
+              value = reEscapeSeq.sub(r"\1", value)
+            object[section][option][key] = value
+
+def hadoopVersion(hadoopDir, java_home, log):
+  # Determine the version of hadoop being used by executing the 
+  # hadoop version command. Code earlier in idleTracker.py
+  hadoopVersion = { 'major' : None, 'minor' : None }
+  hadoopPath = os.path.join(hadoopDir, 'bin', 'hadoop')
+  cmd = "%s version" % hadoopPath
+  log.debug('Executing command %s to find hadoop version' % cmd)
+  env = os.environ
+  env['JAVA_HOME'] = java_home
+  hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
+  hadoopVerCmd.start()
+  hadoopVerCmd.wait()
+  hadoopVerCmd.join()
+  if hadoopVerCmd.exit_code() == 0:
+    verLine = hadoopVerCmd.output()[0]
+    log.debug('Version from hadoop command: %s' % verLine)
+    hadoopVerRegExp = re.compile("Hadoop ([0-9]+)\.([0-9]+).*")
+    verMatch = hadoopVerRegExp.match(verLine)
+    if verMatch != None:
+      hadoopVersion['major'] = verMatch.group(1)
+      hadoopVersion['minor'] = verMatch.group(2)
+  return hadoopVersion
+
+
+def get_cluster_status(hdfsAddress, mapredAddress):
+  """Determine the status of the cluster based on socket availability
+     of HDFS and Map/Reduce."""
+  status = 0
+
+  mapredSocket = tcpSocket(mapredAddress)
+  try:
+    mapredSocket.open()
+    mapredSocket.close()
+  except tcpError:
+    status = 14
+
+  hdfsSocket = tcpSocket(hdfsAddress)
+  try:
+    hdfsSocket.open()
+    hdfsSocket.close()
+  except tcpError:
+    if status > 0:
+      status = 10
+    else:
+      status = 13
+
+  return status
+
+def parseEquals(list):
+  # takes in a list of keyval pairs e.g ['a=b','c=d'] and returns a
+  # dict e.g {'a'='b','c'='d'}. Used in GridService/{mapred.py/hdfs.py} and 
+  # HodRing/hodring.py. No need for specially treating escaped =. as in \=,
+  # since all keys are generated by hod and don't contain such anomalies
+  dict = {}
+  for elems in list:
+    splits = elems.split('=')
+    dict[splits[0]] = splits[1]
+  return dict
+
+class HodInterrupt:
+  def __init__(self):
+    self.HodInterruptFlag = False
+    self.log = None
+
+  def set_log(self, log):
+    self.log = log
+
+  def init_signals(self):
+
+    def sigStop(sigNum, handler):
+      sig_wrapper(sigNum, self.setFlag)
+
+    signal.signal(signal.SIGTERM, sigStop) # 15 : software termination signal
+    signal.signal(signal.SIGQUIT, sigStop) # 3  : Quit program
+    signal.signal(signal.SIGINT, sigStop)  # 2 ^C : Interrupt program
+
+    def sig_wrapper(sigNum, handler, *args):
+      self.log.critical("Caught signal %s." % sigNum )
+
+      if args:
+          handler(args)
+      else:
+          handler()
+
+  def setFlag(self, val = True):
+    self.HodInterruptFlag = val
+
+  def isSet(self):
+    return self.HodInterruptFlag
+
+class HodInterruptException(Exception):
+  def __init__(self, value = ""):
+    self.value = value
+    
+  def __str__(self):
+    return repr(self.value)
+
+hodInterrupt = HodInterrupt()

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Common/xmlrpc.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Common/xmlrpc.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Common/xmlrpc.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Common/xmlrpc.py Mon Jan 28 07:58:08 2008
@@ -14,6 +14,7 @@
 #See the License for the specific language governing permissions and
 #limitations under the License.
 import xmlrpclib, time, random, signal
+from hodlib.Common.util import hodInterrupt
 
 class hodXRClient(xmlrpclib.ServerProxy):
     def __init__(self, uri, transport=None, encoding=None, verbose=0,
@@ -42,6 +43,8 @@
                 break
             except Exception:
                 if self.__retryRequests:
+                  if hodInterrupt.isSet():
+                    raise HodInterruptException()
                   time.sleep(retryWaitTime)
                 else:
                   raise Exception("hodXRClientTimeout")

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/hdfs.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/hdfs.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/hdfs.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/hdfs.py Mon Jan 28 07:58:08 2008
@@ -22,15 +22,16 @@
 from service import *
 from hodlib.Hod.nodePool import *
 from hodlib.Common.desc import CommandDesc
-from hodlib.Common.util import get_exception_string
+from hodlib.Common.util import get_exception_string, parseEquals
 
 class HdfsExternal(MasterSlave):
   """dummy proxy to external HDFS instance"""
 
-  def __init__(self, serviceDesc, workDirs):
+  def __init__(self, serviceDesc, workDirs, version):
     MasterSlave.__init__(self, serviceDesc, workDirs,None)
     self.launchedMaster = True
     self.masterInitialized = True
+    self.version = version
     
   def getMasterRequest(self):
     return None
@@ -49,21 +50,33 @@
     addr = attrs['fs.default.name']
     return [addr]
   
-  def setMasterParams(self, list):
-    raise NotImplementedError
+  def setMasterParams(self, dict):
+   self.serviceDesc.dict['final-attrs']['fs.default.name'] = "%s:%s" % \
+     (dict['host'], dict['fs_port'])
+
+   if self.version < 16:
+    self.serviceDesc.dict['final-attrs']['dfs.info.port'] = \
+                                    str(self.serviceDesc.dict['info_port'])
+   else:
+     # After Hadoop-2185
+     self.serviceDesc.dict['final-attrs']['dfs.http.bindAddress'] = "%s:%s" % \
+       (dict['host'], dict['info_port'])
 
   def getInfoAddrs(self):
     attrs = self.serviceDesc.getfinalAttrs()
-    addr = attrs['fs.default.name']
-    k,v = addr.split( ":")
-    # infoaddr = k + ':' + attrs['dfs.info.port']
-    # After Hadoop-2185
-    infoaddr = attrs['dfs.http.bindAddress']
+    if self.version < 16:
+      addr = attrs['fs.default.name']
+      k,v = addr.split( ":")
+      infoaddr = k + ':' + attrs['dfs.info.port']
+    else:
+      # After Hadoop-2185
+      infoaddr = attrs['dfs.http.bindAddress']
     return [infoaddr]
 
 class Hdfs(MasterSlave):
 
-  def __init__(self, serviceDesc, nodePool, required_node, format=True, upgrade=False):
+  def __init__(self, serviceDesc, nodePool, required_node, version, \
+                                        format=True, upgrade=False):
     MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
     self.masterNode = None
     self.masterAddr = None
@@ -73,6 +86,7 @@
     self.format = format
     self.upgrade = upgrade
     self.workers = []
+    self.version = version
 
   def getMasterRequest(self):
     req = NodeRequest(1, [], False)
@@ -124,16 +138,14 @@
     self.masterAddr = dict['fs.default.name']
     k,v = self.masterAddr.split( ":")
     self.masterNode = k
-    # self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
-    # After Hadoop-2185
-    self.infoAddr = dict['dfs.http.bindAddress']
+    if self.version < 16:
+      self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
+    else:
+      # After Hadoop-2185
+      self.infoAddr = dict['dfs.http.bindAddress']
    
   def _parseEquals(self, list):
-    dict = {}
-    for elems in list:
-      splits = elems.split('=')
-      dict[splits[0]] = splits[1]
-    return dict
+    return parseEquals(list)
   
   def _getNameNodePort(self):
     sd = self.serviceDesc
@@ -152,16 +164,25 @@
   def _getNameNodeInfoPort(self):
     sd = self.serviceDesc
     attrs = sd.getfinalAttrs()
-    if 'dfs.http.bindAddress' not in attrs:
-      return ServiceUtil.getUniqPort()
+    if self.version < 16:
+      if 'dfs.info.bindAddress' not in attrs:
+        return ServiceUtil.getUniqPort()
+    else:
+      if 'dfs.http.bindAddress' not in attrs:
+        return ServiceUtil.getUniqPort()
 
-    # p = attrs['dfs.info.port'] 
-    p = attrs['dfs.http.bindAddress'].split(':')[1]
+    if self.version < 16:
+      p = attrs['dfs.info.port']
+    else:
+      p = attrs['dfs.http.bindAddress'].split(':')[1]
     try:
       return int(p)
     except:
       print get_exception_string()
-      raise ValueError, "Can't find port from attr dfs.info.port: %s" % (p)
+      if self.version < 16:
+        raise ValueError, "Can't find port from attr dfs.info.port: %s" % (p)
+      else:
+        raise ValueError, "Can't find port from attr dfs.http.bindAddress: %s" % (p)
 
   def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
     namedir = None
@@ -183,7 +204,7 @@
     attrs['dfs.name.dir'] = namedir
     attrs['dfs.data.dir'] = ','.join(datadir)
     # FIXME -- change dfs.client.buffer.dir
-    envs['HADOOP_ROOT_LOGGER'] = ["INFO,DRFA",]
+    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"
 
 
   def _getNameNodeCommand(self, format=False, upgrade=False):
@@ -199,13 +220,14 @@
       attrs['fs.default.name'] = 'fillinhostport'
     #self.infoPort = port = self._getNameNodeInfoPort()
  
-    # if 'dfs.info.port' not in attrs:
-    #  attrs['dfs.info.port'] = 'fillinport'
-   
-    # Addressing Hadoop-2815, added the following. Earlier version don't
-    # care about this
-    if 'dfs.http.bindAddress' not in attrs:
-      attrs['dfs.http.bindAddress'] = 'fillinhostport'
+    if self.version < 16:
+     if 'dfs.info.port' not in attrs:
+      attrs['dfs.info.port'] = 'fillinport'
+    else:
+      # Addressing Hadoop-2815, added the following. Earlier versions don't
+      # care about this
+      if 'dfs.http.bindAddress' not in attrs:
+        attrs['dfs.http.bindAddress'] = 'fillinhostport'
 
     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')
 
@@ -277,11 +299,18 @@
 
     attrs['fs.default.name'] = nn
 
-    # Adding the following. Hadoop-2815
-    if 'dfs.datanode.bindAddress' not in attrs:
-      attrs['dfs.datanode.bindAddress'] = 'fillinhostport'
-    if 'dfs.datanode.http.bindAddress' not in attrs:
-      attrs['dfs.datanode.http.bindAddress'] = 'fillinhostport'
+    if self.version < 16:
+      if 'dfs.datanode.port' not in attrs:
+        attrs['dfs.datanode.port'] = 'fillinport'
+      if 'dfs.datanode.info.port' not in attrs:
+        attrs['dfs.datanode.info.port'] = 'fillinport'
+    else:
+      # Adding the following. Hadoop-2815
+      if 'dfs.datanode.bindAddress' not in attrs:
+        attrs['dfs.datanode.bindAddress'] = 'fillinhostport'
+      if 'dfs.datanode.http.bindAddress' not in attrs:
+        attrs['dfs.datanode.http.bindAddress'] = 'fillinhostport'
+    
     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')
 
     dict = { 'name' : 'datanode' }

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/mapred.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/mapred.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/mapred.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/GridServices/mapred.py Mon Jan 28 07:58:08 2008
@@ -22,15 +22,16 @@
 from service import *
 from hodlib.Hod.nodePool import *
 from hodlib.Common.desc import CommandDesc
-from hodlib.Common.util import get_exception_string
+from hodlib.Common.util import get_exception_string, parseEquals
 
 class MapReduceExternal(MasterSlave):
   """dummy proxy to external MapReduce instance"""
 
-  def __init__(self, serviceDesc, workDirs):
+  def __init__(self, serviceDesc, workDirs, version):
     MasterSlave.__init__(self, serviceDesc, workDirs,None)
     self.launchedMaster = True
     self.masterInitialized = True
+    self.version = version
     
   def getMasterRequest(self):
     return None
@@ -55,22 +56,33 @@
   def needsLess(self):
     return 0
 
-  def setMasterParams(self, list):
-    raise NotImplementedError
-  
+  def setMasterParams(self, dict):
+    self.serviceDesc['final-attrs']['mapred.job.tracker'] = "%s:%s" % (dict['host'], 
+      dict['tracker_port'])
+    
+    if self.version < 16:
+      self.serviceDesc.dict['final-attrs']['mapred.job.tracker.info.port'] = \
+                                      str(self.serviceDesc.dict['info_port'])
+    else:
+      # After Hadoop-2185
+      self.serviceDesc['final-attrs']['mapred.job.tracker.http.bindAddress'] = \
+        "%s:%s" %(dict['host'], dict['info_port'])
+
   def getInfoAddrs(self):
     attrs = self.serviceDesc.getfinalAttrs()
-    addr = attrs['mapred.job.tracker']
-    k,v = addr.split( ":")
-    # infoaddr = k + ':' + attrs['mapred.job.tracker.info.port']
-    # After Hadoop-2185
-    # Note: earlier,we never respected mapred.job.tracker.http.bindAddress
-    infoaddr = attrs['mapred.job.tracker.http.bindAddress']
+    if self.version < 16:
+      addr = attrs['mapred.job.tracker']
+      k,v = addr.split( ":")
+      infoaddr = k + ':' + attrs['mapred.job.tracker.info.port']
+    else:
+      # After Hadoop-2185
+      # Note: earlier,we never respected mapred.job.tracker.http.bindAddress
+      infoaddr = attrs['mapred.job.tracker.http.bindAddress']
     return [infoaddr]
   
 class MapReduce(MasterSlave):
 
-  def __init__(self, serviceDesc, workDirs,required_node):
+  def __init__(self, serviceDesc, workDirs,required_node, version):
     MasterSlave.__init__(self, serviceDesc, workDirs,required_node)
 
     self.masterNode = None
@@ -78,6 +90,7 @@
     self.infoAddr = None
     self.workers = []
     self.required_node = required_node
+    self.version = version
 
   def isLaunchable(self, serviceDict):
     hdfs = serviceDict['hdfs']
@@ -127,16 +140,14 @@
     self.masterAddr = dict['mapred.job.tracker']
     k,v = self.masterAddr.split(":")
     self.masterNode = k
-    # self.infoAddr = self.masterNode + ':' + dict['mapred.job.tracker.info.port']
-    # After Hadoop-2185
-    self.infoAddr = dict['mapred.job.tracker.http.bindAddress']
+    if self.version < 16:
+      self.infoAddr = self.masterNode + ':' + dict['mapred.job.tracker.info.port']
+    else:
+      # After Hadoop-2185
+      self.infoAddr = dict['mapred.job.tracker.http.bindAddress']
   
   def _parseEquals(self, list):
-    dict = {}
-    for elems in list:
-      splits = elems.split('=')
-      dict[splits[0]] = splits[1]
-    return dict
+    return parseEquals(list)
 
   def _getJobTrackerPort(self):
     sd = self.serviceDesc
@@ -152,21 +163,29 @@
       print get_exception_string()
       raise ValueError, "Can't find port from attr mapred.job.tracker: %s" % (v)
 
+  # UNUSED METHOD
   def _getJobTrackerInfoPort(self):
     sd = self.serviceDesc
     attrs = sd.getfinalAttrs()
-    # if not 'mapred.job.tracker.info.port' in attrs:
-    if 'mapred.job.tracker.http.bindAddress' not in attrs:
-      return ServiceUtil.getUniqPort()
-
-    # p = attrs['mapred.job.tracker.info.port']
-    p = attrs['mapred.job.tracker.http.bindAddress']
+    if self.version < 16:
+      if not 'mapred.job.tracker.info.port' in attrs:
+        return ServiceUtil.getUniqPort()
+    else:
+      if 'mapred.job.tracker.http.bindAddress' not in attrs:
+        return ServiceUtil.getUniqPort()
+
+    if self.version < 16:
+      p = attrs['mapred.job.tracker.info.port']
+    else:
+      p = attrs['mapred.job.tracker.http.bindAddress'].split(':')[1]
     try:
       return int(p)
     except:
       print get_exception_string()
-      # raise ValueError, "Can't find port from attr mapred.job.tracker.info.port: %s" % (p)
-      raise ValueError, "Can't find port from attr mapred.job.tracker.http.bindAddress: %s" % (p)
+      if self.version < 16:
+        raise ValueError, "Can't find port from attr mapred.job.tracker.info.port: %s" % (p)
+      else:
+        raise ValueError, "Can't find port from attr mapred.job.tracker.http.bindAddress: %s" % (p)
 
   def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
     local = []
@@ -193,7 +212,7 @@
     attrs['dfs.client.buffer.dir'] = ','.join(dfsclient)
 
 
-    envs['HADOOP_ROOT_LOGGER'] = ["INFO,DRFA",]
+    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"
 
 
   def _getJobTrackerCommand(self, hdfs):
@@ -201,25 +220,28 @@
 
     parentDirs = self.workDirs
     workDirs = []
-    attrs = sd.getfinalAttrs()
-    envs = sd.getEnvs()
+    attrs = sd.getfinalAttrs().copy()
+    envs = sd.getEnvs().copy()
 
     #self.masterPort = port = self._getJobTrackerPort()
     if 'mapred.job.tracker' not in attrs:
       attrs['mapred.job.tracker'] = 'fillinhostport'
 
     #self.infoPort = port = self._getJobTrackerInfoPort()
-    # if 'mapred.job.tracker.info.port' not in attrs:
-    #   attrs['mapred.job.tracker.info.port'] = 'fillinport'
+    if self.version < 16:
+      if 'mapred.job.tracker.info.port' not in attrs:
+        attrs['mapred.job.tracker.info.port'] = 'fillinport'
+    else:
+      # Addressing Hadoop-2815,
+      if 'mapred.job.tracker.http.bindAddress' not in attrs:
+        attrs['mapred.job.tracker.http.bindAddress'] = 'fillinhostport'
 
     attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]
-    # Addressing Hadoop-2815,
-    if 'mapred.job.tracker.http.bindAddress' not in attrs:
-      attrs['mapred.job.tracker.http.bindAddress'] = 'fillinhostport'
 
     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-jt')
 
     dict = { 'name' : 'jobtracker' }
+    dict['version'] = self.version
     dict['program'] = os.path.join('bin', 'hadoop')
     dict['argv'] = ['jobtracker']
     dict['envs'] = envs
@@ -236,8 +258,8 @@
 
     parentDirs = self.workDirs
     workDirs = []
-    attrs = sd.getfinalAttrs()
-    envs = sd.getEnvs()
+    attrs = sd.getfinalAttrs().copy()
+    envs = sd.getEnvs().copy()
     jt = self.masterAddr
 
     if jt == None:
@@ -246,11 +268,17 @@
     attrs['mapred.job.tracker'] = jt
     attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]
 
-    # Adding the following. Hadoop-2815
-    if 'mapred.task.tracker.report.bindAddress' not in attrs:
-      attrs['mapred.task.tracker.report.bindAddress'] = 'fillinhostport'
-    if 'mapred.task.tracker.http.bindAddress' not in attrs:
-      attrs['mapred.task.tracker.http.bindAddress'] = 'fillinhostport'
+    if self.version < 16:
+      if 'tasktracker.http.port' not in attrs:
+        attrs['tasktracker.http.port'] = 'fillinport'
+      # earlier to 16, tasktrackers always took ephemeral port 0 for
+      # tasktracker.report.bindAddress
+    else:
+      # Adding the following. Hadoop-2815
+      if 'mapred.task.tracker.report.bindAddress' not in attrs:
+        attrs['mapred.task.tracker.report.bindAddress'] = 'fillinhostport'
+      if 'mapred.task.tracker.http.bindAddress' not in attrs:
+        attrs['mapred.task.tracker.http.bindAddress'] = 'fillinhostport'
 
     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-tt')
 

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py Mon Jan 28 07:58:08 2008
@@ -57,8 +57,8 @@
     
     return prop
 
-  def gen_site_conf(self, confDir, numNodes, hdfsAddr, mapredAddr=None,\
-             clientParams=None, serverParams=None,\
+  def gen_site_conf(self, confDir, tempDir, numNodes, hdfsAddr,\
+             mapredAddr=None, clientParams=None, serverParams=None,\
              finalServerParams=None, clusterFactor=None):
     if not mapredAddr:
       mapredAddr = "dummy:8181"
@@ -69,51 +69,58 @@
       "This is an auto generated hadoop-site.xml, do not modify")
     topElement = doc.documentElement
     topElement.appendChild(comment)
-    prop = self.__create_xml_element(doc, 'mapred.job.tracker', 
-                                     mapredAddr, "description")
-    topElement.appendChild(prop)
-    prop = self.__create_xml_element(doc, 'fs.default.name', hdfsAddr, 
-                                   "description")
-    topElement.appendChild(prop)
-    mapredAddrSplit = mapredAddr.split(":")
-    mapredsystem = os.path.join('/mapredsystem', mapredAddrSplit[0])
-    prop = self.__create_xml_element(doc, 'mapred.system.dir', mapredsystem, 
-                                   "description", True )
-    topElement.appendChild(prop)
-    prop = self.__create_xml_element(doc, 'hadoop.tmp.dir', confDir, 
-                                   "description")
-    topElement.appendChild(prop)
-    prop = self.__create_xml_element(doc, 'dfs.client.buffer.dir', 
-                                     confDir, "description")
-    topElement.appendChild(prop)
 
-    # clientParams aer enabled now
-    if clientParams:
-      for k, v in clientParams.iteritems():
-        prop = self.__create_xml_element(doc, k, v[0], "client param")
-        topElement.appendChild(prop)
+    description = {}
+    paramsDict = {  'mapred.job.tracker'    : mapredAddr , \
+                    'fs.default.name'       : hdfsAddr, \
+                    'hadoop.tmp.dir'        : confDir, \
+                    'dfs.client.buffer.dir' : tempDir, }
 
+    mapredAddrSplit = mapredAddr.split(":")
+    mapredsystem = os.path.join('/mapredsystem', mapredAddrSplit[0])
+    paramsDict['mapred.system.dir'] = mapredsystem 
+    
+    # mapred-default.xml is no longer used now.
+    numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
+    paramsDict['mapred.reduce.tasks'] = str(numred)
     # end
 
-    # servelParams
-    if serverParams:
-      for k, v in serverParams.iteritems():
-        prop = self.__create_xml_element(doc, k, v[0], "server param")
-        topElement.appendChild(prop)
+    # for all the above vars generated, set the description
+    for k, v in paramsDict.iteritems():
+      description[k] = 'Hod generated parameter'
 
     # finalservelParams
     if finalServerParams:
       for k, v in finalServerParams.iteritems():
-        prop = self.__create_xml_element(doc, k, v[0], "server param", True)
-        topElement.appendChild(prop)
+        if not description.has_key(k):
+          description[k] = "final server parameter"
+          paramsDict[k] = v
 
-   
-    # mapred-default.xml is no longer used now.
-    numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
-    prop = self.__create_xml_element(doc, "mapred.reduce.tasks", str(numred), 
-                                 "description")
-    topElement.appendChild(prop)
-    # end
+    # servelParams
+    if serverParams:
+      for k, v in serverParams.iteritems():
+        if not description.has_key(k):
+          # if no final value for same param is mentioned
+          description[k] = "server parameter"
+          paramsDict[k] = v
+
+    # clientParams
+    if clientParams:
+      for k, v in clientParams.iteritems():
+        if not description.has_key(k) or description[k] == "server parameter":
+          # Just add, if no final value for same param is mentioned.
+          # Replace even if server param is mentioned for same config variable
+          description[k] = "client-side parameter"
+          paramsDict[k] = v
+    
+    # generate the xml elements
+    for k,v in paramsDict.iteritems():
+      if ( description[k] == "final server parameter" or \
+                             description[k] == "Hod generated parameter" ): 
+         final = True
+      else: final = False
+      prop = self.__create_xml_element(doc, k, v, description[k], final)
+      topElement.appendChild(prop)
 
     siteName = os.path.join(confDir, "hadoop-site.xml")
     sitefile = file(siteName, 'w')
@@ -174,44 +181,15 @@
     
     return serviceData
   
-  def __check_allocation_manager(self):
-    userValid = True
-    try:
-      self.serviceProxyClient = hodXRClient(
-        to_http_url(self.__cfg['hod']['proxy-xrs-address']), None, None, 0,
-        0, 1, False, 15)
-      
-      userValid = self.serviceProxyClient.isProjectUserValid(
-        self.__setup.cfg['hod']['userid'], 
-        self.__setup.cfg['resource_manager']['pbs-account'],True)
-      
-      if userValid:
-        self.__log.debug("Validated that user %s is part of project %s." %
-          (self.__cfg['hod']['userid'], 
-           self.__cfg['resource_manager']['pbs-account']))
-      else:
-        self.__log.debug("User %s is not part of project: %s." % (
-          self.__cfg['hod']['userid'], 
-          self.__cfg['resource_manager']['pbs-account']))
-        self.__log.error("Please specify a valid project in "
-                      + "resource_manager.pbs-account. If you still have "
-                      + "issues, please contact operations")
-        userValidd = False
-        # ignore invalid project for now - TODO
-    except Exception:
-      # ignore failures - non critical for now
-      self.__log.debug(
-        "Unable to contact Allocation Manager Proxy - ignoring...")
-      #userValid = False
-        
-    return userValid
-
   def __check_job_status(self):
     initWaitCount = 20
     count = 0
     status = False
     state = 'Q'
     while state == 'Q':
+      if hodInterrupt.isSet():
+        raise HodInterruptException()
+
       state = self.__nodePool.getJobState()
       if (state==False) or (state!='Q'):
         break
@@ -241,6 +219,9 @@
       waitTime = self.__cfg['hod']['allocate-wait-time']
   
       while count < waitTime:
+        if hodInterrupt.isSet():
+          raise HodInterruptException()
+
         ringList = self.__svcrgyClient.getServiceInfo(
           self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(), 
           'ringmaster', 
@@ -267,8 +248,11 @@
     serviceAddress = None
     serviceInfo = None
  
-    for i in range(0, 250):
+    for i in range(0, 250): 
       try:
+        if hodInterrupt.isSet():
+            raise HodInterruptException()
+
         serviceAddress = xmlrpcClient.getServiceAddr(serviceName)
         if serviceAddress:
           if serviceAddress == 'not found':
@@ -280,6 +264,8 @@
           else:
             serviceInfo = xmlrpcClient.getURLs(serviceName)           
             break 
+      except HodInterruptException,h :
+        raise h
       except:
         self.__log.critical("'%s': ringmaster xmlrpc error." % serviceName)
         self.__log.debug(get_exception_string())
@@ -296,6 +282,8 @@
                                             self.jobId, self.__hostname, 
                                             serviceName, 'grid', serviceInfo)
         
+      except HodInterruptException, h:
+        raise h
       except:
         self.__log.critical("'%s': registry xmlrpc error." % serviceName)    
         self.__log.debug(get_exception_string())
@@ -326,6 +314,8 @@
          link):
 
          for i in range(1,5):
+           if hodInterrupt.isSet():
+             raise HodInterruptException()
            try:
              input = urllib.urlopen(link)
              break
@@ -385,6 +375,8 @@
                
              self.__log.debug("Finished grabbing: %s" % link)
            except AlarmException:
+             if hodInterrupt.isSet():
+               raise HodInterruptException()
              if out: out.close()
              if input: input.close()
              
@@ -403,31 +395,12 @@
     if 'mapred' in clusterInfo:
       mapredAddress = clusterInfo['mapred'][7:]
       hdfsAddress = clusterInfo['hdfs'][7:]
-  
-      mapredSocket = tcpSocket(mapredAddress)
-        
-      try:
-        mapredSocket.open()
-        mapredSocket.close()
-      except tcpError:
-        status = 14
-  
-      hdfsSocket = tcpSocket(hdfsAddress)
-        
-      try:
-        hdfsSocket.open()
-        hdfsSocket.close()
-      except tcpError:
-        if status > 0:
-          status = 10
-        else:
-          status = 13
-      
+      status = get_cluster_status(hdfsAddress, mapredAddress)
       if status == 0:
         status = 12
     else:
       status = 15
-      
+
     return status
   
   def cleanup(self):
@@ -455,37 +428,67 @@
       self.__log.critical("Minimum nodes must be greater than 2.")
       status = 2
     else:
-      if self.__check_allocation_manager():
-        nodeSet = self.__nodePool.newNodeSet(min)
-        self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet)
-        if self.jobId:                 
-          if self.__check_job_status():
+      nodeSet = self.__nodePool.newNodeSet(min)
+      walltime = None
+      if self.__cfg['hod'].has_key('walltime'):
+        walltime = self.__cfg['hod']['walltime']
+      self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
+      if self.jobId:
+        try:
+          jobStatus = self.__check_job_status()
+        except HodInterruptException, h:
+          self.__log.info(HOD_INTERRUPTED_MESG)
+          self.delete_job(self.jobId)
+          self.__log.info("Job %s qdelled." % self.jobId)
+          raise h
+
+        if jobStatus:
+          self.__log.info("Hod Job successfully submitted. JobId : %s." \
+                                                              % self.jobId)
+          try:
             self.ringmasterXRS = self.__get_ringmaster_client()
+            
+            self.__log.info("Ringmaster at : %s." % self.ringmasterXRS )
+            ringClient = None
             if self.ringmasterXRS:
               ringClient =  hodXRClient(self.ringmasterXRS)
-              
+                
               hdfsStatus, hdfsAddr, self.hdfsInfo = \
                 self.__init_hadoop_service('hdfs', ringClient)
-              
+                
               if hdfsStatus:
+                self.__log.info("HDFS UI on http://%s" % self.hdfsInfo)
+  
                 mapredStatus, mapredAddr, self.mapredInfo = \
                   self.__init_hadoop_service('mapred', ringClient)
-                  
+  
                 if mapredStatus:
-                  self.__log.info("HDFS UI on http://%s" % self.hdfsInfo)
                   self.__log.info("Mapred UI on http://%s" % self.mapredInfo)
- 
+  
+                  if self.__cfg['hod'].has_key('update-worker-info') \
+                    and self.__cfg['hod']['update-worker-info']:
+                    workerInfoMap = {}
+                    workerInfoMap['HDFS UI'] = 'http://%s' % self.hdfsInfo
+                    workerInfoMap['Mapred UI'] = 'http://%s' % self.mapredInfo
+                    ret = self.__nodePool.updateWorkerInfo(workerInfoMap, self.jobId)
+                    if ret != 0:
+                      self.__log.warn('Could not update HDFS and Mapred information.' \
+                                      'User Portal may not show relevant information.' \
+                                      'Error code=%s' % ret)
+  
+                  self.__cfg.replace_escape_seqs()
+                    
                   # Go generate the client side hadoop-site.xml now
                   # adding final-params as well, just so that conf on 
                   # client-side and server-side are (almost) the same
                   clientParams = None
                   serverParams = {}
                   finalServerParams = {}
-
+  
                   # client-params
                   if self.__cfg['hod'].has_key('client-params'):
                     clientParams = self.__cfg['hod']['client-params']
-
+  
                   # server-params
                   if self.__cfg['gridservice-mapred'].has_key('server-params'):
                     serverParams.update(\
@@ -494,8 +497,8 @@
                     # note that if there are params in both mapred and hdfs
                     # sections, the ones in hdfs overwirte the ones in mapred
                     serverParams.update(\
-                        self.__cfg['gridservice-mapred']['server-params'])
-                  
+                        self.__cfg['gridservice-hdfs']['server-params'])
+                    
                   # final-server-params
                   if self.__cfg['gridservice-mapred'].has_key(\
                                                     'final-server-params'):
@@ -505,9 +508,14 @@
                                                     'final-server-params'):
                     finalServerParams.update(\
                         self.__cfg['gridservice-hdfs']['final-server-params'])
-
+  
                   clusterFactor = self.__cfg['hod']['cluster-factor']
-                  self.__hadoopCfg.gen_site_conf(clusterDir, min,
+                  tempDir = self.__cfg['hod']['temp-dir']
+                  if not os.path.exists(tempDir):
+                    os.makedirs(tempDir)
+                  tempDir = os.path.join( tempDir, self.__cfg['hod']['userid']\
+                                  + "." + self.jobId )
+                  self.__hadoopCfg.gen_site_conf(clusterDir, tempDir, min,\
                             hdfsAddr, mapredAddr, clientParams,\
                             serverParams, finalServerParams,\
                             clusterFactor)
@@ -520,25 +528,52 @@
               status = 6
             if status != 0:
               self.__log.info("Cleaning up job id %s, as cluster could not be allocated." % self.jobId)
+              if ringClient is None:
+                self.delete_job(self.jobId)
+              else:
+                self.__log.debug("Calling rm.stop()")
+                ringClient.stopRM()
+                self.__log.debug("Returning from rm.stop()")
+          except HodInterruptException, h:
+            self.__log.info(HOD_INTERRUPTED_MESG)
+            if self.ringmasterXRS:
+              if ringClient is None:
+                ringClient =  hodXRClient(self.ringmasterXRS)
+              self.__log.debug("Calling rm.stop()")
+              ringClient.stopRM()
+              self.__log.debug("Returning from rm.stop()")
+              self.__log.info("Job Shutdown by informing ringmaster.")
+            else:
               self.delete_job(self.jobId)
-          else:
-            self.__log.critical("No job found, ringmaster failed to run.")
-            status = 5 
- 
-        elif self.jobId == False:
-          if exitCode == 188:
-            self.__log.critical("Request execeeded maximum resource allocation.")
-          else:
-            self.__log.critical("Insufficient resources available.")
-          status = 4
-        else:    
-          self.__log.critical("Scheduler failure, allocation failed.\n\n")        
-          status = 4
-      else:
-        status = 9
+              self.__log.info("Job %s qdelled directly." % self.jobId)
+            raise h
+        else:
+          self.__log.critical("No job found, ringmaster failed to run.")
+          status = 5 
+
+      elif self.jobId == False:
+        if exitCode == 188:
+          self.__log.critical("Request execeeded maximum resource allocation.")
+        else:
+          self.__log.critical("Insufficient resources available.")
+        status = 4
+      else:    
+        self.__log.critical("Scheduler failure, allocation failed.\n\n")        
+        status = 4
     
     return status
 
+  def __isRingMasterAlive(self, rmAddr):
+    ret = True
+    rmSocket = tcpSocket(rmAddr)
+    try:
+      rmSocket.open()
+      rmSocket.close()
+    except tcpError:
+      ret = False
+
+    return ret
+
   def deallocate(self, clusterDir, clusterInfo):
     status = 0 
     
@@ -546,6 +581,7 @@
                                          id=clusterInfo['jobid'])
     self.mapredInfo = clusterInfo['mapred']
     self.hdfsInfo = clusterInfo['hdfs']
+
     try:
       if self.__cfg['hod'].has_key('hadoop-ui-log-dir'):
         clusterStatus = self.check_cluster(clusterInfo)
@@ -554,9 +590,35 @@
           self.__collect_jobtracker_ui(self.__cfg['hod']['hadoop-ui-log-dir'])
       else:
         self.__log.debug('hadoop-ui-log-dir not specified. Skipping Hadoop UI log collection.')
+    except HodInterruptException, h:
+      # got an interrupt. just pass and proceed to qdel
+      pass 
     except:
       self.__log.info("Exception in collecting Job tracker logs. Ignoring.")
-    status = self.__nodePool.finalize()
+    
+    rmAddr = None
+    if clusterInfo.has_key('ring'):
+      # format is http://host:port/ We need host:port
+      rmAddr = clusterInfo['ring'][7:]
+      if rmAddr.endswith('/'):
+        rmAddr = rmAddr[:-1]
+
+    if (rmAddr is None) or (not self.__isRingMasterAlive(rmAddr)):
+      # Cluster is already dead, don't try to contact ringmaster.
+      self.__nodePool.finalize()
+      status = 10 # As cluster is dead, we just set the status to 'cluster dead'.
+    else:
+      xrsAddr = clusterInfo['ring']
+      rmClient = hodXRClient(xrsAddr)
+      self.__log.debug('calling rm.stop')
+      rmClient.stopRM()
+      self.__log.debug('completed rm.stop')
+
+    # cleanup hod temp dirs
+    tempDir = os.path.join( self.__cfg['hod']['temp-dir'], \
+                    self.__cfg['hod']['userid'] + "." + clusterInfo['jobid'] )
+    if os.path.exists(tempDir):
+      shutil.rmtree(tempDir)
    
     return status
   

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py Mon Jan 28 07:58:08 2008
@@ -15,7 +15,7 @@
 #limitations under the License.
 # -*- python -*-
 
-import sys, os, getpass, pprint, re, cPickle, random, shutil
+import sys, os, getpass, pprint, re, cPickle, random, shutil, time
 
 import hodlib.Common.logger
 
@@ -23,6 +23,9 @@
 from hodlib.Common.xmlrpc import hodXRClient
 from hodlib.Common.util import to_http_url, get_exception_string
 from hodlib.Common.util import get_exception_error_string
+from hodlib.Common.util import hodInterrupt, HodInterruptException
+from hodlib.Common.util import HOD_INTERRUPTED_CODE
+
 from hodlib.Common.nodepoolutil import NodePoolUtil
 from hodlib.Hod.hadoop import hadoopCluster, hadoopScript
 
@@ -115,6 +118,9 @@
                                    level=self.__cfg['hod']['debug'], 
                                    addToLoggerNames=(self.__user ,))
 
+  def get_logger(self):
+    return self.__log
+
   def __setup_cluster_logger(self, directory):
     self.__baseLogger.add_file(logDirectory=directory, level=4, 
                                addToLoggerNames=(self.__user ,))
@@ -124,6 +130,8 @@
 
   def __norm_cluster_dir(self, directory):
     directory = os.path.expanduser(directory)
+    if not os.path.isabs(directory):
+      directory = os.path.join(self.__cfg['hod']['original-dir'], directory)
     directory = os.path.abspath(directory)
     
     return directory
@@ -202,7 +210,18 @@
             self.__opCode = self.__cluster.check_cluster(clusterInfo)
             if self.__opCode == 0 or self.__opCode == 15:
               self.__setup_service_registry()   
-              allocateStatus = self.__cluster.allocate(clusterDir, min, max)    
+              if hodInterrupt.isSet(): 
+                self.__cleanup()
+                raise HodInterruptException()
+              self.__log.info("Service Registry Started.")
+              try:
+                allocateStatus = self.__cluster.allocate(clusterDir, min, max)    
+              except HodInterruptException, h:
+                self.__cleanup()
+                raise h
+              # Allocation has gone through.
+              # Don't care about interrupts any more
+
               if allocateStatus == 0:
                 self.__set_cluster_state_info(os.environ, 
                                               self.__cluster.hdfsInfo, 
@@ -213,6 +232,8 @@
                 self.__setup_cluster_state(clusterDir)
                 self.__clusterState.write(self.__cluster.jobId, 
                                           self.__clusterStateInfo)
+                #  Do we need to check for interrupts here ??
+
                 self.__set_user_state_info( 
                   { clusterDir : self.__cluster.jobId, } )
               self.__opCode = allocateStatus
@@ -239,7 +260,15 @@
       self.__log.critical("%s operation requires two arguments. "  % operation
                         + "A cluster path and n nodes, or min-max nodes.")
       self.__opCode = 3
-  
+ 
+  def _is_cluster_allocated(self, clusterDir):
+    if os.path.isdir(clusterDir):
+      self.__setup_cluster_state(clusterDir)
+      clusterInfo = self.__clusterState.read()
+      if clusterInfo != {}:
+        return True
+    return False
+
   def _op_deallocate(self, args):
     operation = "deallocate"
     argLength = len(args)
@@ -293,25 +322,19 @@
         clusterStatus = self.__cluster.check_cluster(clusterInfo)
         if clusterStatus == 12:
           self.__log.info(clusterDir)
-          keys = clusterInfo.keys()
-          keys.sort()
-          for key in keys:
-            if key != 'env':
-              self.__log.info("%s\t%s" % (key, clusterInfo[key]))  
-            
-          if self.__cfg['hod']['debug'] == 4:
-            for var in clusterInfo['env'].keys():
-              self.__log.debug("%s = %s" % (var, clusterInfo['env'][var]))
+          self.__print_cluster_info(clusterInfo)
         elif clusterStatus == 10:
           self.__log.critical("%s cluster is dead" % clusterDir)
         elif clusterStatus == 13:
           self.__log.warn("%s cluster hdfs is dead" % clusterDir)
         elif clusterStatus == 14:
           self.__log.warn("%s cluster mapred is dead" % clusterDir)
-        
+
         if clusterStatus != 12:
           if clusterStatus == 15:
             self.__log.critical("Cluster %s not allocated." % clusterDir)
+          else:
+            self.__print_cluster_info(clusterInfo)
             
           self.__opCode = clusterStatus
       else:
@@ -321,7 +344,19 @@
       self.__log.critical("%s operation requires one argument. "  % operation
                         + "A cluster path.")
       self.__opCode = 3      
-  
+ 
+  def __print_cluster_info(self, clusterInfo):
+    keys = clusterInfo.keys()
+    keys.sort()
+    for key in keys:
+      if key != 'env':
+        self.__log.info("%s\t%s" % (key, clusterInfo[key]))  
+            
+    if self.__cfg['hod']['debug'] == 4:
+      for var in clusterInfo['env'].keys():
+        self.__log.debug("%s = %s" % (var, clusterInfo['env'][var]))
+
+ 
   def _op_help(self, args):  
     print "hod operations:\n"
     print " allocate <directory> <nodes> - Allocates a cluster of n nodes using the specified cluster"
@@ -342,6 +377,10 @@
       opList = self.__check_operation(operation)
       if self.__opCode == 0:
         getattr(self, "_op_%s" % opList[0])(opList)
+    except HodInterruptException, h:
+      self.__log.critical("op: %s failed because of an process interrupt." \
+                                                                % operation)
+      self.__opCode = HOD_INTERRUPTED_CODE
     except:
       self.__log.critical("op: %s failed: %s" % (operation,
                           get_exception_error_string()))
@@ -356,16 +395,41 @@
   def script(self):
     script = self.__cfg['hod']['script']
     nodes = self.__cfg['hod']['min-nodes']
+    isExecutable = os.access(script, os.X_OK)
+    if not isExecutable:
+      self.__log.critical('Script %s is not an executable.' % script)
+      return 1
+
     clusterDir = "/tmp/%s.%s" % (self.__cfg['hod']['userid'], 
                                  random.randint(0, 20000))
     os.mkdir(clusterDir)
+    ret = 0
     try:
       self._op_allocate(('allocate', clusterDir, str(nodes)))
-      scriptRunner = hadoopScript(clusterDir, 
+      if self.__opCode == 0:
+        if self.__cfg['hod'].has_key('script-wait-time'):
+          time.sleep(self.__cfg['hod']['script-wait-time'])
+          self.__log.debug('Slept for %d time. Now going to run the script' % self.__cfg['hod']['script-wait-time'])
+        if hodInterrupt.isSet():
+          self.__log.debug('Interrupt set - not executing script')
+        else:
+          scriptRunner = hadoopScript(clusterDir, 
                                   self.__cfg['hod']['original-dir'])
-      self.__opCode = scriptRunner.run(script)
-      self._op_deallocate(('deallocate', clusterDir))
+          self.__opCode = scriptRunner.run(script)
+          ret = self.__opCode
+          self.__log.debug("Exit code from running the script: %d" % self.__opCode)
+      else:
+        self.__log.critical("Error %d in allocating the cluster. Cannot run the script." % self.__opCode)
+
+      if hodInterrupt.isSet():
+        # Got interrupt while executing script. Unsetting it for deallocating
+        hodInterrupt.setFlag(False)
+      if self._is_cluster_allocated(clusterDir):
+        self._op_deallocate(('deallocate', clusterDir))
       shutil.rmtree(clusterDir, True)
+    except HodInterruptException, h:
+      self.__log.critical("Script failed because of an process interrupt.")
+      self.__opCode = HOD_INTERRUPTED_CODE
     except:
       self.__log.critical("script: %s failed: %s" % (script,
                           get_exception_error_string()))
@@ -373,4 +437,8 @@
     
     self.__cleanup()      
     
+    # We want to give importance to a failed script's exit code.
+    if ret != 0:
+      self.__opCode = ret
+
     return self.__opCode

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py?rev=615919&r1=615918&r2=615919&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py Mon Jan 28 07:58:08 2008
@@ -108,6 +108,10 @@
     """Delete a job, given it's id"""
     raise NotImplementedError
 
+  def updateWorkerInfo(self, workerInfoMap):
+    """Update information about the workers started by this NodePool."""
+    raise NotImplementedError
+
   def getNextNodeSetId(self):
     id = self.nextNodeSetId
     self.nextNodeSetId += 1