You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:24:17 UTC
svn commit: r1076940 - in /hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/JobTracker.java
test/org/apache/hadoop/mapred/TestNodeRefresh.java
Author: omalley
Date: Fri Mar 4 03:24:17 2011
New Revision: 1076940
URL: http://svn.apache.org/viewvc?rev=1076940&view=rev
Log:
commit 29a5461216d824ec053c24e9d4e55d8ce9a3d48c
Author: Lee Tucker <lt...@yahoo-inc.com>
Date: Thu Jul 30 17:40:24 2009 -0700
Applying patch 2747066.5801.patch
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076940&r1=1076939&r2=1076940&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:24:17 2011
@@ -1934,6 +1934,9 @@ public class JobTracker implements MRCon
} catch (Throwable t) {
LOG.warn("Recovery manager crashed! Ignoring.", t);
}
+ // refresh the node list as the recovery manager might have added
+ // disallowed trackers
+ refreshHosts();
this.expireTrackersThread = new Thread(this.expireTrackers,
"expireTrackers");
@@ -3875,7 +3878,12 @@ public class JobTracker implements MRCon
public synchronized void refreshNodes() throws IOException {
// check access
PermissionChecker.checkSuperuserPrivilege(mrOwner, supergroup);
-
+
+ // call the actual api
+ refreshHosts();
+ }
+
+ private synchronized void refreshHosts() throws IOException {
// Reread the config to get mapred.hosts and mapred.hosts.exclude filenames.
// Update the file names and refresh internal includes and excludes list
LOG.info("Refreshing hosts information");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=1076940&r1=1076939&r2=1076940&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java Fri Mar 4 03:24:17 2011
@@ -30,6 +30,7 @@ import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
@@ -375,4 +376,82 @@ public class TestNodeRefresh extends Tes
stopCluster();
}
+
+ /**
+ * Check if excluded hosts are decommissioned across restart
+ */
+ public void testMRExcludeHostsAcrossRestarts() throws IOException {
+ // start a cluster with 2 hosts and empty exclude-hosts file
+ Configuration conf = new Configuration();
+ conf.setBoolean("mapred.jobtracker.restart.recover", true);
+
+ File file = new File("hosts.exclude");
+ file.delete();
+ startCluster(2, 1, 0, conf);
+ String hostToDecommission = getHostname(1);
+ conf = mr.createJobConf(new JobConf(conf));
+
+ // submit a job
+ Path inDir = new Path("input");
+ Path outDir = new Path("output");
+ Path signalFilename = new Path("share");
+ JobConf newConf = new JobConf(conf);
+ UtilsForTests.configureWaitingJobConf(newConf, inDir, outDir, 30, 1,
+ "restart-decommission", signalFilename.toString(),
+ signalFilename.toString());
+
+ JobClient jobClient = new JobClient(newConf);
+ RunningJob job = jobClient.submitJob(newConf);
+ JobID id = job.getID();
+
+ // wait for 50%
+ while (job.mapProgress() < 0.5f) {
+ UtilsForTests.waitFor(100);
+ }
+
+ // change the exclude-hosts file to include one host
+ FileOutputStream out = new FileOutputStream(file);
+ LOG.info("Writing excluded nodes to log file " + file.toString());
+ BufferedWriter writer = null;
+ try {
+ writer = new BufferedWriter(new OutputStreamWriter(out));
+ writer.write( hostToDecommission + "\n"); // decommission first host
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ out.close();
+ }
+ file.deleteOnExit();
+
+ // restart the jobtracker
+ mr.stopJobTracker();
+ mr.startJobTracker();
+ // Wait for the JT to be ready
+ UtilsForTests.waitForJobTracker(jobClient);
+
+ jt = mr.getJobTrackerRunner().getJobTracker();
+ UtilsForTests.signalTasks(dfs, dfs.getFileSystem(),
+ signalFilename.toString(), signalFilename.toString(), 1);
+
+ assertTrue("Decommissioning of tracker has no effect restarted job",
+ jt.getJob(job.getID()).failedMapTasks > 0);
+
+ // check the cluster status and tracker size
+ assertEquals("Tracker is not lost upon host decommissioning",
+ 1, jt.getClusterStatus(false).getTaskTrackers());
+ assertEquals("Excluded node count is incorrect",
+ 1, jt.getClusterStatus(false).getNumExcludedNodes());
+
+ // check if the host is disallowed
+ for (TaskTrackerStatus status : jt.taskTrackers()) {
+ assertFalse("Tracker from decommissioned host still exist",
+ status.getHost().equals(hostToDecommission));
+ }
+
+ // wait for the job
+ job.waitForCompletion();
+
+ stopCluster();
+ }
}