You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/20 10:06:20 UTC
svn commit: r639214 - in /hadoop/core/trunk: CHANGES.txt
src/contrib/hod/bin/hod src/contrib/hod/hodlib/Hod/hod.py
src/contrib/hod/testing/lib.py
Author: ddas
Date: Thu Mar 20 02:06:19 2008
New Revision: 639214
URL: http://svn.apache.org/viewvc?rev=639214&view=rev
Log:
HADOOP-2848. [HOD]hod -o list and deallocate works even after deleting the cluster directory. Contributed by Vinod Kumar Vavilapalli.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/hod/bin/hod
hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py
hadoop/core/trunk/src/contrib/hod/testing/lib.py
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=639214&r1=639213&r2=639214&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Mar 20 02:06:19 2008
@@ -117,6 +117,9 @@
HADOOP-2910. Throttle IPC Client/Server during bursts of
requests or server slowdown. (Hairong Kuang via dhruba)
+ HADOOP-2848. [HOD]hod -o list and deallocate works even after deleting
+ the cluster directory. (Vinod Kumar Vavilapalli via ddas)
+
OPTIMIZATIONS
HADOOP-2790. Fixed inefficient method hasSpeculativeTask by removing
Modified: hadoop/core/trunk/src/contrib/hod/bin/hod
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/hod?rev=639214&r1=639213&r2=639214&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/bin/hod (original)
+++ hadoop/core/trunk/src/contrib/hod/bin/hod Thu Mar 20 02:06:19 2008
@@ -86,7 +86,7 @@
('clusterdir', 'directory',
'Directory where cluster state information and hadoop-site.xml' +
' will be stored.',
- True, None, False, True, 'd'),
+ True, None, False, False, 'd'),
('syslog-address', 'address', 'Syslog address.',
False, None, False, True, 'y'),
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py?rev=639214&r1=639213&r2=639214&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py Thu Mar 20 02:06:19 2008
@@ -87,7 +87,8 @@
os.remove(item)
class hodRunner:
- def __init__(self, cfg):
+
+ def __init__(self, cfg, log=None, cluster=None):
self.__hodhelp = hodHelp()
self.__ops = self.__hodhelp.ops
self.__cfg = cfg
@@ -96,14 +97,22 @@
self.__user = getpass.getuser()
self.__registry = None
self.__baseLogger = None
- self.__setup_logger()
+ # Allowing to pass in log object to help testing - a stub can be passed in
+ if log is None:
+ self.__setup_logger()
+ else:
+ self.__log = log
self.__userState = hodState(self.__cfg['hod']['user_state'])
self.__clusterState = None
self.__clusterStateInfo = { 'env' : None, 'hdfs' : None, 'mapred' : None }
- self.__cluster = hadoopCluster(self.__cfg, self.__log)
+ # Allowing to pass in log object to help testing - a stub can be passed in
+ if cluster is None:
+ self.__cluster = hadoopCluster(self.__cfg, self.__log)
+ else:
+ self.__cluster = cluster
def __setup_logger(self):
self.__baseLogger = hodlib.Common.logger.hodLog('hod')
@@ -206,6 +215,12 @@
self.__opCode = 3
return
+ clusterList = self.__userState.read(CLUSTER_DATA_FILE)
+ if clusterDir in clusterList.keys():
+ self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. Deallocate the cluster first." % (clusterDir))
+ self.__opCode = 12
+ return
+
self.__setup_cluster_logger(clusterDir)
if re.match('\d+-\d+', nodes):
(min, max) = nodes.split("-")
@@ -292,8 +307,7 @@
self.__setup_cluster_state(clusterDir)
clusterInfo = self.__clusterState.read()
if clusterInfo == {}:
- self.__opCode = 15
- self.__log.critical("Cluster %s not allocated." % clusterDir)
+ self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
else:
self.__opCode = \
self.__cluster.deallocate(clusterDir, clusterInfo)
@@ -302,9 +316,7 @@
self.__clusterState.clear()
self.__remove_cluster(clusterDir)
else:
- self.__log.critical("Invalid cluster directory '%s' specified." %
- clusterDir)
- self.__opCode = 3
+ self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
else:
print self.__hodhelp.help_deallocate()
self.__log.critical("%s operation requires one argument. " % operation
@@ -314,8 +326,15 @@
def _op_list(self, args):
clusterList = self.__userState.read(CLUSTER_DATA_FILE)
for path in clusterList.keys():
+ if not os.path.isdir(path):
+ self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path))
+ continue
self.__setup_cluster_state(path)
clusterInfo = self.__clusterState.read()
+ if clusterInfo == {}:
+ # something wrong with the cluster directory.
+ self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path))
+ continue
clusterStatus = self.__cluster.check_cluster(clusterInfo)
if clusterStatus == 12:
self.__log.info("alive\t%s\t%s" % (clusterList[path], path))
@@ -334,34 +353,51 @@
if os.path.isdir(clusterDir):
self.__setup_cluster_state(clusterDir)
clusterInfo = self.__clusterState.read()
- clusterStatus = self.__cluster.check_cluster(clusterInfo)
- if clusterStatus == 12:
- self.__print_cluster_info(clusterInfo)
- self.__log.info("hadoop-site.xml at %s" % clusterDir)
- elif clusterStatus == 10:
- self.__log.critical("%s cluster is dead" % clusterDir)
- elif clusterStatus == 13:
- self.__log.warn("%s cluster hdfs is dead" % clusterDir)
- elif clusterStatus == 14:
- self.__log.warn("%s cluster mapred is dead" % clusterDir)
-
- if clusterStatus != 12:
- if clusterStatus == 15:
- self.__log.critical("Cluster %s not allocated." % clusterDir)
- else:
+ if clusterInfo == {}:
+ # something wrong with the cluster directory.
+ self.__handle_invalid_cluster_directory(clusterDir)
+ else:
+ clusterStatus = self.__cluster.check_cluster(clusterInfo)
+ if clusterStatus == 12:
self.__print_cluster_info(clusterInfo)
self.__log.info("hadoop-site.xml at %s" % clusterDir)
+ elif clusterStatus == 10:
+ self.__log.critical("%s cluster is dead" % clusterDir)
+ elif clusterStatus == 13:
+ self.__log.warn("%s cluster hdfs is dead" % clusterDir)
+ elif clusterStatus == 14:
+ self.__log.warn("%s cluster mapred is dead" % clusterDir)
+
+ if clusterStatus != 12:
+ if clusterStatus == 15:
+ self.__log.critical("Cluster %s not allocated." % clusterDir)
+ else:
+ self.__print_cluster_info(clusterInfo)
+ self.__log.info("hadoop-site.xml at %s" % clusterDir)
- self.__opCode = clusterStatus
+ self.__opCode = clusterStatus
else:
- self.__log.critical("'%s' does not exist." % clusterDir)
- self.__opCode = 3
+ self.__handle_invalid_cluster_directory(clusterDir)
else:
print self.__hodhelp.help_info()
self.__log.critical("%s operation requires one argument. " % operation
+ "A cluster path.")
self.__opCode = 3
-
+
+ def __handle_invalid_cluster_directory(self, clusterDir, cleanUp=False):
+ clusterList = self.__userState.read(CLUSTER_DATA_FILE)
+ if clusterDir in clusterList.keys():
+ # previously allocated cluster.
+ self.__log.critical("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (clusterList[clusterDir], clusterDir))
+ if cleanUp:
+ self.__cluster.delete_job(clusterList[clusterDir])
+ self.__remove_cluster(clusterDir)
+ self.__log.critical("Freeing resources allocated to the cluster.")
+ self.__opCode = 3
+ else:
+ self.__log.critical("'%s' is not a valid cluster directory." % (clusterDir))
+ self.__opCode = 15
+
def __print_cluster_info(self, clusterInfo):
keys = clusterInfo.keys()
Modified: hadoop/core/trunk/src/contrib/hod/testing/lib.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/testing/lib.py?rev=639214&r1=639213&r2=639214&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/testing/lib.py (original)
+++ hadoop/core/trunk/src/contrib/hod/testing/lib.py Thu Mar 20 02:06:19 2008
@@ -62,3 +62,49 @@
for i in range(0,79):
str = str + "*"
print >>sys.stderr, "\n", str, "\n"
+
+# This class captures all log messages logged by hodRunner and other classes.
+# It is then used to verify that certain log messages have come. This is one
+# way to validate that messages printed to the logger are correctly written.
+class MockLogger:
+ def __init__(self):
+ self.__logLines = {}
+
+ def info(self, message):
+ self.__logLines[message] = 'info'
+
+ def critical(self, message):
+ self.__logLines[message] = 'critical'
+
+ def warn(self, message):
+ self.__logLines[message] = 'warn'
+
+ def debug(self, message):
+ # don't track debug lines.
+ pass
+
+ # verify a certain message has been logged at the defined level of severity.
+ def hasMessage(self, message, level):
+ if not self.__logLines.has_key(message):
+ return False
+ return self.__logLines[message] == level
+
+# Stub class to test cluster manipulation operations.
+class MockHadoopCluster:
+
+ def __init__(self):
+ # store the operations received.
+ self.__operations = {}
+
+ def delete_job(self, jobid):
+ self.__operations['delete_job'] = [jobid]
+
+ def wasOperationPerformed(self, operation, args):
+ if self.__operations.has_key(operation):
+ actualArgs = self.__operations[operation]
+ for arg in actualArgs:
+ if arg not in args:
+ break
+ else:
+ return True
+ return False