You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:24:17 UTC

svn commit: r1076940 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/JobTracker.java test/org/apache/hadoop/mapred/TestNodeRefresh.java

Author: omalley
Date: Fri Mar  4 03:24:17 2011
New Revision: 1076940

URL: http://svn.apache.org/viewvc?rev=1076940&view=rev
Log:
commit 29a5461216d824ec053c24e9d4e55d8ce9a3d48c
Author: Lee Tucker <lt...@yahoo-inc.com>
Date:   Thu Jul 30 17:40:24 2009 -0700

    Applying patch 2747066.5801.patch

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076940&r1=1076939&r2=1076940&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:24:17 2011
@@ -1934,6 +1934,9 @@ public class JobTracker implements MRCon
     } catch (Throwable t) {
       LOG.warn("Recovery manager crashed! Ignoring.", t);
     }
+    // refresh the node list as the recovery manager might have added 
+    // disallowed trackers
+    refreshHosts();
     
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
@@ -3875,7 +3878,12 @@ public class JobTracker implements MRCon
   public synchronized void refreshNodes() throws IOException {
     // check access
     PermissionChecker.checkSuperuserPrivilege(mrOwner, supergroup);
-
+    
+    // call the actual api
+    refreshHosts();
+  }
+  
+  private synchronized void refreshHosts() throws IOException {
     // Reread the config to get mapred.hosts and mapred.hosts.exclude filenames.
     // Update the file names and refresh internal includes and excludes list
     LOG.info("Refreshing hosts information");

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=1076940&r1=1076939&r2=1076940&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java Fri Mar  4 03:24:17 2011
@@ -30,6 +30,7 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
@@ -375,4 +376,82 @@ public class TestNodeRefresh extends Tes
     
     stopCluster();
   }
+
+  /** 
+    * Verifies that hosts added to the exclude file are decommissioned 
+    * across a JobTracker restart: the restarted JobTracker must re-read 
+    * mapred.hosts.exclude while recovering the running job, drop the 
+    * excluded tracker, and fail the map attempts that ran on it. 
+    */ 
+   public void testMRExcludeHostsAcrossRestarts() throws IOException { 
+     // start a cluster with 2 hosts and empty exclude-hosts file 
+     Configuration conf = new Configuration(); 
+     conf.setBoolean("mapred.jobtracker.restart.recover", true); 
+  
+     File file = new File("hosts.exclude"); 
+     file.delete(); 
+     startCluster(2, 1, 0, conf); 
+     // pick the second of the two tracker hosts as the one to exclude later 
+     String hostToDecommission = getHostname(1); 
+     conf = mr.createJobConf(new JobConf(conf)); 
+  
+     // submit a job that waits on a signal file so it stays running 
+     // until we explicitly release it after the restart 
+     Path inDir = new Path("input"); 
+     Path outDir = new Path("output"); 
+     Path signalFilename = new Path("share"); 
+     JobConf newConf = new JobConf(conf); 
+     UtilsForTests.configureWaitingJobConf(newConf, inDir, outDir, 30, 1,  
+         "restart-decommission", signalFilename.toString(),  
+         signalFilename.toString()); 
+      
+     JobClient jobClient = new JobClient(newConf); 
+     RunningJob job = jobClient.submitJob(newConf); 
+     JobID id = job.getID(); 
+     // NOTE(review): 'id' is never used again — job.getID() is re-fetched below 
+      
+     // wait for 50% 
+     while (job.mapProgress() < 0.5f) { 
+       UtilsForTests.waitFor(100); 
+     } 
+      
+     // change the exclude-hosts file to include one host 
+     FileOutputStream out = new FileOutputStream(file); 
+     LOG.info("Writing excluded nodes to log file " + file.toString()); 
+     BufferedWriter writer = null; 
+     try { 
+       writer = new BufferedWriter(new OutputStreamWriter(out)); 
+       writer.write( hostToDecommission + "\n"); // decommission first host 
+     } finally { 
+       if (writer != null) { 
+         writer.close(); 
+       } 
+       out.close(); 
+     } 
+     file.deleteOnExit(); 
+  
+     // restart the jobtracker 
+     mr.stopJobTracker(); 
+     mr.startJobTracker(); 
+     // Wait for the JT to be ready 
+     UtilsForTests.waitForJobTracker(jobClient); 
+  
+     jt = mr.getJobTrackerRunner().getJobTracker(); 
+     // release the waiting tasks so the recovered job can make progress 
+     UtilsForTests.signalTasks(dfs, dfs.getFileSystem(),  
+         signalFilename.toString(), signalFilename.toString(), 1); 
+  
+     // attempts that ran on the now-excluded host should be marked failed 
+     // after recovery — presumably via the refreshHosts() call added to 
+     // JobTracker startup in this same patch (TODO confirm) 
+     assertTrue("Decommissioning of tracker has no effect restarted job",  
+         jt.getJob(job.getID()).failedMapTasks > 0) 
+      
+     // check the cluster status and tracker size 
+     assertEquals("Tracker is not lost upon host decommissioning",  
+                  1, jt.getClusterStatus(false).getTaskTrackers()); 
+     assertEquals("Excluded node count is incorrect",  
+                  1, jt.getClusterStatus(false).getNumExcludedNodes()); 
+      
+     // check if the host is disallowed 
+     for (TaskTrackerStatus status : jt.taskTrackers()) { 
+       assertFalse("Tracker from decommissioned host still exist",  
+                   status.getHost().equals(hostToDecommission)); 
+     } 
+  
+     // wait for the job 
+     job.waitForCompletion(); 
+  
+     stopCluster(); 
+   } 
 }