You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/20 10:06:20 UTC

svn commit: r639214 - in /hadoop/core/trunk: CHANGES.txt src/contrib/hod/bin/hod src/contrib/hod/hodlib/Hod/hod.py src/contrib/hod/testing/lib.py

Author: ddas
Date: Thu Mar 20 02:06:19 2008
New Revision: 639214

URL: http://svn.apache.org/viewvc?rev=639214&view=rev
Log:
HADOOP-2848. [HOD]hod -o list and deallocate works even after deleting the cluster directory. Contributed by Vinod Kumar Vavilapalli.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/hod/bin/hod
    hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py
    hadoop/core/trunk/src/contrib/hod/testing/lib.py

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=639214&r1=639213&r2=639214&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Mar 20 02:06:19 2008
@@ -117,6 +117,9 @@
     HADOOP-2910. Throttle IPC Client/Server during bursts of 
     requests or server slowdown. (Hairong Kuang via dhruba)
 
+    HADOOP-2848. [HOD]hod -o list and deallocate works even after deleting
+    the cluster directory. (Vinod Kumar Vavilapalli via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-2790.  Fixed inefficient method hasSpeculativeTask by removing

Modified: hadoop/core/trunk/src/contrib/hod/bin/hod
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/hod?rev=639214&r1=639213&r2=639214&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/bin/hod (original)
+++ hadoop/core/trunk/src/contrib/hod/bin/hod Thu Mar 20 02:06:19 2008
@@ -86,7 +86,7 @@
              ('clusterdir', 'directory', 
              'Directory where cluster state information and hadoop-site.xml' +
              ' will be stored.',
-              True, None, False, True, 'd'),
+              True, None, False, False, 'd'),
 
              ('syslog-address', 'address', 'Syslog address.',
               False, None, False, True, 'y'),

Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py?rev=639214&r1=639213&r2=639214&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py Thu Mar 20 02:06:19 2008
@@ -87,7 +87,8 @@
           os.remove(item)
         
 class hodRunner:
-  def __init__(self, cfg):
+
+  def __init__(self, cfg, log=None, cluster=None):
     self.__hodhelp = hodHelp()
     self.__ops = self.__hodhelp.ops
     self.__cfg = cfg  
@@ -96,14 +97,22 @@
     self.__user = getpass.getuser()
     self.__registry = None
     self.__baseLogger = None
-    self.__setup_logger()
+    # Allowing to pass in log object to help testing - a stub can be passed in
+    if log is None:
+      self.__setup_logger()
+    else:
+      self.__log = log
     
     self.__userState = hodState(self.__cfg['hod']['user_state']) 
     
     self.__clusterState = None
     self.__clusterStateInfo = { 'env' : None, 'hdfs' : None, 'mapred' : None }
     
-    self.__cluster = hadoopCluster(self.__cfg, self.__log)
+    # Allowing to pass in cluster object to help testing - a stub can be passed in
+    if cluster is None:
+      self.__cluster = hadoopCluster(self.__cfg, self.__log)
+    else:
+      self.__cluster = cluster
   
   def __setup_logger(self):
     self.__baseLogger = hodlib.Common.logger.hodLog('hod')
@@ -206,6 +215,12 @@
         self.__opCode = 3
         return
 
+      clusterList = self.__userState.read(CLUSTER_DATA_FILE)
+      if clusterDir in clusterList.keys():
+        self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. Deallocate the cluster first." % (clusterDir))
+        self.__opCode = 12
+        return
+ 
       self.__setup_cluster_logger(clusterDir)
       if re.match('\d+-\d+', nodes):
         (min, max) = nodes.split("-")
@@ -292,8 +307,7 @@
         self.__setup_cluster_state(clusterDir)
         clusterInfo = self.__clusterState.read()
         if clusterInfo == {}:
-          self.__opCode = 15
-          self.__log.critical("Cluster %s not allocated." % clusterDir)
+          self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
         else:
           self.__opCode = \
             self.__cluster.deallocate(clusterDir, clusterInfo)
@@ -302,9 +316,7 @@
           self.__clusterState.clear()
           self.__remove_cluster(clusterDir)
       else:
-        self.__log.critical("Invalid cluster directory '%s' specified." % 
-                            clusterDir)
-        self.__opCode = 3        
+        self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
     else:
       print self.__hodhelp.help_deallocate()
       self.__log.critical("%s operation requires one argument. "  % operation
@@ -314,8 +326,15 @@
   def _op_list(self, args):
     clusterList = self.__userState.read(CLUSTER_DATA_FILE)
     for path in clusterList.keys():
+      if not os.path.isdir(path):
+        self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path))
+        continue
       self.__setup_cluster_state(path)
       clusterInfo = self.__clusterState.read()
+      if clusterInfo == {}:
+        # something wrong with the cluster directory.
+        self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path))
+        continue
       clusterStatus = self.__cluster.check_cluster(clusterInfo)
       if clusterStatus == 12:
         self.__log.info("alive\t%s\t%s" % (clusterList[path], path))
