You are viewing a plain-text version of this content. The hyperlink to the canonical version (originally attached to the word "here") was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/07/22 16:49:00 UTC
svn commit: r796751 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapreduce/server/jobtracker/
src/test/mapred/org/apache/hadoop/mapred/
Author: yhemanth
Date: Wed Jul 22 14:49:00 2009
New Revision: 796751
URL: http://svn.apache.org/viewvc?rev=796751&view=rev
Log:
MAPREDUCE-682. Removes reservations on tasktrackers which are blacklisted. Contributed by Sreekanth Ramakrishnan.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=796751&r1=796750&r2=796751&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 22 14:49:00 2009
@@ -250,3 +250,7 @@
MAPREDUCE-735. Fixes a problem in the KeyFieldHelper to do with
the end index for some inputs (Amar Kamat via ddas)
+
+ MAPREDUCE-682. Removes reservations on tasktrackers which are
+ blacklisted. (Sreekanth Ramakrishnan via yhemanth)
+
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=796751&r1=796750&r2=796751&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Jul 22 14:49:00 2009
@@ -629,6 +629,13 @@
} else {
LOG.info("Blacklisting tracker : " + hostName
+ " Reason for blacklisting is : " + rfb);
+ Set<TaskTracker> trackers =
+ hostnameToTaskTracker.get(hostName);
+ synchronized (trackers) {
+ for (TaskTracker tracker : trackers) {
+ tracker.cancelAllReservations();
+ }
+ }
removeHostCapacity(hostName);
fi.setBlacklist(rfb, reason);
}
@@ -3999,7 +4006,7 @@
}
// Cleanup
- taskTracker.lost();
+ taskTracker.cancelAllReservations();
// Purge 'marked' tasks, needs to be done
// here to prevent hanging references!
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java?rev=796751&r1=796750&r2=796751&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java Wed Jul 22 14:49:00 2009
@@ -183,10 +183,13 @@
}
/**
- * Cleanup when the {@link TaskTracker} is declared as 'lost' by the
- * JobTracker.
+ * Cleanup when the {@link TaskTracker} is declared as 'lost/blacklisted'
+ * by the JobTracker.
+ *
+ * The method assumes that the lock on the {@link JobTracker} is obtained
+ * by the caller.
*/
- public void lost() {
+ public void cancelAllReservations() {
// Inform jobs which have reserved slots on this tasktracker
if (jobForFallowMapSlot != null) {
unreserveSlots(TaskType.MAP, jobForFallowMapSlot);
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java?rev=796751&r1=796750&r2=796751&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java Wed Jul 22 14:49:00 2009
@@ -33,6 +33,8 @@
import org.apache.hadoop.mapred.JobTracker.ReasonForBlackListing;
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
public class TestTaskTrackerBlacklisting extends TestCase {
@@ -182,14 +184,15 @@
sendHeartBeat(null, false);
assertEquals("Tracker 1 still blacklisted after a day", jobTracker
.getBlacklistedTrackerCount(), 0);
+ //Cleanup the blacklisted trackers.
+ //Tracker is black listed due to failure count, so clock has to be
+ //forwarded by a day.
clock.jumpADay = false;
}
public void testNodeHealthBlackListing() throws Exception {
- TaskTrackerHealthStatus status = new TaskTrackerHealthStatus();
- status.setNodeHealthy(false);
- status.setLastReported(System.currentTimeMillis());
- status.setHealthReport("ERROR");
+ TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+ //Blacklist tracker due to node health failures.
sendHeartBeat(status, false);
for (String host : hosts) {
checkReasonForBlackListing(host, nodeUnHealthyReasonSet);
@@ -197,6 +200,8 @@
status.setNodeHealthy(true);
status.setLastReported(System.currentTimeMillis());
status.setHealthReport("");
+ //white list tracker so the further test cases can be
+ //using trackers.
sendHeartBeat(status, false);
assertEquals("Trackers still blacklisted after healthy report", jobTracker
.getBlacklistedTrackerCount(), 0);
@@ -207,10 +212,8 @@
assertEquals("Tracker 1 not blacklisted", jobTracker
.getBlacklistedTrackerCount(), 1);
checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
- TaskTrackerHealthStatus status = new TaskTrackerHealthStatus();
- status.setNodeHealthy(false);
- status.setLastReported(System.currentTimeMillis());
- status.setHealthReport("ERROR");
+ TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+
sendHeartBeat(status, false);
assertEquals("All trackers not blacklisted",
@@ -228,21 +231,19 @@
for (String host : hosts) {
checkReasonForBlackListing(host, nodeUnHealthyReasonSet);
}
+ //clear blacklisted trackers due to node health reasons.
sendHeartBeat(null, false);
assertEquals("All trackers not white listed",
jobTracker.getBlacklistedTrackerCount(), 0);
-
+ //Clear the blacklisted trackers due to failures.
clock.jumpADay = false;
}
public void testBlacklistingReasonString() throws Exception {
String error = "ERROR";
String error1 = "ERROR1";
- TaskTrackerHealthStatus status = new TaskTrackerHealthStatus();
- status.setNodeHealthy(false);
- status.setLastReported(System.currentTimeMillis());
- status.setHealthReport(error);
+ TaskTrackerHealthStatus status = getUnhealthyNodeStatus(error);
sendHeartBeat(status, false);
assertEquals("All trackers not blacklisted", jobTracker
@@ -272,6 +273,58 @@
jobTracker.getReasonsForBlacklisting(hosts[i]).replace("\n", ""),
error1);
}
+ //clear the blacklisted trackers with node health reasons.
+ sendHeartBeat(null, false);
+ }
+
+ private TaskTrackerHealthStatus getUnhealthyNodeStatus(String error) {
+ TaskTrackerHealthStatus status = new TaskTrackerHealthStatus();
+ status.setNodeHealthy(false);
+ status.setLastReported(System.currentTimeMillis());
+ status.setHealthReport(error);
+ return status;
+ }
+
+ public void testBlackListingWithTrackerReservation() throws Exception {
+ JobConf conf = new JobConf();
+ conf.setNumMapTasks(1);
+ conf.setNumReduceTasks(1);
+ FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+ TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
+ TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
+ tt1.reserveSlots(TaskType.MAP, job, 1);
+ tt1.reserveSlots(TaskType.REDUCE, job, 1);
+ tt2.reserveSlots(TaskType.MAP, job, 1);
+ tt2.reserveSlots(TaskType.REDUCE, job, 1);
+ assertEquals("Tracker 1 not reserved for the job 1", 2, job
+ .getNumReservedTaskTrackersForMaps());
+ assertEquals("Tracker 1 not reserved for the job 1", 2, job
+ .getNumReservedTaskTrackersForReduces());
+ runBlackListingJob(jobTracker, trackers);
+ assertEquals("Tracker 1 not unreserved for the job 1", 1, job
+ .getNumReservedTaskTrackersForMaps());
+ assertEquals("Tracker 1 not unreserved for the job 1", 1, job
+ .getNumReservedTaskTrackersForReduces());
+ assertEquals("Tracker 1 not blacklisted", jobTracker
+ .getBlacklistedTrackerCount(), 1);
+ checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
+
+ TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+ sendHeartBeat(status, false);
+ assertEquals("All trackers not blacklisted", jobTracker
+ .getBlacklistedTrackerCount(), 3);
+
+ checkReasonForBlackListing(hosts[0], unhealthyAndExceedsFailure);
+ checkReasonForBlackListing(hosts[1], nodeUnHealthyReasonSet);
+ checkReasonForBlackListing(hosts[2], nodeUnHealthyReasonSet);
+
+ assertEquals("Tracker 1 not unreserved for the job 1", 0, job
+ .getNumReservedTaskTrackersForMaps());
+ assertEquals("Tracker 1 not unreserved for the job 1", 0, job
+ .getNumReservedTaskTrackersForReduces());
+ //white list all trackers for health reasons
+ sendHeartBeat(null, false);
+ clock.jumpADay = false;
}
/**