You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:37:16 UTC
svn commit: r1077068 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/JobTracker.java
test/org/apache/hadoop/mapred/TestNodeBlacklisting.java
Author: omalley
Date: Fri Mar 4 03:37:16 2011
New Revision: 1077068
URL: http://svn.apache.org/viewvc?rev=1077068&view=rev
Log:
commit a70d5bbed1c6dd2868a22b38fad4ca8c7a0a4411
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date: Wed Dec 9 10:23:53 2009 +0530
MAPREDUCE-754 from https://issues.apache.org/jira/secure/attachment/12427347/mapreduce-754-v2.2.1-yahoo.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-754. Fix NPE in expiry thread when a TT is lost. (Amar Kamat
+ via sharad)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077068&r1=1077067&r2=1077068&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:37:16 2011
@@ -401,16 +401,7 @@ public class JobTracker implements MRCon
// tracker has already been destroyed.
if (newProfile != null) {
if ((now - newProfile.getLastSeen()) > TASKTRACKER_EXPIRY_INTERVAL) {
- // Remove completely after marking the tasks as 'KILLED'
- lostTaskTracker(current);
- // tracker is lost, and if it is blacklisted, remove
- // it from the count of blacklisted trackers in the cluster
- if (isBlacklisted(trackerName)) {
- faultyTrackers.decrBlackListedTrackers(1);
- }
- updateTaskTrackerStatus(trackerName, null);
- statistics.taskTrackerRemoved(trackerName);
- getInstrumentation().decTrackers(1);
+ removeTracker(current);
// remove the mapping from the hosts list
String hostname = newProfile.getHost();
hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -877,17 +868,20 @@ public class JobTracker implements MRCon
private void removeHostCapacity(String hostName) {
synchronized (taskTrackers) {
// remove the capacity of trackers on this host
+ int numTrackersOnHost = 0;
for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
int mapSlots = status.getMaxMapSlots();
totalMapTaskCapacity -= mapSlots;
int reduceSlots = status.getMaxReduceSlots();
totalReduceTaskCapacity -= reduceSlots;
+ ++numTrackersOnHost;
getInstrumentation().addBlackListedMapSlots(
mapSlots);
getInstrumentation().addBlackListedReduceSlots(
reduceSlots);
}
- incrBlackListedTrackers(uniqueHostsMap.remove(hostName));
+ uniqueHostsMap.remove(hostName);
+ incrBlackListedTrackers(numTrackersOnHost);
}
}
@@ -3095,12 +3089,14 @@ public class JobTracker implements MRCon
taskTrackers.remove(trackerName);
Integer numTaskTrackersInHost =
uniqueHostsMap.get(oldStatus.getHost());
- numTaskTrackersInHost --;
- if (numTaskTrackersInHost > 0) {
- uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost);
- }
- else {
- uniqueHostsMap.remove(oldStatus.getHost());
+ if (numTaskTrackersInHost != null) {
+ numTaskTrackersInHost --;
+ if (numTaskTrackersInHost > 0) {
+ uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost);
+ }
+ else {
+ uniqueHostsMap.remove(oldStatus.getHost());
+ }
}
}
}
@@ -4291,6 +4287,21 @@ public class JobTracker implements MRCon
decommissionNodes(excludeSet);
}
+ // Remove a tracker from the system
+ private void removeTracker(TaskTracker tracker) {
+ String trackerName = tracker.getTrackerName();
+ // Remove completely after marking the tasks as 'KILLED'
+ lostTaskTracker(tracker);
+ // tracker is lost, and if it is blacklisted, remove
+ // it from the count of blacklisted trackers in the cluster
+ if (isBlacklisted(trackerName)) {
+ faultyTrackers.decrBlackListedTrackers(1);
+ }
+ updateTaskTrackerStatus(trackerName, null);
+ statistics.taskTrackerRemoved(trackerName);
+ getInstrumentation().decTrackers(1);
+ }
+
// main decommission
synchronized void decommissionNodes(Set<String> hosts)
throws IOException {
@@ -4304,11 +4315,9 @@ public class JobTracker implements MRCon
Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
if (trackers != null) {
for (TaskTracker tracker : trackers) {
- LOG.info("Decommission: Losing tracker " + tracker +
+ LOG.info("Decommission: Losing tracker " + tracker.getTrackerName() +
" on host " + host);
- lostTaskTracker(tracker); // lose the tracker
- updateTaskTrackerStatus(
- tracker.getStatus().getTrackerName(), null);
+ removeTracker(tracker);
}
trackersDecommissioned += trackers.size();
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java?rev=1077068&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java Fri Mar 4 03:37:16 2011
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * Test node blacklisting. This testcase tests
+ * - node blacklisting along with node refresh
+ */
+public class TestNodeBlacklisting extends TestCase {
+ public static final Log LOG = LogFactory.getLog(TestNodeBlacklisting.class);
+ private static final Path TEST_DIR =
+ new Path(System.getProperty("test.build.data", "/tmp"), "node-bklisting");
+
+ // Mapper that fails once for the first time
+ static class FailOnceMapper extends MapReduceBase implements
+ Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+ private boolean shouldFail = false;
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+ throws IOException {
+
+ if (shouldFail) {
+ throw new RuntimeException("failing map");
+ }
+ }
+
+ @Override
+ public void configure(JobConf conf) {
+ TaskAttemptID id = TaskAttemptID.forName(conf.get("mapred.task.id"));
+ shouldFail = id.getId() == 0 && id.getTaskID().getId() == 0;
+ }
+ }
+
+ /**
+ * Check refreshNodes for decommissioning blacklisted nodes.
+ */
+ public void testBlacklistedNodeDecommissioning() throws Exception {
+ LOG.info("Testing blacklisted node decommissioning");
+ MiniMRCluster mr = null;
+ JobTracker jt = null;
+
+ try {
+ // start mini mr
+ JobConf jtConf = new JobConf();
+ jtConf.set("mapred.max.tracker.blacklists", "1");
+ mr = new MiniMRCluster(0, 0, 2, "file:///", 1, null, null, null, jtConf);
+ jt = mr.getJobTrackerRunner().getJobTracker();
+
+ assertEquals("Trackers not up", 2, jt.taskTrackers().size());
+ // validate the total tracker count
+ assertEquals("Active tracker count mismatch",
+ 2, jt.getClusterStatus(false).getTaskTrackers());
+ // validate blacklisted count
+ assertEquals("Blacklisted tracker count mismatch",
+ 0, jt.getClusterStatus(false).getBlacklistedTrackers());
+
+ // run a failing job to blacklist the tracker
+ JobConf jConf = mr.createJobConf();
+ jConf.set("mapred.max.tracker.failures", "1");
+ jConf.setJobName("test-job-fail-once");
+ jConf.setMapperClass(FailOnceMapper.class);
+ jConf.setReducerClass(IdentityReducer.class);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(0);
+
+ RunningJob job =
+ UtilsForTests.runJob(jConf, new Path(TEST_DIR, "in"),
+ new Path(TEST_DIR, "out"));
+ job.waitForCompletion();
+
+ // validate the total tracker count
+ assertEquals("Active tracker count mismatch",
+ 1, jt.getClusterStatus(false).getTaskTrackers());
+ // validate blacklisted count
+ assertEquals("Blacklisted tracker count mismatch",
+ 1, jt.getClusterStatus(false).getBlacklistedTrackers());
+
+ // find the blacklisted tracker
+ String trackerName = null;
+ for (TaskTrackerStatus status : jt.taskTrackers()) {
+ if (jt.isBlacklisted(status.getTrackerName())) {
+ trackerName = status.getTrackerName();
+ break;
+ }
+ }
+ // get the hostname
+ String hostToDecommission =
+ JobInProgress.convertTrackerNameToHostName(trackerName);
+ LOG.info("Decommissioning tracker " + hostToDecommission);
+
+ // decommission the node
+ HashSet<String> decom = new HashSet<String>(1);
+ decom.add(hostToDecommission);
+ jt.decommissionNodes(decom);
+
+ // validate
+ // check the cluster status and tracker size
+ assertEquals("Tracker is not lost upon host decommissioning",
+ 1, jt.getClusterStatus(false).getTaskTrackers());
+ assertEquals("Blacklisted tracker count incorrect in cluster status "
+ + "after decommissioning",
+ 0, jt.getClusterStatus(false).getBlacklistedTrackers());
+ assertEquals("Tracker is not lost upon host decommissioning",
+ 1, jt.taskTrackers().size());
+ } finally {
+ if (mr != null) {
+ mr.shutdown();
+ mr = null;
+ jt = null;
+ FileUtil.fullyDelete(new File(TEST_DIR.toString()));
+ }
+ }
+ }
+}