Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/05/20 06:25:37 UTC
svn commit: r776539 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/JobTracker.java
src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
Author: ddas
Date: Wed May 20 04:25:37 2009
New Revision: 776539
URL: http://svn.apache.org/viewvc?rev=776539&view=rev
Log:
HADOOP-5801. Refreshes the hosts lists upon JobTracker recovery, so that if the hosts file changed across a restart the newly excluded hosts are lost and their maps are re-executed. Contributed by Amar Kamat.
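In rough terms: JobTracker recovery replays the job history and may re-admit trackers from the pre-restart hosts lists, so the exclude file has to be re-read once recovery finishes and trackers on newly excluded hosts dropped. A minimal, self-contained Java sketch of that ordering follows; it is illustrative only (all class, field and method names are hypothetical), not the actual JobTracker code shown in the diffs below.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, self-contained illustration of "recover first, then refresh the
// exclude list" -- not the real JobTracker code.
public class RecoveryRefreshSketch {
  private final Set<String> liveTrackerHosts = new HashSet<String>();
  private final Set<String> excludedHosts = new HashSet<String>();

  void startAfterRestart(String excludeFile) throws IOException {
    recoverJobsAndTrackers();      // may re-register hosts from the old lists
    refreshHosts(excludeFile);     // re-read the (possibly changed) exclude file
    for (String host : new HashSet<String>(liveTrackerHosts)) {
      if (excludedHosts.contains(host)) {
        liveTrackerHosts.remove(host);   // the tracker is lost ...
        rescheduleMapsFrom(host);        // ... and its maps are re-executed
      }
    }
  }

  // Re-read the exclude file, one host name per line.
  private void refreshHosts(String excludeFile) throws IOException {
    excludedHosts.clear();
    BufferedReader in = new BufferedReader(new FileReader(excludeFile));
    try {
      String line;
      while ((line = in.readLine()) != null) {
        if (line.trim().length() > 0) {
          excludedHosts.add(line.trim());
        }
      }
    } finally {
      in.close();
    }
  }

  private void recoverJobsAndTrackers() { /* replay job history here */ }
  private void rescheduleMapsFrom(String host) { /* mark the host's map output lost */ }
}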
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=776539&r1=776538&r2=776539&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May 20 04:25:37 2009
@@ -636,6 +636,10 @@
HADOOP-5853. Undeprecate HttpServer.addInternalServlet method. (Suresh
Srinivas via szetszwo)
+ HADOOP-5801. Refreshes the hosts lists upon JobTracker recovery, so that if
+ the hosts file changed across a restart the newly excluded hosts are lost
+ and their maps are re-executed. (Amar Kamat via ddas)
+
Release 0.20.1 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=776539&r1=776538&r2=776539&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed May 20 04:25:37 2009
@@ -1848,6 +1848,9 @@
} catch (Throwable t) {
LOG.warn("Recovery manager crashed! Ignoring.", t);
}
+ // refresh the node list as the recovery manager might have added
+ // disallowed trackers
+ refreshHosts();
this.expireTrackersThread = new Thread(this.expireTrackers,
"expireTrackers");
@@ -3610,7 +3613,12 @@
public synchronized void refreshNodes() throws IOException {
// check access
PermissionChecker.checkSuperuserPrivilege(mrOwner, supergroup);
-
+
+ // call the actual api
+ refreshHosts();
+ }
+
+ private synchronized void refreshHosts() throws IOException {
// Reread the config to get mapred.hosts and mapred.hosts.exclude filenames.
// Update the file names and refresh internal includes and excludes list
LOG.info("Refreshing hosts information");
Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=776539&r1=776538&r2=776539&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Wed May 20 04:25:37 2009
@@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
@@ -375,4 +376,83 @@
stopCluster();
}
-}
\ No newline at end of file
+
+ /**
+ * Check that excluded hosts are decommissioned across a JobTracker restart
+ */
+ public void testMRExcludeHostsAcrossRestarts() throws IOException {
+ // start a cluster with 2 hosts and empty exclude-hosts file
+ Configuration conf = new Configuration();
+ conf.setBoolean("mapred.jobtracker.restart.recover", true);
+
+ File file = new File("hosts.exclude");
+ file.delete();
+ startCluster(2, 1, 0, conf);
+ String hostToDecommission = getHostname(1);
+ conf = mr.createJobConf(new JobConf(conf));
+
+ // submit a job
+ Path inDir = new Path("input");
+ Path outDir = new Path("output");
+ Path signalFilename = new Path("share");
+ JobConf newConf = new JobConf(conf);
+ UtilsForTests.configureWaitingJobConf(newConf, inDir, outDir, 30, 1,
+ "restart-decommission", signalFilename.toString(),
+ signalFilename.toString());
+
+ JobClient jobClient = new JobClient(newConf);
+ RunningJob job = jobClient.submitJob(newConf);
+ JobID id = job.getID();
+
+ // wait for 50%
+ while (job.mapProgress() < 0.5f) {
+ UtilsForTests.waitFor(100);
+ }
+
+ // add one host to the exclude-hosts file
+ FileOutputStream out = new FileOutputStream(file);
+ LOG.info("Writing excluded nodes to log file " + file.toString());
+ BufferedWriter writer = null;
+ try {
+ writer = new BufferedWriter(new OutputStreamWriter(out));
+ writer.write(hostToDecommission + "\n"); // decommission first host
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ out.close();
+ }
+ file.deleteOnExit();
+
+ // restart the jobtracker
+ mr.stopJobTracker();
+ mr.startJobTracker();
+
+ // Wait for the JT to be ready
+ UtilsForTests.waitForJobTracker(jobClient);
+
+ jt = mr.getJobTrackerRunner().getJobTracker();
+ UtilsForTests.signalTasks(dfs, dfs.getFileSystem(),
+ signalFilename.toString(), signalFilename.toString(), 1);
+
+ assertTrue("Decommissioning of tracker has no effect restarted job",
+ jt.getJob(job.getID()).failedMapTasks > 0);
+
+ // check the cluster status and tracker size
+ assertEquals("Tracker is not lost upon host decommissioning",
+ 1, jt.getClusterStatus(false).getTaskTrackers());
+ assertEquals("Excluded node count is incorrect",
+ 1, jt.getClusterStatus(false).getNumExcludedNodes());
+
+ // check if the host is disallowed
+ for (TaskTrackerStatus status : jt.taskTrackers()) {
+ assertFalse("Tracker from decommissioned host still exist",
+ status.getHost().equals(hostToDecommission));
+ }
+
+ // wait for the job
+ job.waitForCompletion();
+
+ stopCluster();
+ }
+}