You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:37:16 UTC

svn commit: r1077068 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/JobTracker.java test/org/apache/hadoop/mapred/TestNodeBlacklisting.java

Author: omalley
Date: Fri Mar  4 03:37:16 2011
New Revision: 1077068

URL: http://svn.apache.org/viewvc?rev=1077068&view=rev
Log:
commit a70d5bbed1c6dd2868a22b38fad4ca8c7a0a4411
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date:   Wed Dec 9 10:23:53 2009 +0530

    MAPREDUCE-754 from https://issues.apache.org/jira/secure/attachment/12427347/mapreduce-754-v2.2.1-yahoo.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-754. Fix NPE in expiry thread when a TT is lost. (Amar Kamat
    +    via sharad)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077068&r1=1077067&r2=1077068&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:37:16 2011
@@ -401,16 +401,7 @@ public class JobTracker implements MRCon
                   // tracker has already been destroyed.
                   if (newProfile != null) {
                     if ((now - newProfile.getLastSeen()) > TASKTRACKER_EXPIRY_INTERVAL) {
-                      // Remove completely after marking the tasks as 'KILLED'
-                      lostTaskTracker(current);
-                      // tracker is lost, and if it is blacklisted, remove 
-                      // it from the count of blacklisted trackers in the cluster
-                      if (isBlacklisted(trackerName)) {
-                    	  faultyTrackers.decrBlackListedTrackers(1);
-                      }
-                      updateTaskTrackerStatus(trackerName, null);
-                      statistics.taskTrackerRemoved(trackerName);
-                      getInstrumentation().decTrackers(1);
+                      removeTracker(current);
                       // remove the mapping from the hosts list
                       String hostname = newProfile.getHost();
                       hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -877,17 +868,20 @@ public class JobTracker implements MRCon
    private void removeHostCapacity(String hostName) {
      synchronized (taskTrackers) {
        // remove the capacity of trackers on this host
+        int numTrackersOnHost = 0; // trackers whose slots are subtracted below
        for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
          int mapSlots = status.getMaxMapSlots();
          totalMapTaskCapacity -= mapSlots;
          int reduceSlots = status.getMaxReduceSlots();
          totalReduceTaskCapacity -= reduceSlots;
+          ++numTrackersOnHost; // one blacklisted tracker per status on host
          getInstrumentation().addBlackListedMapSlots(
              mapSlots);
          getInstrumentation().addBlackListedReduceSlots(
              reduceSlots);
        }
-        incrBlackListedTrackers(uniqueHostsMap.remove(hostName));
+        uniqueHostsMap.remove(hostName); // may return null if host is absent
+        incrBlackListedTrackers(numTrackersOnHost); // count, not remove()'s value
      }
    }
     
@@ -3095,12 +3089,14 @@ public class JobTracker implements MRCon
         taskTrackers.remove(trackerName);
         Integer numTaskTrackersInHost = 
           uniqueHostsMap.get(oldStatus.getHost());
-        numTaskTrackersInHost --;
-        if (numTaskTrackersInHost > 0)  {
-          uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost);
-        }
-        else {
-          uniqueHostsMap.remove(oldStatus.getHost());
+        if (numTaskTrackersInHost != null) {
+          numTaskTrackersInHost --;
+          if (numTaskTrackersInHost > 0)  {
+            uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost);
+          }
+          else {
+            uniqueHostsMap.remove(oldStatus.getHost());
+          }
         }
       }
     }
@@ -4291,6 +4287,21 @@ public class JobTracker implements MRCon
     decommissionNodes(excludeSet);
   }
 
