You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/05/20 06:25:37 UTC

svn commit: r776539 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobTracker.java src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java

Author: ddas
Date: Wed May 20 04:25:37 2009
New Revision: 776539

URL: http://svn.apache.org/viewvc?rev=776539&view=rev
Log:
HADOOP-5801. Fixes the problem: If the hosts file is changed across restart then it should be refreshed upon recovery, so that trackers on the newly excluded hosts are lost and their maps are re-executed. Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=776539&r1=776538&r2=776539&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May 20 04:25:37 2009
@@ -636,6 +636,10 @@
     HADOOP-5853. Undeprecate HttpServer.addInternalServlet method.  (Suresh
     Srinivas via szetszwo)
 
+    HADOOP-5801. Fixes the problem: If the hosts file is changed across restart
+    then it should be refreshed upon recovery so that the excluded hosts are 
+    lost and the maps are re-executed. (Amar Kamat via ddas)
+
 Release 0.20.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=776539&r1=776538&r2=776539&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed May 20 04:25:37 2009
@@ -1848,6 +1848,9 @@
     } catch (Throwable t) {
       LOG.warn("Recovery manager crashed! Ignoring.", t);
     }
+    // refresh the node list as the recovery manager might have added 
+    // disallowed trackers
+    refreshHosts();
     
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
@@ -3610,7 +3613,12 @@
   public synchronized void refreshNodes() throws IOException {
     // check access
     PermissionChecker.checkSuperuserPrivilege(mrOwner, supergroup);
-
+    
+    // call the actual api
+    refreshHosts();
+  }
+  
+  private synchronized void refreshHosts() throws IOException {
     // Reread the config to get mapred.hosts and mapred.hosts.exclude filenames.
     // Update the file names and refresh internal includes and excludes list
     LOG.info("Refreshing hosts information");

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=776539&r1=776538&r2=776539&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Wed May 20 04:25:37 2009
@@ -30,6 +30,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
@@ -375,4 +376,83 @@
     
     stopCluster();
   }
-}
\ No newline at end of file
+
+  /**
+   * Check if excluded hosts are decommissioned across restart  
+   */
+  public void testMRExcludeHostsAcrossRestarts() throws IOException {
+    // start a cluster with 2 hosts and empty exclude-hosts file
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.jobtracker.restart.recover", true);
+
+    File file = new File("hosts.exclude");
+    file.delete();
+    startCluster(2, 1, 0, conf);
+    String hostToDecommission = getHostname(1);
+    conf = mr.createJobConf(new JobConf(conf));
+
+    // submit a job
+    Path inDir = new Path("input");
+    Path outDir = new Path("output");
+    Path signalFilename = new Path("share");
+    JobConf newConf = new JobConf(conf);
+    UtilsForTests.configureWaitingJobConf(newConf, inDir, outDir, 30, 1, 
+        "restart-decommission", signalFilename.toString(), 
+        signalFilename.toString());
+    
+    JobClient jobClient = new JobClient(newConf);
+    RunningJob job = jobClient.submitJob(newConf);
+    JobID id = job.getID();
+    
+    // wait for 50%
+    while (job.mapProgress() < 0.5f) {
+      UtilsForTests.waitFor(100);
+    }
+    
+    // change the exclude-hosts file to include one host
+    FileOutputStream out = new FileOutputStream(file);
+    LOG.info("Writing excluded nodes to log file " + file.toString());
+    BufferedWriter writer = null;
+    try {
+      writer = new BufferedWriter(new OutputStreamWriter(out));
+      writer.write( hostToDecommission + "\n"); // decommission first host
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+      out.close();
+    }
+    file.deleteOnExit();
+
+    // restart the jobtracker
+    mr.stopJobTracker();
+    mr.startJobTracker();
+
+    // Wait for the JT to be ready
+    UtilsForTests.waitForJobTracker(jobClient);
+
+    jt = mr.getJobTrackerRunner().getJobTracker();
+    UtilsForTests.signalTasks(dfs, dfs.getFileSystem(), 
+        signalFilename.toString(), signalFilename.toString(), 1);
+
+    assertTrue("Decommissioning of tracker has no effect restarted job", 
+        jt.getJob(job.getID()).failedMapTasks > 0);
+    
+    // check the cluster status and tracker size
+    assertEquals("Tracker is not lost upon host decommissioning", 
+                 1, jt.getClusterStatus(false).getTaskTrackers());
+    assertEquals("Excluded node count is incorrect", 
+                 1, jt.getClusterStatus(false).getNumExcludedNodes());
+    
+    // check if the host is disallowed
+    for (TaskTrackerStatus status : jt.taskTrackers()) {
+      assertFalse("Tracker from decommissioned host still exist", 
+                  status.getHost().equals(hostToDecommission));
+    }
+
+    // wait for the job
+    job.waitForCompletion();
+
+    stopCluster();
+  }
+}