@@ -334,34 +353,51 @@
       if os.path.isdir(clusterDir):
         self.__setup_cluster_state(clusterDir)
         clusterInfo = self.__clusterState.read()
-        clusterStatus = self.__cluster.check_cluster(clusterInfo)
-        if clusterStatus == 12:
-          self.__print_cluster_info(clusterInfo)
-          self.__log.info("hadoop-site.xml at %s" % clusterDir)
-        elif clusterStatus == 10:
-          self.__log.critical("%s cluster is dead" % clusterDir)
-        elif clusterStatus == 13:
-          self.__log.warn("%s cluster hdfs is dead" % clusterDir)
-        elif clusterStatus == 14:
-          self.__log.warn("%s cluster mapred is dead" % clusterDir)
-
-        if clusterStatus != 12:
-          if clusterStatus == 15:
-            self.__log.critical("Cluster %s not allocated." % clusterDir)
-          else:
+        if clusterInfo == {}:
+          # something wrong with the cluster directory.
+          self.__handle_invalid_cluster_directory(clusterDir)
+        else:
+          clusterStatus = self.__cluster.check_cluster(clusterInfo)
+          if clusterStatus == 12:
             self.__print_cluster_info(clusterInfo)
             self.__log.info("hadoop-site.xml at %s" % clusterDir)
+          elif clusterStatus == 10:
+            self.__log.critical("%s cluster is dead" % clusterDir)
+          elif clusterStatus == 13:
+            self.__log.warn("%s cluster hdfs is dead" % clusterDir)
+          elif clusterStatus == 14:
+            self.__log.warn("%s cluster mapred is dead" % clusterDir)
+
+          if clusterStatus != 12:
+            if clusterStatus == 15:
+              self.__log.critical("Cluster %s not allocated." % clusterDir)
+            else:
+              self.__print_cluster_info(clusterInfo)
+              self.__log.info("hadoop-site.xml at %s" % clusterDir)
             
-          self.__opCode = clusterStatus
+            self.__opCode = clusterStatus
       else:
-        self.__log.critical("'%s' does not exist." % clusterDir)
-        self.__opCode = 3 
+        self.__handle_invalid_cluster_directory(clusterDir)
     else:
       print self.__hodhelp.help_info()
       self.__log.critical("%s operation requires one argument. "  % operation
                         + "A cluster path.")
       self.__opCode = 3      
- 
+
+  def __handle_invalid_cluster_directory(self, clusterDir, cleanUp=False):
+    clusterList = self.__userState.read(CLUSTER_DATA_FILE)
+    if clusterDir in clusterList.keys():
+      # previously allocated cluster.
+      self.__log.critical("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (clusterList[clusterDir], clusterDir))
+      if cleanUp:
+        self.__cluster.delete_job(clusterList[clusterDir])
+        self.__remove_cluster(clusterDir)
+        self.__log.critical("Freeing resources allocated to the cluster.")
+      self.__opCode = 3
+    else:
+      self.__log.critical("'%s' is not a valid cluster directory." % (clusterDir))
+      self.__opCode = 15
+    
   def __print_cluster_info(self, clusterInfo):
     keys = clusterInfo.keys()
 

Modified: hadoop/core/trunk/src/contrib/hod/testing/lib.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/testing/lib.py?rev=639214&r1=639213&r2=639214&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/testing/lib.py (original)
+++ hadoop/core/trunk/src/contrib/hod/testing/lib.py Thu Mar 20 02:06:19 2008
@@ -62,3 +62,49 @@
   for i in range(0,79):
     str = str + "*"
   print >>sys.stderr, "\n", str, "\n"
+
+# This class captures all log messages logged by hodRunner and other classes.
+# Tests then use it to verify that the expected messages were logged. This is
+# one way to validate that messages sent to the logger are correctly written.
+class MockLogger:
+  def __init__(self):
+    self.__logLines = {}
+
+  def info(self, message):
+    self.__logLines[message] = 'info'
+
+  def critical(self, message):
+    self.__logLines[message] = 'critical'
+
+  def warn(self, message):
+    self.__logLines[message] = 'warn'
+
+  def debug(self, message):
+    # don't track debug lines.
+    pass
+
+  # verify a certain message has been logged at the defined level of severity.
+  def hasMessage(self, message, level):
+    if not self.__logLines.has_key(message):
+      return False
+    return self.__logLines[message] == level
+
+# Stub class to test cluster manipulation operations.
+class MockHadoopCluster:
+  
+  def __init__(self):
+    # store the operations received.
+    self.__operations = {}
+  
+  def delete_job(self, jobid):
+    self.__operations['delete_job'] = [jobid]
+  
+  def wasOperationPerformed(self, operation, args):
+    if self.__operations.has_key(operation):
+      actualArgs = self.__operations[operation]
+      for arg in actualArgs:
+        if arg not in args:
+          break
+      else:
+        return True
+    return False