+  // Remove a tracker from the system
+  private void removeTracker(TaskTracker tracker) {
+    String trackerName = tracker.getTrackerName();
+    // Remove completely after marking the tasks as 'KILLED'
+    lostTaskTracker(tracker);
+    // tracker is lost, and if it is blacklisted, remove 
+    // it from the count of blacklisted trackers in the cluster
+    if (isBlacklisted(trackerName)) {
+     faultyTrackers.decrBlackListedTrackers(1);
+    }
+    updateTaskTrackerStatus(trackerName, null);
+    statistics.taskTrackerRemoved(trackerName);
+    getInstrumentation().decTrackers(1);
+  }
+  
   // main decommission
   synchronized void decommissionNodes(Set<String> hosts) 
   throws IOException {  
@@ -4304,11 +4315,9 @@ public class JobTracker implements MRCon
           Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
           if (trackers != null) {
             for (TaskTracker tracker : trackers) {
-              LOG.info("Decommission: Losing tracker " + tracker + 
+              LOG.info("Decommission: Losing tracker " + tracker.getTrackerName() + 
                        " on host " + host);
-              lostTaskTracker(tracker); // lose the tracker
-              updateTaskTrackerStatus(
-                tracker.getStatus().getTrackerName(), null);
+              removeTracker(tracker);
             }
             trackersDecommissioned += trackers.size();
           }

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java?rev=1077068&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java Fri Mar  4 03:37:16 2011
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * Test node blacklisting. This testcase tests
+ *   - node blacklisting along with node refresh 
+ */
+public class TestNodeBlacklisting extends TestCase {
+  public static final Log LOG = LogFactory.getLog(TestNodeBlacklisting.class);
+  private static final Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data", "/tmp"), "node-bklisting");
+
+  // Mapper that fails once for the first time
+  static class FailOnceMapper extends MapReduceBase implements
+      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+    private boolean shouldFail = false;
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+
+      if (shouldFail) {
+        throw new RuntimeException("failing map");
+      }
+    }
+     
+    @Override
+    public void configure(JobConf conf) {
+      TaskAttemptID id = TaskAttemptID.forName(conf.get("mapred.task.id"));
+      shouldFail = id.getId() == 0 && id.getTaskID().getId() == 0; 
+    }
+  }
+   
+  /**
+   * Check refreshNodes for decommissioning blacklisted nodes. 
+   */
+  public void testBlacklistedNodeDecommissioning() throws Exception {
+    LOG.info("Testing blacklisted node decommissioning");
+    MiniMRCluster mr = null;
+    JobTracker jt = null;
+     
+    try {
+      // start mini mr
+      JobConf jtConf = new JobConf();
+      jtConf.set("mapred.max.tracker.blacklists", "1");
+      mr = new MiniMRCluster(0, 0, 2, "file:///", 1, null, null, null, jtConf);
+      jt = mr.getJobTrackerRunner().getJobTracker();
+
+      assertEquals("Trackers not up", 2, jt.taskTrackers().size());
+      // validate the total tracker count
+      assertEquals("Active tracker count mismatch", 
+                   2, jt.getClusterStatus(false).getTaskTrackers());
+      // validate blacklisted count
+      assertEquals("Blacklisted tracker count mismatch", 
+                   0, jt.getClusterStatus(false).getBlacklistedTrackers());
+      
+      // run a failing job to blacklist the tracker
+      JobConf jConf = mr.createJobConf();
+      jConf.set("mapred.max.tracker.failures", "1");
+      jConf.setJobName("test-job-fail-once");
+      jConf.setMapperClass(FailOnceMapper.class);
+      jConf.setReducerClass(IdentityReducer.class);
+      jConf.setNumMapTasks(1);
+      jConf.setNumReduceTasks(0);
+
+      RunningJob job = 
+        UtilsForTests.runJob(jConf, new Path(TEST_DIR, "in"), 
+                             new Path(TEST_DIR, "out"));
+      job.waitForCompletion();
+
+      // validate the total tracker count
+      assertEquals("Active tracker count mismatch", 
+                   1, jt.getClusterStatus(false).getTaskTrackers());
+      // validate blacklisted count
+      assertEquals("Blacklisted tracker count mismatch", 
+                   1, jt.getClusterStatus(false).getBlacklistedTrackers());
+
+      // find the blacklisted tracker
+      String trackerName = null;
+      for (TaskTrackerStatus status : jt.taskTrackers()) {
+        if (jt.isBlacklisted(status.getTrackerName())) {
+          trackerName = status.getTrackerName();
+          break;
+        }
+      }
+      // get the hostname
+      String hostToDecommission = 
+        JobInProgress.convertTrackerNameToHostName(trackerName);
+      LOG.info("Decommissioning tracker " + hostToDecommission);
+
+      // decommission the node
+      HashSet<String> decom = new HashSet<String>(1);
+      decom.add(hostToDecommission);
+      jt.decommissionNodes(decom);
+
+      // validate
+      // check the cluster status and tracker size
+      assertEquals("Tracker is not lost upon host decommissioning", 
+                   1, jt.getClusterStatus(false).getTaskTrackers());
+      assertEquals("Blacklisted tracker count incorrect in cluster status "
+                   + "after decommissioning", 
+                   0, jt.getClusterStatus(false).getBlacklistedTrackers());
+      assertEquals("Tracker is not lost upon host decommissioning", 
+                   1, jt.taskTrackers().size());
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+        mr = null;
+        jt = null;
+        FileUtil.fullyDelete(new File(TEST_DIR.toString()));
+      }
+    }
+  }
+}