You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:33:31 UTC
svn commit: r1077596 - in
/hadoop/common/branches/branch-0.20-security-patches/src: mapred/
mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/
test/org/apache/hadoop/mapred/ webapps/job/
Author: omalley
Date: Fri Mar 4 04:33:31 2011
New Revision: 1077596
URL: http://svn.apache.org/viewvc?rev=1077596&view=rev
Log:
commit 6939f6854b330a01cc4427f4c657df0c3c4d53ab
Author: Arun C Murthy <ac...@apache.org>
Date: Fri Jul 23 15:39:49 2010 -0700
MAPREDUCE-1966. Change blacklisting of tasktrackers on task failures to be a simple graylist to fingerpoint bad tasktrackers. Contributed by Greg Roelofs.
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1966. Change blacklisting of tasktrackers on task failures to be
+ a simple graylist to fingerpoint bad tasktrackers. (Greg Roelofs via
+ acmurthy)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 04:33:31 2011
@@ -678,11 +678,41 @@
<property>
<name>mapred.max.tracker.blacklists</name>
<value>4</value>
- <description>The number of blacklists for a taskTracker by various jobs
- after which the task tracker could be blacklisted across
- all jobs. The tracker will be given a tasks later
- (after a day). The tracker will become a healthy
- tracker after a restart.
+ <description>The number of blacklists for a tasktracker by various jobs
+ after which the tasktracker will be marked as potentially
+ faulty and is a candidate for graylisting across all jobs.
+ (Unlike blacklisting, this is advisory; the tracker remains
+ active. However, it is reported as graylisted in the web UI,
+ with the expectation that chronically graylisted trackers
+ will be manually decommissioned.) This value is tied to
+ mapred.jobtracker.blacklist.fault-timeout-window; faults
+ older than the window width are forgiven, so the tracker
+ will recover from transient problems. It will also become
+ healthy after a restart.
+ </description>
+</property>
+
+<property>
+ <name>mapred.jobtracker.blacklist.fault-timeout-window</name>
+ <value>180</value>
+ <description>The timeout (in minutes) after which per-job tasktracker
+ faults are forgiven. The window is logically a circular
+ buffer of time-interval buckets whose width is defined by
+ mapred.jobtracker.blacklist.fault-bucket-width; when the
+ "now" pointer moves across a bucket boundary, the previous
+ contents (faults) of the new bucket are cleared. In other
+ words, the timeout's granularity is determined by the bucket
+ width.
+ </description>
+</property>
+
+<property>
+ <name>mapred.jobtracker.blacklist.fault-bucket-width</name>
+ <value>15</value>
+ <description>The width (in minutes) of each bucket in the tasktracker
+ fault timeout window. Each bucket is reused in a circular
+ manner after a full timeout-window interval (defined by
+ mapred.jobtracker.blacklist.fault-timeout-window).
</description>
</property>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java Fri Mar 4 04:33:31 2011
@@ -60,7 +60,9 @@ public class ClusterStatus implements Wr
private int numActiveTrackers;
private Collection<String> activeTrackers = new ArrayList<String>();
private Collection<String> blacklistedTrackers = new ArrayList<String>();
+ private Collection<String> graylistedTrackers = new ArrayList<String>();
private int numBlacklistedTrackers;
+ private int numGraylistedTrackers;
private int numExcludedNodes;
private long ttExpiryInterval;
private int map_tasks;
@@ -78,7 +80,7 @@ public class ClusterStatus implements Wr
/**
* Construct a new cluster status.
*
- * @param trackers no. of tasktrackers in the cluster
+ * @param trackers no. of active tasktrackers in the cluster
* @param maps no. of currently running map-tasks in the cluster
* @param reduces no. of currently running reduce-tasks in the cluster
* @param maxMaps the maximum no. of map tasks in the cluster
@@ -89,15 +91,16 @@ public class ClusterStatus implements Wr
@Deprecated
ClusterStatus(int trackers, int maps, int reduces, int maxMaps,
int maxReduces, JobTracker.State state) {
- this(trackers, 0, JobTracker.TASKTRACKER_EXPIRY_INTERVAL, maps, reduces,
+ this(trackers, 0, 0, JobTracker.TASKTRACKER_EXPIRY_INTERVAL, maps, reduces,
maxMaps, maxReduces, state);
}
/**
* Construct a new cluster status.
*
- * @param trackers no. of tasktrackers in the cluster
- * @param blacklists no of blacklisted task trackers in the cluster
+ * @param trackers no. of active tasktrackers in the cluster (includes gray)
+ * @param blacklists no. of blacklisted task trackers in the cluster
+ * @param graylists no. of graylisted task trackers in the cluster
* @param ttExpiryInterval the tasktracker expiry interval
* @param maps no. of currently running map-tasks in the cluster
* @param reduces no. of currently running reduce-tasks in the cluster
@@ -105,17 +108,19 @@ public class ClusterStatus implements Wr
* @param maxReduces the maximum no. of reduce tasks in the cluster
* @param state the {@link JobTracker.State} of the <code>JobTracker</code>
*/
- ClusterStatus(int trackers, int blacklists, long ttExpiryInterval,
- int maps, int reduces,
+ ClusterStatus(int trackers, int blacklists, int graylists,
+ long ttExpiryInterval, int maps, int reduces,
int maxMaps, int maxReduces, JobTracker.State state) {
- this(trackers, blacklists, ttExpiryInterval, maps, reduces, maxMaps,
- maxReduces, state, 0);
+ this(trackers, blacklists, graylists, ttExpiryInterval, maps, reduces,
+ maxMaps, maxReduces, state, 0);
}
/**
* Construct a new cluster status.
- * @param trackers no. of tasktrackers in the cluster
- * @param blacklists no of blacklisted task trackers in the cluster
+ *
+ * @param trackers no. of active tasktrackers in the cluster (includes gray)
+ * @param blacklists no. of blacklisted task trackers in the cluster
+ * @param graylists no. of graylisted task trackers in the cluster
* @param ttExpiryInterval the tasktracker expiry interval
* @param maps no. of currently running map-tasks in the cluster
* @param reduces no. of currently running reduce-tasks in the cluster
@@ -124,10 +129,11 @@ public class ClusterStatus implements Wr
* @param state the {@link JobTracker.State} of the <code>JobTracker</code>
* @param numDecommissionedNodes number of decommission trackers
*/
- ClusterStatus(int trackers, int blacklists, long ttExpiryInterval,
+ ClusterStatus(int trackers, int blacklists, int graylists,
+ long ttExpiryInterval,
int maps, int reduces, int maxMaps, int maxReduces,
JobTracker.State state, int numDecommissionedNodes) {
- this(trackers, blacklists, ttExpiryInterval, maps, reduces,
+ this(trackers, blacklists, graylists, ttExpiryInterval, maps, reduces,
maxMaps, maxReduces, state, numDecommissionedNodes,
UNINITIALIZED_MEMORY_VALUE, UNINITIALIZED_MEMORY_VALUE);
}
@@ -135,8 +141,9 @@ public class ClusterStatus implements Wr
/**
* Construct a new cluster status.
*
- * @param activeTrackers active tasktrackers in the cluster
+ * @param activeTrackers active tasktrackers in the cluster (includes gray)
* @param blacklistedTrackers blacklisted tasktrackers in the cluster
+ * @param graylistedTrackers graylisted tasktrackers in the cluster
* @param ttExpiryInterval the tasktracker expiry interval
* @param maps no. of currently running map-tasks in the cluster
* @param reduces no. of currently running reduce-tasks in the cluster
@@ -144,21 +151,23 @@ public class ClusterStatus implements Wr
* @param maxReduces the maximum no. of reduce tasks in the cluster
* @param state the {@link JobTracker.State} of the <code>JobTracker</code>
*/
- ClusterStatus(Collection<String> activeTrackers,
- Collection<String> blacklistedTrackers,
- long ttExpiryInterval,
- int maps, int reduces, int maxMaps, int maxReduces,
- JobTracker.State state) {
- this(activeTrackers, blacklistedTrackers, ttExpiryInterval, maps, reduces,
- maxMaps, maxReduces, state, 0);
+ ClusterStatus(Collection<String> activeTrackers,
+ Collection<String> blacklistedTrackers,
+ Collection<String> graylistedTrackers,
+ long ttExpiryInterval,
+ int maps, int reduces, int maxMaps, int maxReduces,
+ JobTracker.State state) {
+ this(activeTrackers, blacklistedTrackers, graylistedTrackers,
+ ttExpiryInterval, maps, reduces, maxMaps, maxReduces, state, 0);
}
- ClusterStatus(int trackers, int blacklists, long ttExpiryInterval,
- int maps, int reduces, int maxMaps, int maxReduces,
- JobTracker.State state, int numDecommissionedNodes,
- long used_memory, long max_memory) {
+ ClusterStatus(int trackers, int blacklists, int graylists,
+ long ttExpiryInterval, int maps, int reduces, int maxMaps,
+ int maxReduces, JobTracker.State state,
+ int numDecommissionedNodes, long used_memory, long max_memory) {
numActiveTrackers = trackers;
numBlacklistedTrackers = blacklists;
+ numGraylistedTrackers = graylists;
this.numExcludedNodes = numDecommissionedNodes;
this.ttExpiryInterval = ttExpiryInterval;
map_tasks = maps;
@@ -172,8 +181,9 @@ public class ClusterStatus implements Wr
/**
* Construct a new cluster status.
- * @param activeTrackers active tasktrackers in the cluster
+ * @param activeTrackers active tasktrackers in the cluster (includes gray)
* @param blacklistedTrackers blacklisted tasktrackers in the cluster
+ * @param graylistedTrackers graylisted tasktrackers in the cluster
* @param ttExpiryInterval the tasktracker expiry interval
* @param maps no. of currently running map-tasks in the cluster
* @param reduces no. of currently running reduce-tasks in the cluster
@@ -183,28 +193,33 @@ public class ClusterStatus implements Wr
* @param numDecommissionNodes number of decommission trackers
*/
ClusterStatus(Collection<String> activeTrackers,
- Collection<String> blacklistedTrackers, long ttExpiryInterval,
+ Collection<String> blacklistedTrackers,
+ Collection<String> graylistedTrackers, long ttExpiryInterval,
int maps, int reduces, int maxMaps, int maxReduces,
JobTracker.State state, int numDecommissionNodes) {
- this(activeTrackers.size(), blacklistedTrackers.size(), ttExpiryInterval,
- maps, reduces, maxMaps, maxReduces, state, numDecommissionNodes,
+ this(activeTrackers.size(), blacklistedTrackers.size(),
+ graylistedTrackers.size(), ttExpiryInterval, maps, reduces,
+ maxMaps, maxReduces, state, numDecommissionNodes,
Runtime.getRuntime().totalMemory(), Runtime.getRuntime().maxMemory());
this.activeTrackers = activeTrackers;
this.blacklistedTrackers = blacklistedTrackers;
+ this.graylistedTrackers = graylistedTrackers;
}
/**
- * Get the number of task trackers in the cluster.
- *
- * @return the number of task trackers in the cluster.
+ * Get the number of active task trackers in the cluster. Includes
+ * graylisted but not blacklisted trackers.
+ *
+ * @return the number of active task trackers in the cluster.
*/
public int getTaskTrackers() {
return numActiveTrackers;
}
/**
- * Get the names of task trackers in the cluster.
- *
+ * Get the names of active task trackers in the cluster. Includes
+ * graylisted but not blacklisted trackers.
+ *
* @return the active task trackers in the cluster.
*/
public Collection<String> getActiveTrackerNames() {
@@ -212,14 +227,14 @@ public class ClusterStatus implements Wr
}
/**
- * Get the names of task trackers in the cluster.
+ * Get the names of blacklisted task trackers in the cluster.
*
* @return the blacklisted task trackers in the cluster.
*/
public Collection<String> getBlacklistedTrackerNames() {
return blacklistedTrackers;
}
-
+
/**
* Get the number of blacklisted task trackers in the cluster.
*
@@ -228,7 +243,25 @@ public class ClusterStatus implements Wr
public int getBlacklistedTrackers() {
return numBlacklistedTrackers;
}
-
+
+ /**
+ * Get the names of graylisted task trackers in the cluster.
+ *
+ * @return the graylisted task trackers in the cluster.
+ */
+ public Collection<String> getGraylistedTrackerNames() {
+ return graylistedTrackers;
+ }
+
+ /**
+ * Get the number of graylisted task trackers in the cluster.
+ *
+ * @return the number of graylisted task trackers in the cluster.
+ */
+ public int getGraylistedTrackers() {
+ return numGraylistedTrackers;
+ }
+
/**
* Get the number of excluded hosts in the cluster.
* @return the number of excluded hosts in the cluster.
@@ -330,6 +363,16 @@ public class ClusterStatus implements Wr
Text.writeString(out, tracker);
}
}
+ if (graylistedTrackers.size() == 0) {
+ out.writeInt(numGraylistedTrackers);
+ out.writeInt(0);
+ } else {
+ out.writeInt(graylistedTrackers.size());
+ out.writeInt(graylistedTrackers.size());
+ for (String tracker : graylistedTrackers) {
+ Text.writeString(out, tracker);
+ }
+ }
out.writeInt(numExcludedNodes);
out.writeLong(ttExpiryInterval);
out.writeInt(map_tasks);
@@ -358,6 +401,14 @@ public class ClusterStatus implements Wr
blacklistedTrackers.add(name);
}
}
+ numGraylistedTrackers = in.readInt();
+ numTrackerNames = in.readInt();
+ if (numTrackerNames > 0) {
+ for (int i = 0; i < numTrackerNames; i++) {
+ String name = Text.readString(in);
+ graylistedTrackers.add(name);
+ }
+ }
numExcludedNodes = in.readInt();
ttExpiryInterval = in.readLong();
map_tasks = in.readInt();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 04:33:31 2011
@@ -132,27 +132,42 @@ public class JobTracker implements MRCon
private final long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
private final DelegationTokenSecretManager secretManager;
-
- // The interval after which one fault of a tracker will be discarded,
- // if there are no faults during this.
- private static long UPDATE_FAULTY_TRACKER_INTERVAL = 24 * 60 * 60 * 1000;
- // The maximum percentage of trackers in cluster added
- // to the 'blacklist' across all the jobs.
- private static double MAX_BLACKLIST_PERCENT = 0.50;
- // A tracker is blacklisted across jobs only if number of
- // blacklists are X% above the average number of blacklists.
- // X is the blacklist threshold here.
- private double AVERAGE_BLACKLIST_THRESHOLD = 0.50;
- // The maximum number of blacklists for a tracker after which the
- // tracker could be blacklisted across all jobs
- private int MAX_BLACKLISTS_PER_TRACKER = 4;
+ // The maximum fraction (range [0.0-1.0]) of nodes in cluster allowed to be
+ // added to the all-jobs blacklist via heuristics. By default, no more than
+ // 50% of the cluster can be heuristically blacklisted, but the external
+ // node-healthcheck script is not affected by this.
+ private static double MAX_BLACKLIST_FRACTION = 0.5;
+
+ // A tracker is blacklisted across jobs only if number of faults is more
+ // than X% above the average number of faults (averaged across all nodes
+ // in cluster). X is the blacklist threshold here; 0.3 would correspond
+ // to 130% of the average, for example.
+ private double AVERAGE_BLACKLIST_THRESHOLD = 0.5;
+
+ // Fault threshold (number occurring within TRACKER_FAULT_TIMEOUT_WINDOW)
+ // to consider a task tracker bad enough to blacklist heuristically. This
+ // is functionally the same as the older "MAX_BLACKLISTS_PER_TRACKER" value.
+ private int TRACKER_FAULT_THRESHOLD; // = 4;
+
+ // Width of overall fault-tracking sliding window (in minutes). (The older
+ // "UPDATE_FAULTY_TRACKER_INTERVAL" forgave a single fault after 24 hours if
+ // no others occurred in the interval; the default window here is 3 hours.)
+ private int TRACKER_FAULT_TIMEOUT_WINDOW; // = 180 (3 hours)
+
+ // Width of a single fault-tracking bucket (in minutes).
+ private int TRACKER_FAULT_BUCKET_WIDTH; // = 15
+ private long TRACKER_FAULT_BUCKET_WIDTH_MSECS;
+
+ // Basically TRACKER_FAULT_TIMEOUT_WINDOW / TRACKER_FAULT_BUCKET_WIDTH .
+ private int NUM_FAULT_BUCKETS;
+
/** the maximum allowed size of the jobconf **/
long MAX_JOBCONF_SIZE = 5*1024*1024L;
/** the config key for max user jobconf size **/
public static final String MAX_USER_JOBCONF_SIZE_KEY =
"mapred.user.jobconf.limit";
-
- //Delegation token related keys
+
+ // Delegation token related keys
public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
"mapreduce.cluster.delegation.key.update-interval";
public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
@@ -440,7 +455,7 @@ public class JobTracker implements MRCon
// JobInProgress.failedTask -> JobTracker.markCompleteTaskAttempt
// Also need to lock JobTracker before locking 'taskTracker' &
// 'trackerExpiryQueue' to prevent deadlock:
- // @see {@link JobTracker.processHeartbeat(TaskTrackerStatus, boolean)}
+ // @see {@link JobTracker.processHeartbeat(TaskTrackerStatus, boolean, long)}
synchronized (JobTracker.this) {
synchronized (taskTrackers) {
synchronized (trackerExpiryQueue) {
@@ -655,85 +670,142 @@ public class JobTracker implements MRCon
EXCEEDING_FAILURES,
NODE_UNHEALTHY
}
-
- // The FaultInfo which indicates the number of faults of a tracker
- // and when the last fault occurred
- // and whether the tracker is blacklisted across all jobs or not
+
+ // FaultInfo: data structure that tracks the faults of a single TaskTracker
+ // in a circular array of time-slice buckets, and whether the TaskTracker
+ // is blacklisted and/or graylisted across all jobs.
private static class FaultInfo {
static final String FAULT_FORMAT_STRING = "%d failures on the tracker";
- int numFaults = 0;
- long lastUpdated;
+ int[] numFaults; // timeslice buckets
+ long lastRotated; // 1st millisecond of current bucket
boolean blacklisted;
-
+ boolean graylisted;
+
+ private int numFaultBuckets;
+ private long bucketWidth;
private boolean isHealthy;
- private HashMap<ReasonForBlackListing, String>rfbMap;
-
- FaultInfo(long time) {
- numFaults = 0;
- lastUpdated = time;
+ private HashMap<ReasonForBlackListing, String> blackRfbMap;
+ private HashMap<ReasonForBlackListing, String> grayRfbMap;
+
+ FaultInfo(long time, int numFaultBuckets, long bucketWidth) {
+ this.numFaultBuckets = numFaultBuckets;
+ this.bucketWidth = bucketWidth;
+ numFaults = new int[numFaultBuckets];
+ lastRotated = (time / bucketWidth) * bucketWidth;
blacklisted = false;
- rfbMap = new HashMap<ReasonForBlackListing, String>();
+ graylisted = false;
+ isHealthy = true;
+ blackRfbMap = new HashMap<ReasonForBlackListing, String>();
+ grayRfbMap = new HashMap<ReasonForBlackListing, String>();
+ }
+
+ // timeStamp is presumed to be "now": there are no checks for past or
+ // future values, etc.
+ private void checkRotation(long timeStamp) {
+ long diff = timeStamp - lastRotated;
+ // find index of the oldest bucket(s) and zero it (or them) out
+ while (diff > bucketWidth) {
+ // this is now the 1st millisecond of the oldest bucket, in a modular-
+ // arithmetic sense (i.e., about to become the newest bucket):
+ lastRotated += bucketWidth;
+ // corresponding bucket index:
+ int idx = (int)((lastRotated / bucketWidth) % numFaultBuckets);
+ // clear the bucket's contents in preparation for new faults
+ numFaults[idx] = 0;
+ diff -= bucketWidth;
+ }
}
- void setFaultCount(int num) {
- numFaults = num;
+ private int bucketIndex(long timeStamp) {
+ // (long % int) is evaluated in long arithmetic, so narrow the result to int
+ return (int)((timeStamp / bucketWidth) % numFaultBuckets);
+ }
+
+ // no longer any need for corresponding decrFaultCount() method since we
+ // implicitly auto-decrement when oldest bucket's contents get wiped on
+ // rotation
+ void incrFaultCount(long timeStamp) {
+ checkRotation(timeStamp);
+ ++numFaults[bucketIndex(timeStamp)];
+ }
+
+ int getFaultCount(long timeStamp) {
+ checkRotation(timeStamp);
+ int faultCount = 0;
+ for (int faults : numFaults) {
+ faultCount += faults;
+ }
+ return faultCount;
}
- void setLastUpdated(long timeStamp) {
- lastUpdated = timeStamp;
+ boolean isBlacklisted() {
+ return blacklisted;
}
- int getFaultCount() {
- return numFaults;
+ boolean isGraylisted() {
+ return graylisted;
}
- long getLastUpdated() {
- return lastUpdated;
- }
-
- boolean isBlacklisted() {
- return blacklisted;
- }
-
- void setBlacklist(ReasonForBlackListing rfb,
- String trackerFaultReport) {
- blacklisted = true;
- this.rfbMap.put(rfb, trackerFaultReport);
+ void setBlacklist(ReasonForBlackListing rfb, String trackerFaultReport,
+ boolean gray) {
+ if (gray) {
+ graylisted = true;
+ this.grayRfbMap.put(rfb, trackerFaultReport);
+ } else {
+ blacklisted = true;
+ this.blackRfbMap.put(rfb, trackerFaultReport);
+ }
}
- public void setHealthy(boolean isHealthy) {
+ public void setHealth(boolean isHealthy) {
this.isHealthy = isHealthy;
}
public boolean isHealthy() {
return isHealthy;
}
-
- public String getTrackerFaultReport() {
+
+ public String getTrackerBlackOrGraylistReport(boolean gray) {
StringBuffer sb = new StringBuffer();
- for(String reasons : rfbMap.values()) {
+ HashMap<ReasonForBlackListing, String> rfbMap =
+ new HashMap<ReasonForBlackListing, String>();
+ rfbMap.putAll(gray? grayRfbMap : blackRfbMap);
+ for (String reasons : rfbMap.values()) {
sb.append(reasons);
sb.append("\n");
}
return sb.toString();
}
-
- Set<ReasonForBlackListing> getReasonforblacklisting() {
- return this.rfbMap.keySet();
+
+ Set<ReasonForBlackListing> getReasonForBlacklisting(boolean gray) {
+ return (gray? this.grayRfbMap.keySet() : this.blackRfbMap.keySet());
}
-
- public void unBlacklist() {
- this.blacklisted = false;
- this.rfbMap.clear();
+
+ // no longer on the blacklist (or graylist), but we're still tracking any
+ // faults in case issue is intermittent => don't clear numFaults[]
+ public void unBlacklist(boolean gray) {
+ if (gray) {
+ graylisted = false;
+ grayRfbMap.clear();
+ } else {
+ blacklisted = false;
+ blackRfbMap.clear();
+ }
}
- public boolean removeBlackListedReason(ReasonForBlackListing rfb) {
- String str = rfbMap.remove(rfb);
+ public boolean removeBlacklistedReason(ReasonForBlackListing rfb,
+ boolean gray) {
+ String str = (gray? grayRfbMap.remove(rfb) : blackRfbMap.remove(rfb));
return str!=null;
}
- public void addBlackListedReason(ReasonForBlackListing rfb, String reason) {
- this.rfbMap.put(rfb, reason);
+ public void addBlacklistedReason(ReasonForBlackListing rfb,
+ String reason, boolean gray) {
+ if (gray) {
+ grayRfbMap.put(rfb, reason);
+ } else {
+ blackRfbMap.put(rfb, reason);
+ }
}
}
@@ -748,6 +820,7 @@ public class JobTracker implements MRCon
// this count doesn't include blacklisted trackers which are lost,
// although the fault info is maintained for lost trackers.
private volatile int numBlacklistedTrackers = 0;
+ private volatile int numGraylistedTrackers = 0;
/**
* Increments faults(blacklist by job) for the tracker by one.
@@ -759,136 +832,193 @@ public class JobTracker implements MRCon
*/
void incrementFaults(String hostName) {
synchronized (potentiallyFaultyTrackers) {
+ long now = clock.getTime();
FaultInfo fi = getFaultInfo(hostName, true);
- int numFaults = fi.getFaultCount();
- ++numFaults;
- fi.setFaultCount(numFaults);
- fi.setLastUpdated(clock.getTime());
- if (exceedsFaults(fi)) {
- LOG.info("Adding " + hostName + " to the blacklist"
- + " across all jobs");
+ fi.incrFaultCount(now);
+ // check heuristics, and if they are exceeded add the tracker to the
+ // graylist (heuristic "blacklisting" is now advisory-only):
+ if (exceedsFaults(fi, now)) {
+ LOG.info("Adding " + hostName + " to the graylist across all jobs");
String reason = String.format(FaultInfo.FAULT_FORMAT_STRING,
- numFaults);
- blackListTracker(hostName, reason,
- ReasonForBlackListing.EXCEEDING_FAILURES);
+ fi.getFaultCount(now));
+ blacklistTracker(hostName, reason,
+ ReasonForBlackListing.EXCEEDING_FAILURES, true);
}
}
}
- private void incrBlackListedTrackers(int count) {
+ /**
+ * Graylists the tracker across all jobs (similar to blacklisting except
+ * not actually removed from service) if all of the following heuristics
+ * hold:
+ * <ol>
+ * <li>number of faults within TRACKER_FAULT_TIMEOUT_WINDOW is greater
+ * than or equal to TRACKER_FAULT_THRESHOLD (per-job blacklistings)
+ * (both configurable)</li>
+ * <li>number of faults (per-job blacklistings) for given node is more
+ * than (1 + AVERAGE_BLACKLIST_THRESHOLD) times the average number
+ * of faults across all nodes (configurable)</li>
+ * <li>less than 50% of the cluster is graylisted (NOT configurable)</li>
+ * </ol>
+ * Note that the node health-check script is not explicitly limited by
+ * the 50%-blacklist limit.
+ */
+ // this is the sole source of "heuristic blacklisting" == graylisting
+ private boolean exceedsFaults(FaultInfo fi, long timeStamp) {
+ int faultCount = fi.getFaultCount(timeStamp);
+ if (faultCount >= TRACKER_FAULT_THRESHOLD) {
+ // calculate avgBlackLists
+ long clusterSize = getClusterStatus().getTaskTrackers();
+ long sum = 0;
+ for (FaultInfo f : potentiallyFaultyTrackers.values()) {
+ sum += f.getFaultCount(timeStamp);
+ }
+ double avg = (double) sum / clusterSize; // avg num faults per node
+ // graylisted trackers are already included in clusterSize:
+ long totalCluster = clusterSize + numBlacklistedTrackers;
+ if ((faultCount - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
+ numGraylistedTrackers < (totalCluster * MAX_BLACKLIST_FRACTION)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void incrBlacklistedTrackers(int count) {
numBlacklistedTrackers += count;
getInstrumentation().addBlackListedTrackers(count);
}
- private void decrBlackListedTrackers(int count) {
+ private void decrBlacklistedTrackers(int count) {
numBlacklistedTrackers -= count;
getInstrumentation().decBlackListedTrackers(count);
}
- private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) {
+ private void incrGraylistedTrackers(int count) {
+ numGraylistedTrackers += count;
+ getInstrumentation().addGrayListedTrackers(count);
+ }
+
+ private void decrGraylistedTrackers(int count) {
+ numGraylistedTrackers -= count;
+ getInstrumentation().decGrayListedTrackers(count);
+ }
+
+ // This may be called either as a result of the node health-check script
+ // or because of heuristics based on single-job blacklist info.
+ private void blacklistTracker(String hostName, String reason,
+ ReasonForBlackListing rfb,
+ boolean gray) {
FaultInfo fi = getFaultInfo(hostName, true);
- boolean blackListed = fi.isBlacklisted();
- if(blackListed) {
+ String shade = gray? "gray" : "black";
+ boolean listed = gray? fi.isGraylisted() : fi.isBlacklisted();
+ if (listed) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding blacklisted reason for tracker : " + hostName
- + " Reason for blacklisting is : " + rfb);
+ LOG.debug("Adding/overwriting reason for " + shade +
+ "listed tracker : " + hostName + " Reason for " + shade +
+ "listing is : " + rfb);
+ }
+ if (!fi.getReasonForBlacklisting(gray).contains(rfb)) {
+ LOG.info("Adding new reason for " + shade + "listed tracker : " +
+ hostName + " Reason for " + shade + "listing is : " + rfb);
}
- if (!fi.getReasonforblacklisting().contains(rfb)) {
- LOG.info("Adding blacklisted reason for tracker : " + hostName
- + " Reason for blacklisting is : " + rfb);
- }
- fi.addBlackListedReason(rfb, reason);
+ fi.addBlacklistedReason(rfb, reason, gray);
} else {
- LOG.info("Blacklisting tracker : " + hostName
- + " Reason for blacklisting is : " + rfb);
- Set<TaskTracker> trackers =
- hostnameToTaskTracker.get(hostName);
- synchronized (trackers) {
- for (TaskTracker tracker : trackers) {
- tracker.cancelAllReservations();
+ LOG.info("Adding new " + shade + "listed tracker : " + hostName
+ + " Reason for " + shade + "listing is : " + rfb);
+ if (gray) {
+ incrGraylistedTrackers(getNumTaskTrackersOnHost(hostName));
+ } else {
+ Set<TaskTracker> trackers =
+ hostnameToTaskTracker.get(hostName);
+ synchronized (trackers) {
+ for (TaskTracker tracker : trackers) {
+ tracker.cancelAllReservations();
+ }
}
+ removeHostCapacity(hostName);
}
- removeHostCapacity(hostName);
- fi.setBlacklist(rfb, reason);
+ fi.setBlacklist(rfb, reason, gray);
}
}
-
- private boolean canUnBlackListTracker(String hostName,
- ReasonForBlackListing rfb) {
- FaultInfo fi = getFaultInfo(hostName, false);
- if(fi == null) {
- return false;
+
+ /**
+ * Lift the tracker's EXCEEDING_FAILURES graylisting once its fault count
+ * within the timeout window has dropped below TRACKER_FAULT_THRESHOLD.
+ *
+ * Faults are stored in a multi-bucket, circular sliding window; when
+ * the implicit "time pointer" moves across a bucket boundary into the
+ * oldest bucket, that bucket's faults are cleared, and it becomes the
+ * newest ("current") bucket. Thus TRACKER_FAULT_TIMEOUT_WINDOW
+ * determines the timeout value for TaskTracker faults (in combination
+ * with TRACKER_FAULT_BUCKET_WIDTH), and the sum over all buckets is
+ * compared with TRACKER_FAULT_THRESHOLD to determine whether global
+ * blacklisting is warranted (or, alternatively, if it should be lifted).
+ *
+ * Assumes JobTracker is locked on entry.
+ *
+ * @param hostName The tracker name
+ * @param now The current time (milliseconds)
+ */
+ void checkTrackerFaultTimeout(String hostName, long now) {
+ synchronized (potentiallyFaultyTrackers) {
+ FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+ // getFaultCount() auto-rotates the buckets, clearing out the oldest
+ // as needed, before summing the faults:
+ if (fi != null && fi.getFaultCount(now) < TRACKER_FAULT_THRESHOLD) {
+ unBlacklistTracker(hostName, ReasonForBlackListing.EXCEEDING_FAILURES,
+ true, now);
+ }
}
-
- Set<ReasonForBlackListing> rfbSet = fi.getReasonforblacklisting();
- return fi.isBlacklisted() && rfbSet.contains(rfb);
}
- private void unBlackListTracker(String hostName,
- ReasonForBlackListing rfb) {
- // check if you can black list the tracker then call this methods
+ private void unBlacklistTracker(String hostName,
+ ReasonForBlackListing rfb,
+ boolean gray,
+ long timeStamp) {
FaultInfo fi = getFaultInfo(hostName, false);
- if(fi.removeBlackListedReason(rfb)) {
- if(fi.getReasonforblacklisting().isEmpty()) {
- addHostCapacity(hostName);
- LOG.info("Unblacklisting tracker : " + hostName);
- fi.unBlacklist();
- //We have unBlackListed tracker, so tracker should
- //definitely be healthy. Check fault count if fault count
- //is zero don't keep it memory.
- if(fi.numFaults == 0) {
- potentiallyFaultyTrackers.remove(hostName);
+ if (fi == null) {
+ return;
+ }
+ Set<ReasonForBlackListing> rfbSet = fi.getReasonForBlacklisting(gray);
+ boolean listed = gray? fi.isGraylisted() : fi.isBlacklisted();
+ if (listed && rfbSet.contains(rfb)) {
+ if (fi.removeBlacklistedReason(rfb, gray)) {
+ if (fi.getReasonForBlacklisting(gray).isEmpty()) {
+ if (gray) {
+ decrGraylistedTrackers(getNumTaskTrackersOnHost(hostName));
+ } else {
+ addHostCapacity(hostName);
+ }
+ LOG.info("Un" + (gray? "gray" : "black") + "listing tracker : " +
+ hostName);
+ fi.unBlacklist(gray);
+ // We have unblack/graylisted tracker, so tracker should definitely
+ // be healthy. Check fault count; if zero, don't keep it in memory.
+ if (fi.getFaultCount(timeStamp) == 0) {
+ potentiallyFaultyTrackers.remove(hostName);
+ }
}
}
}
}
-
+
// Assumes JobTracker is locked on the entry
- private FaultInfo getFaultInfo(String hostName,
- boolean createIfNeccessary) {
+ private FaultInfo getFaultInfo(String hostName, boolean createIfNecessary) {
FaultInfo fi = null;
synchronized (potentiallyFaultyTrackers) {
fi = potentiallyFaultyTrackers.get(hostName);
- if (fi == null && createIfNeccessary) {
- fi = new FaultInfo(clock.getTime());
+ if (fi == null && createIfNecessary) {
+ fi = new FaultInfo(clock.getTime(), NUM_FAULT_BUCKETS,
+ TRACKER_FAULT_BUCKET_WIDTH_MSECS);
potentiallyFaultyTrackers.put(hostName, fi);
}
}
return fi;
}
-
- /**
- * Blacklists the tracker across all jobs if
- * <ol>
- * <li>#faults are more than
- * MAX_BLACKLISTS_PER_TRACKER (configurable) blacklists</li>
- * <li>#faults is 50% (configurable) above the average #faults</li>
- * <li>50% the cluster is not blacklisted yet </li>
- * </ol>
- */
- private boolean exceedsFaults(FaultInfo fi) {
- int faultCount = fi.getFaultCount();
- if (faultCount >= MAX_BLACKLISTS_PER_TRACKER) {
- // calculate avgBlackLists
- long clusterSize = getClusterStatus().getTaskTrackers();
- long sum = 0;
- for (FaultInfo f : potentiallyFaultyTrackers.values()) {
- sum += f.getFaultCount();
- }
- double avg = (double) sum / clusterSize;
-
- long totalCluster = clusterSize + numBlacklistedTrackers;
- if ((faultCount - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
- numBlacklistedTrackers < (totalCluster * MAX_BLACKLIST_PERCENT)) {
- return true;
- }
- }
- return false;
- }
-
+
/**
- * Removes the tracker from blacklist and
- * from potentially faulty list, when it is restarted.
+ * Removes the tracker from the blacklist, graylist, and
+ * potentially-faulty list, when it is restarted.
*
* Assumes JobTracker is locked on the entry.
*
@@ -897,42 +1027,18 @@ public class JobTracker implements MRCon
void markTrackerHealthy(String hostName) {
synchronized (potentiallyFaultyTrackers) {
FaultInfo fi = potentiallyFaultyTrackers.remove(hostName);
- if (fi != null && fi.isBlacklisted()) {
- LOG.info("Removing " + hostName + " from blacklist");
- addHostCapacity(hostName);
- }
- }
- }
-
- /**
- * Check whether tasks can be assigned to the tracker.
- *
- * One fault of the tracker is discarded if there
- * are no faults during one day. So, the tracker will get a
- * chance again to run tasks of a job.
- * Assumes JobTracker is locked on the entry.
- *
- * @param hostName The tracker name
- * @param now The current time
- *
- * @return true if the tracker is blacklisted
- * false otherwise
- */
- boolean shouldAssignTasksToTracker(String hostName, long now) {
- synchronized (potentiallyFaultyTrackers) {
- FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
- if (fi != null &&
- (now - fi.getLastUpdated()) > UPDATE_FAULTY_TRACKER_INTERVAL) {
- int numFaults = fi.getFaultCount() - 1;
- fi.setFaultCount(numFaults);
- fi.setLastUpdated(now);
- if (canUnBlackListTracker(hostName,
- ReasonForBlackListing.EXCEEDING_FAILURES)) {
- unBlackListTracker(hostName,
- ReasonForBlackListing.EXCEEDING_FAILURES);
+ if (fi != null) {
+ // A host may be graylisted and blacklisted at the same time, so the
+ // two cases are undone independently rather than as either/or.
+ if (fi.isGraylisted()) {
+ LOG.info("Removing " + hostName + " from graylist");
+ decrGraylistedTrackers(getNumTaskTrackersOnHost(hostName));
+ }
+ if (fi.isBlacklisted()) {
+ LOG.info("Removing " + hostName + " from blacklist");
+ addHostCapacity(hostName);
}
}
- return (fi != null && fi.isBlacklisted());
}
}
@@ -946,13 +1049,11 @@ public class JobTracker implements MRCon
int reduceSlots = status.getMaxReduceSlots();
totalReduceTaskCapacity -= reduceSlots;
++numTrackersOnHost;
- getInstrumentation().addBlackListedMapSlots(
- mapSlots);
- getInstrumentation().addBlackListedReduceSlots(
- reduceSlots);
+ getInstrumentation().addBlackListedMapSlots(mapSlots);
+ getInstrumentation().addBlackListedReduceSlots(reduceSlots);
}
uniqueHostsMap.remove(hostName);
- incrBlackListedTrackers(numTrackersOnHost);
+ incrBlacklistedTrackers(numTrackersOnHost);
}
}
@@ -970,18 +1071,18 @@ public class JobTracker implements MRCon
getInstrumentation().decBlackListedMapSlots(mapSlots);
getInstrumentation().decBlackListedReduceSlots(reduceSlots);
}
- uniqueHostsMap.put(hostName,
- numTrackersOnHost);
- decrBlackListedTrackers(numTrackersOnHost);
+ uniqueHostsMap.put(hostName, numTrackersOnHost);
+ decrBlacklistedTrackers(numTrackersOnHost);
}
}
/**
- * Whether a host is blacklisted across all the jobs.
+ * Whether a host is blacklisted (by health-check script) across all jobs.
*
* Assumes JobTracker is locked on the entry.
- * @param hostName
- * @return
+ *
+ * @param hostName - hostname to check
+ * @return true if blacklisted
*/
boolean isBlacklisted(String hostName) {
synchronized (potentiallyFaultyTrackers) {
@@ -992,56 +1093,61 @@ public class JobTracker implements MRCon
}
return false;
}
-
- // Assumes JobTracker is locked on the entry.
- int getFaultCount(String hostName) {
+
+ /**
+ * Whether a host is graylisted (by heuristics) "across all jobs".
+ *
+ * Assumes JobTracker is locked on the entry.
+ *
+ * @param hostName - hostname to check
+ * @return true if graylisted
+ */
+ boolean isGraylisted(String hostName) {
synchronized (potentiallyFaultyTrackers) {
FaultInfo fi = null;
if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
- return fi.getFaultCount();
+ return fi.isGraylisted();
}
}
- return 0;
+ return false;
}
-
+
// Assumes JobTracker is locked on the entry.
- Set<ReasonForBlackListing> getReasonForBlackListing(String hostName) {
+ int getFaultCount(String hostName) {
synchronized (potentiallyFaultyTrackers) {
FaultInfo fi = null;
if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
- return fi.getReasonforblacklisting();
+ return fi.getFaultCount(clock.getTime());
}
}
- return null;
+ return 0;
}
// Assumes JobTracker is locked on the entry.
- void setNodeHealthStatus(String hostName, boolean isHealthy, String reason) {
+ void setNodeHealthStatus(String hostName, boolean isHealthy, String reason,
+ long timeStamp) {
FaultInfo fi = null;
- // If tracker is not healthy, create a fault info object
- // blacklist it.
+ // If TaskTracker node is not healthy, get or create a fault info object
+ // and blacklist it. (This path to blacklisting ultimately comes from
+ // the health-check script called in NodeHealthCheckerService; see JIRA
+ // MAPREDUCE-211 for details. We never use graylisting for this path.)
if (!isHealthy) {
fi = getFaultInfo(hostName, true);
- fi.setHealthy(isHealthy);
+ fi.setHealth(isHealthy);
synchronized (potentiallyFaultyTrackers) {
- blackListTracker(hostName, reason,
- ReasonForBlackListing.NODE_UNHEALTHY);
+ blacklistTracker(hostName, reason,
+ ReasonForBlackListing.NODE_UNHEALTHY, false);
}
} else {
- fi = getFaultInfo(hostName, false);
- if (fi == null) {
- return;
- } else {
- if (canUnBlackListTracker(hostName,
- ReasonForBlackListing.NODE_UNHEALTHY)) {
- unBlackListTracker(hostName, ReasonForBlackListing.NODE_UNHEALTHY);
- }
+ if ((fi = getFaultInfo(hostName, false)) != null) {
+ unBlacklistTracker(hostName, ReasonForBlackListing.NODE_UNHEALTHY,
+ false, timeStamp);
}
}
}
}
-
+
/**
* Get all task tracker statuses on given host
*
@@ -1061,7 +1167,27 @@ public class JobTracker implements MRCon
}
return statuses;
}
-
+
+ /**
+ * Get total number of task trackers on given host.
+ *
+ * Assumes JobTracker is locked on the entry.
+ * @param hostName host whose task trackers are counted
+ * @return number of task trackers running on given host
+ */
+ private int getNumTaskTrackersOnHost(String hostName) {
+ int numTrackers = 0;
+ synchronized (taskTrackers) {
+ for (TaskTracker tt : taskTrackers.values()) {
+ TaskTrackerStatus status = tt.getStatus();
+ if (hostName.equals(status.getHost())) {
+ ++numTrackers;
+ }
+ }
+ }
+ return numTrackers;
+ }
+
///////////////////////////////////////////////////////
// Used to recover the jobs upon restart
///////////////////////////////////////////////////////
@@ -1915,7 +2041,8 @@ public class JobTracker implements MRCon
// Number of resolved entries
int numResolved;
-
+
+ // statistics about TaskTrackers with faults; may lead to blacklisting
private FaultyTrackersInfo faultyTrackers = new FaultyTrackersInfo();
private JobTrackerStatistics statistics =
@@ -2054,8 +2181,30 @@ public class JobTracker implements MRCon
retiredJobsCacheSize =
conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
- MAX_BLACKLISTS_PER_TRACKER =
- conf.getInt("mapred.max.tracker.blacklists", 4);
+
+ // values related to heuristic graylisting ("fault" == a per-job
+ // blacklisting; too many faults => node graylisted across all jobs):
+ TRACKER_FAULT_TIMEOUT_WINDOW = // 3 hours
+ conf.getInt("mapred.jobtracker.blacklist.fault-timeout-window", 3 * 60);
+ TRACKER_FAULT_BUCKET_WIDTH = // 15 minutes
+ conf.getInt("mapred.jobtracker.blacklist.fault-bucket-width", 15);
+ TRACKER_FAULT_THRESHOLD =
+ conf.getInt("mapred.max.tracker.blacklists", 4);
+ // future: rename to "mapred.jobtracker.blacklist.fault-threshold" for
+ // namespace consistency
+
+ if (TRACKER_FAULT_BUCKET_WIDTH > TRACKER_FAULT_TIMEOUT_WINDOW) {
+ TRACKER_FAULT_BUCKET_WIDTH = TRACKER_FAULT_TIMEOUT_WINDOW;
+ }
+ TRACKER_FAULT_BUCKET_WIDTH_MSECS =
+ (long)TRACKER_FAULT_BUCKET_WIDTH * 60 * 1000;
+
+ // ideally, TRACKER_FAULT_TIMEOUT_WINDOW should be an integral multiple of
+ // TRACKER_FAULT_BUCKET_WIDTH, but round up just in case:
+ NUM_FAULT_BUCKETS =
+ (TRACKER_FAULT_TIMEOUT_WINDOW + TRACKER_FAULT_BUCKET_WIDTH - 1) /
+ TRACKER_FAULT_BUCKET_WIDTH;
+
NUM_HEARTBEATS_IN_SECOND =
conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND);
if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) {
@@ -2069,11 +2218,11 @@ public class JobTracker implements MRCon
HEARTBEATS_SCALING_FACTOR = DEFAULT_HEARTBEATS_SCALING_FACTOR;
}
- //This configuration is there solely for tuning purposes and
- //once this feature has been tested in real clusters and an appropriate
- //value for the threshold has been found, this config might be taken out.
- AVERAGE_BLACKLIST_THRESHOLD =
- conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f);
+ // This configuration is there solely for tuning purposes and
+ // once this feature has been tested in real clusters and an appropriate
+ // value for the threshold has been found, this config might be taken out.
+ AVERAGE_BLACKLIST_THRESHOLD =
+ conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f);
// This is a directory of temporary submission files. We delete it
// on startup, and can delete any files that we're done with
@@ -2695,7 +2844,8 @@ public class JobTracker implements MRCon
// mark the job for cleanup at all the trackers
addJobForCleanup(id);
- // add the blacklisted trackers to potentially faulty list
+ // add the (single-job) blacklisted trackers to potentially faulty list
+ // for possible heuristic graylisting across all jobs
if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
if (job.getNoOfBlackListedTrackers() > 0) {
for (String hostName : job.getBlackListedTrackers()) {
@@ -2705,7 +2855,7 @@ public class JobTracker implements MRCon
}
String jobUser = job.getProfile().getUser();
- //add to the user to jobs mapping
+ // add to the user to jobs mapping
synchronized (userToJobsMap) {
ArrayList<JobInProgress> userJobs = userToJobsMap.get(jobUser);
if (userJobs == null) {
@@ -2829,64 +2979,94 @@ public class JobTracker implements MRCon
}
/**
- * Get the active and blacklisted task tracker names in the cluster. The first
- * element in the returned list contains the list of active tracker names.
- * The second element in the returned list contains the list of blacklisted
- * tracker names.
+ * Get the active, blacklisted, and graylisted task tracker names in the
+ * cluster. The first element in the returned list contains the list of
+ * active tracker names; the second element in the returned list contains
+ * the list of blacklisted tracker names; and the third contains the list
+ * of graylisted tracker names. Note that the blacklist is disjoint from
+ * the active list, but the graylist is not: graylisted trackers are still
+ * active and therefore may appear in both lists. (Graylisted trackers
+ * could conceivably appear on the blacklist rather than the active list;
+ * blacklisting comes about via the health-check script, while graylisting
+ * is heuristically based on the number of per-job blacklistings in a
+ * specified time interval.)
*/
- // This method is synchronized to make sure that the locking order
- // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers
+ // This method is synchronized to make sure that the locking order
+ // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers
// lock" is under JobTracker lock to avoid deadlocks.
synchronized public List<List<String>> taskTrackerNames() {
- List<String> activeTrackers =
- new ArrayList<String>();
- List<String> blacklistedTrackers =
- new ArrayList<String>();
+ List<String> activeTrackers = new ArrayList<String>();
+ List<String> blacklistedTrackers = new ArrayList<String>();
+ List<String> graylistedTrackers = new ArrayList<String>();
synchronized (taskTrackers) {
for (TaskTracker tt : taskTrackers.values()) {
TaskTrackerStatus status = tt.getStatus();
- if (!faultyTrackers.isBlacklisted(status.getHost())) {
- activeTrackers.add(status.getTrackerName());
+ String hostName = status.getHost();
+ String trackerName = status.getTrackerName();
+ if (!faultyTrackers.isBlacklisted(hostName)) {
+ activeTrackers.add(trackerName);
} else {
- blacklistedTrackers.add(status.getTrackerName());
+ blacklistedTrackers.add(trackerName);
+ }
+ if (faultyTrackers.isGraylisted(hostName)) {
+ graylistedTrackers.add(trackerName);
}
}
}
- List<List<String>> result = new ArrayList<List<String>>(2);
+ List<List<String>> result = new ArrayList<List<String>>(3);
result.add(activeTrackers);
result.add(blacklistedTrackers);
+ result.add(graylistedTrackers);
return result;
}
-
+
/**
- * Get the blacklisted task tracker statuses in the cluster
- *
- * @return {@link Collection} of blacklisted {@link TaskTrackerStatus}
+ * Get the statuses of the blacklisted task trackers in the cluster.
+ *
+ * @return {@link Collection} of blacklisted {@link TaskTrackerStatus}
*/
- // This method is synchronized to make sure that the locking order
- // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers
+ // used by the web UI (machines.jsp)
+ public Collection<TaskTrackerStatus> blacklistedTaskTrackers() {
+ return blackOrGraylistedTaskTrackers(false);
+ }
+
+ /**
+ * Get the statuses of the graylisted task trackers in the cluster.
+ *
+ * @return {@link Collection} of graylisted {@link TaskTrackerStatus}
+ */
+ public Collection<TaskTrackerStatus> graylistedTaskTrackers() {
+ return blackOrGraylistedTaskTrackers(true);
+ }
+
+ // This method is synchronized to make sure that the locking order
+ // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers
// lock" is under JobTracker lock to avoid deadlocks.
- synchronized public Collection<TaskTrackerStatus> blacklistedTaskTrackers() {
- Collection<TaskTrackerStatus> blacklistedTrackers =
+ synchronized private Collection<TaskTrackerStatus>
+ blackOrGraylistedTaskTrackers(boolean gray) {
+ Collection<TaskTrackerStatus> listedTrackers =
new ArrayList<TaskTrackerStatus>();
synchronized (taskTrackers) {
for (TaskTracker tt : taskTrackers.values()) {
- TaskTrackerStatus status = tt.getStatus();
- if (faultyTrackers.isBlacklisted(status.getHost())) {
- blacklistedTrackers.add(status);
+ TaskTrackerStatus status = tt.getStatus();
+ boolean listed = gray? faultyTrackers.isGraylisted(status.getHost()) :
+ faultyTrackers.isBlacklisted(status.getHost());
+ if (listed) {
+ listedTrackers.add(status);
}
}
- }
- return blacklistedTrackers;
+ }
+ return listedTrackers;
}
synchronized int getFaultCount(String hostName) {
return faultyTrackers.getFaultCount(hostName);
}
-
+
/**
- * Get the number of blacklisted trackers across all the jobs
- *
+ * Get the number of task trackers that are blacklisted (via health-check
+ * script) across all jobs.
+ *
* @return
*/
int getBlacklistedTrackerCount() {
@@ -2894,10 +3074,20 @@ public class JobTracker implements MRCon
}
/**
+ * Get the number of task trackers that are graylisted (via heuristics on
+ * single-job blacklistings) across all jobs.
+ *
+ * @return the current graylisted task tracker count
+ */
+ int getGraylistedTrackerCount() {
+ return faultyTrackers.numGraylistedTrackers;
+ }
+
+ /**
* Whether the tracker is blacklisted or not
- *
+ *
* @param trackerID
- *
+ *
* @return true if blacklisted, false otherwise
*/
synchronized public boolean isBlacklisted(String trackerID) {
@@ -2907,7 +3097,22 @@ public class JobTracker implements MRCon
}
return false;
}
-
+
+ /**
+ * Whether the tracker is graylisted or not
+ *
+ * @param trackerID name of the tracker whose host is checked
+ *
+ * @return true if its host is graylisted; false otherwise or if unknown
+ */
+ synchronized public boolean isGraylisted(String trackerID) {
+ TaskTrackerStatus status = getTaskTrackerStatus(trackerID);
+ if (status != null) {
+ return faultyTrackers.isGraylisted(status.getHost());
+ }
+ return false;
+ }
+
// lock to taskTrackers should hold JT lock first.
synchronized public TaskTrackerStatus getTaskTrackerStatus(String trackerID) {
TaskTracker taskTracker;
@@ -3086,12 +3291,10 @@ public class JobTracker implements MRCon
// First check if the last heartbeat response got through
String trackerName = status.getTrackerName();
long now = clock.getTime();
- boolean isBlacklisted = false;
if (restarted) {
faultyTrackers.markTrackerHealthy(status.getHost());
} else {
- isBlacklisted =
- faultyTrackers.shouldAssignTasksToTracker(status.getHost(), now);
+ faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
}
HeartbeatResponse prevHeartbeatResponse =
@@ -3138,7 +3341,7 @@ public class JobTracker implements MRCon
// Process this heartbeat
short newResponseId = (short)(responseId + 1);
status.setLastSeen(now);
- if (!processHeartbeat(status, initialContact)) {
+ if (!processHeartbeat(status, initialContact, now)) {
if (prevHeartbeatResponse != null) {
trackerToHeartbeatResponseMap.remove(trackerName);
}
@@ -3149,10 +3352,10 @@ public class JobTracker implements MRCon
// Initialize the response to be sent for the heartbeat
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
- isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
+ boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
// Check for new tasks to be executed on the tasktracker
if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
- TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
+ TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
} else {
@@ -3389,11 +3592,12 @@ public class JobTracker implements MRCon
}
}
- private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
+ private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus,
+ long timeStamp) {
TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
synchronized (faultyTrackers) {
faultyTrackers.setNodeHealthStatus(trackerStatus.getHost(),
- status.isNodeHealthy(), status.getHealthReport());
+ status.isNodeHealthy(), status.getHealthReport(), timeStamp);
}
}
@@ -3402,8 +3606,9 @@ public class JobTracker implements MRCon
*/
private synchronized boolean processHeartbeat(
TaskTrackerStatus trackerStatus,
- boolean initialContact) {
-
+ boolean initialContact,
+ long timeStamp) {
+
getInstrumentation().heartbeat();
String trackerName = trackerStatus.getTrackerName();
@@ -3429,10 +3634,10 @@ public class JobTracker implements MRCon
}
if (initialContact) {
- // if this is lost tracker that came back now, and if it blacklisted
+ // if this is lost tracker that came back now, and if it's blacklisted
// increment the count of blacklisted trackers in the cluster
if (isBlacklisted(trackerName)) {
- faultyTrackers.incrBlackListedTrackers(1);
+ faultyTrackers.incrBlacklistedTrackers(1);
}
addNewTracker(taskTracker);
}
@@ -3440,7 +3645,7 @@ public class JobTracker implements MRCon
}
updateTaskStatuses(trackerStatus);
- updateNodeHealthStatus(trackerStatus);
+ updateNodeHealthStatus(trackerStatus, timeStamp);
return true;
}
@@ -3833,6 +4038,7 @@ public class JobTracker implements MRCon
List<List<String>> trackerNames = taskTrackerNames();
return new ClusterStatus(trackerNames.get(0),
trackerNames.get(1),
+ trackerNames.get(2),
TASKTRACKER_EXPIRY_INTERVAL,
totalMaps,
totalReduces,
@@ -3841,9 +4047,11 @@ public class JobTracker implements MRCon
state, getExcludedNodes().size()
);
} else {
- return new ClusterStatus(taskTrackers.size() -
- getBlacklistedTrackerCount(),
+ return new ClusterStatus(
+ // active trackers include graylisted but not blacklisted ones:
+ taskTrackers.size() - getBlacklistedTrackerCount(),
getBlacklistedTrackerCount(),
+ getGraylistedTrackerCount(),
TASKTRACKER_EXPIRY_INTERVAL,
totalMaps,
totalReduces,
@@ -3861,7 +4069,8 @@ public class JobTracker implements MRCon
totalMapTaskCapacity, totalReduceTaskCapacity,
totalSubmissions,
taskTrackers.size() - getBlacklistedTrackerCount(),
- getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
+ getBlacklistedTrackerCount(), getGraylistedTrackerCount(),
+ getExcludedNodes().size()) ;
}
/**
@@ -4646,16 +4855,19 @@ public class JobTracker implements MRCon
String trackerName = tracker.getTrackerName();
// Remove completely after marking the tasks as 'KILLED'
lostTaskTracker(tracker);
- // tracker is lost, and if it is blacklisted, remove
+ // tracker is lost, and if it is blacklisted, remove
// it from the count of blacklisted trackers in the cluster
if (isBlacklisted(trackerName)) {
- faultyTrackers.decrBlackListedTrackers(1);
+ faultyTrackers.decrBlacklistedTrackers(1);
+ }
+ if (isGraylisted(trackerName)) {
+ faultyTrackers.decrGraylistedTrackers(1);
}
updateTaskTrackerStatus(trackerName, null);
statistics.taskTrackerRemoved(trackerName);
getInstrumentation().decTrackers(1);
}
-
+
// main decommission
synchronized void decommissionNodes(Set<String> hosts)
throws IOException {
@@ -4930,13 +5142,23 @@ public class JobTracker implements MRCon
}
}
-
- synchronized String getReasonsForBlacklisting(String host) {
- FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
+
+ // used by the web UI (machines.jsp)
+ public String getReasonsForBlacklisting(String host) {
+ return getReasonsForBlackOrGraylisting(host, false);
+ }
+
+ public String getReasonsForGraylisting(String host) {
+ return getReasonsForBlackOrGraylisting(host, true);
+ }
+
+ synchronized private String getReasonsForBlackOrGraylisting(String host,
+ boolean gray) {
+ FaultInfo fi = faultyTrackers.getFaultInfo(host, false); // never create
if (fi == null) {
return "";
}
- return fi.getTrackerFaultReport();
+ return fi.getTrackerBlackOrGraylistReport(gray);
}
/** Test Methods */
@@ -4945,18 +5167,9 @@ public class JobTracker implements MRCon
if (fi == null) {
return new HashSet<ReasonForBlackListing>();
}
- return fi.getReasonforblacklisting();
+ return fi.getReasonForBlacklisting(false);
}
- /*
- * This method is synchronized to make sure that the locking order
- * "faultyTrackers.potentiallyFaultyTrackers lock followed by taskTrackers
- * lock" is under JobTracker lock to avoid deadlocks.
- */
- synchronized void incrementFaults(String hostName) {
- faultyTrackers.incrementFaults(hostName);
- }
-
/**
*
* @return true if delegation token operation is allowed
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Fri Mar 4 04:33:31 2011
@@ -157,6 +157,12 @@ class JobTrackerInstrumentation {
public void decBlackListedTrackers(int trackers)
{ }
+ public void addGrayListedTrackers(int trackers)
+ { }
+
+ public void decGrayListedTrackers(int trackers)
+ { }
+
public void setDecommissionedTrackers(int trackers)
{ }
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Fri Mar 4 04:33:31 2011
@@ -62,6 +62,7 @@ class JobTrackerMetricsInst extends JobT
private int numTrackers = 0;
private int numTrackersBlackListed = 0;
+ private int numTrackersGrayListed = 0;
private int numTrackersDecommissioned = 0;
// long, because 2^31 could well be only about a month's worth of
@@ -403,6 +404,18 @@ class JobTrackerMetricsInst extends JobT
}
@Override
+ public synchronized void addGrayListedTrackers(int trackers)
+ {
+ numTrackersGrayListed += trackers;
+ }
+
+ @Override
+ public synchronized void decGrayListedTrackers(int trackers)
+ {
+ numTrackersGrayListed -= trackers;
+ }
+
+ @Override
public synchronized void setDecommissionedTrackers(int trackers)
{
numTrackersDecommissioned = trackers;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar 4 04:33:31 2011
@@ -474,7 +474,7 @@ class LocalJobRunner implements JobSubmi
}
public ClusterStatus getClusterStatus(boolean detailed) {
- return new ClusterStatus(1, 0, 0, map_tasks, reduce_tasks, 1, 1,
+ return new ClusterStatus(1, 0, 0, 0, map_tasks, reduce_tasks, 1, 1,
JobTracker.State.RUNNING);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java Fri Mar 4 04:33:31 2011
@@ -61,6 +61,7 @@ public class ClusterMetrics implements W
private int totalJobSubmissions;
private int numTrackers;
private int numBlacklistedTrackers;
+ private int numGraylistedTrackers;
private int numDecommissionedTrackers;
public ClusterMetrics() {
@@ -71,7 +72,7 @@ public class ClusterMetrics implements W
int reservedMapSlots, int reservedReduceSlots,
int mapSlots, int reduceSlots,
int totalJobSubmissions,
- int numTrackers, int numBlacklistedTrackers,
+ int numTrackers, int numBlacklistedTrackers, int numGraylistedTrackers,
int numDecommissionedNodes) {
this.runningMaps = runningMaps;
this.runningReduces = runningReduces;
@@ -84,6 +85,7 @@ public class ClusterMetrics implements W
this.totalJobSubmissions = totalJobSubmissions;
this.numTrackers = numTrackers;
this.numBlacklistedTrackers = numBlacklistedTrackers;
+ this.numGraylistedTrackers = numGraylistedTrackers;
this.numDecommissionedTrackers = numDecommissionedNodes;
}
@@ -187,6 +189,15 @@ public class ClusterMetrics implements W
}
/**
+ * Get the number of graylisted trackers in the cluster.
+ *
+ * @return graylisted tracker count
+ */
+ public int getGrayListedTaskTrackerCount() {
+ return numGraylistedTrackers;
+ }
+
+ /**
* Get the number of decommissioned trackers in the cluster.
*
* @return decommissioned tracker count
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java Fri Mar 4 04:33:31 2011
@@ -58,8 +58,8 @@ public class FakeObjectUtilities {
}
@Override
public ClusterStatus getClusterStatus(boolean detailed) {
- return new ClusterStatus(trackers.length,
- 0, 0, 0, 0, totalSlots/2, totalSlots/2, JobTracker.State.RUNNING, 0);
+ return new ClusterStatus(trackers.length, 0, 0,
+ 0, 0, 0, totalSlots/2, totalSlots/2, JobTracker.State.RUNNING, 0);
}
public void setNumSlots(int totalSlots) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Mar 4 04:33:31 2011
@@ -144,7 +144,7 @@ public class TestJobQueueTaskScheduler e
@Override
public ClusterStatus getClusterStatus() {
int numTrackers = trackers.size();
- return new ClusterStatus(numTrackers, 0,
+ return new ClusterStatus(numTrackers, 0, 0,
JobTracker.TASKTRACKER_EXPIRY_INTERVAL,
maps, reduces,
numTrackers * maxMapTasksPerTracker,
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeBlacklisting.java Fri Mar 4 04:33:31 2011
@@ -99,16 +99,17 @@ public class TestNodeBlacklisting extend
job.waitForCompletion();
// validate the total tracker count
- assertEquals("Active tracker count mismatch",
- 1, jt.getClusterStatus(false).getTaskTrackers());
- // validate blacklisted count
- assertEquals("Blacklisted tracker count mismatch",
- 1, jt.getClusterStatus(false).getBlacklistedTrackers());
+ // (graylisted trackers remain active, unlike blacklisted ones)
+ assertEquals("Active tracker count mismatch",
+ 2, jt.getClusterStatus(false).getTaskTrackers());
+ // validate graylisted count
+ assertEquals("Graylisted tracker count mismatch",
+ 1, jt.getClusterStatus(false).getGraylistedTrackers());
- // find the blacklisted tracker
+ // find the graylisted tracker
String trackerName = null;
for (TaskTrackerStatus status : jt.taskTrackers()) {
- if (jt.isBlacklisted(status.getTrackerName())) {
+ if (jt.isGraylisted(status.getTrackerName())) {
trackerName = status.getTrackerName();
break;
}
@@ -127,9 +128,9 @@ public class TestNodeBlacklisting extend
// check the cluster status and tracker size
assertEquals("Tracker is not lost upon host decommissioning",
1, jt.getClusterStatus(false).getTaskTrackers());
- assertEquals("Blacklisted tracker count incorrect in cluster status "
+ assertEquals("Graylisted tracker count incorrect in cluster status "
+ "after decommissioning",
- 0, jt.getClusterStatus(false).getBlacklistedTrackers());
+ 0, jt.getClusterStatus(false).getGraylistedTrackers());
assertEquals("Tracker is not lost upon host decommissioning",
1, jt.taskTrackers().size());
} finally {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Fri Mar 4 04:33:31 2011
@@ -97,7 +97,7 @@ public class TestParallelInitialization
public ClusterStatus getClusterStatus() {
int numTrackers = trackers.size();
- return new ClusterStatus(numTrackers, 0,
+ return new ClusterStatus(numTrackers, 0, 0,
JobTracker.TASKTRACKER_EXPIRY_INTERVAL,
maps, reduces,
numTrackers * maxMapTasksPerTracker,
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp Fri Mar 4 04:33:31 2011
@@ -41,6 +41,7 @@
"<th>Map Task Capacity</th>" +
"<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th>" +
"<th>Blacklisted Nodes</th>" +
+ "<th>Graylisted Nodes</th>" +
"<th>Excluded Nodes</th></tr>\n");
out.print("<tr><td>" + metrics.getRunningMaps() + "</td><td>" +
metrics.getRunningReduces() + "</td><td>" +
@@ -56,6 +57,8 @@
"</td><td>" + tasksPerNode +
"</td><td><a href=\"machines.jsp?type=blacklisted\">" +
metrics.getBlackListedTaskTrackerCount() + "</a>" +
+ "</td><td><a href=\"machines.jsp?type=graylisted\">" +
+ metrics.getGrayListedTaskTrackerCount() + "</a>" +
"</td><td><a href=\"machines.jsp?type=excluded\">" +
metrics.getDecommissionedTaskTrackerCount() + "</a>" +
"</td></tr></table>\n");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp?rev=1077596&r1=1077595&r2=1077596&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp Fri Mar 4 04:33:31 2011
@@ -24,6 +24,9 @@
if (("blacklisted").equals(type)) {
out.println("<h2>Blacklisted Task Trackers</h2>");
c = tracker.blacklistedTaskTrackers();
+ } else if (("graylisted").equals(type)) {
+ out.println("<h2>Graylisted Task Trackers</h2>");
+ c = tracker.graylistedTaskTrackers();
} else if (("active").equals(type)) {
out.println("<h2>Active Task Trackers</h2>");
c = tracker.activeTaskTrackers();
@@ -33,7 +36,7 @@
}
int noCols = 9 +
(2 * tracker.getStatistics().collector.DEFAULT_COLLECT_WINDOWS.length);
- if(type.equals("blacklisted")) {
+ if (type.equals("blacklisted") || type.equals("graylisted")) {
noCols = noCols + 1;
}
if (c.size() == 0) {
@@ -49,10 +52,12 @@
"<td><b>Failures</b></td>" +
"<td><b>Node Health Status</b></td>" +
"<td><b>Seconds Since Node Last Healthy</b></td>");
- if(type.equals("blacklisted")) {
- out.print("<td><b>Reason For blacklisting</b></td>");
+ if (type.equals("blacklisted")) {
+ out.print("<td><b>Reason for Blacklisting</b></td>");
+ } else if (type.equals("graylisted")) {
+ out.print("<td><b>Reason for Graylisting</b></td>");
}
- for(StatisticsCollector.TimeWindow window : tracker.getStatistics().
+ for (StatisticsCollector.TimeWindow window : tracker.getStatistics().
collector.DEFAULT_COLLECT_WINDOWS) {
out.println("<td><b>Total Tasks "+window.name+"</b></td>");
out.println("<td><b>Succeeded Tasks "+window.name+"</b></td>");
@@ -97,10 +102,12 @@
"</td><td>" + numFailures +
"</td><td>" + healthString +
"</td><td>" + sinceHealthCheck);
- if(type.equals("blacklisted")) {
+ if (type.equals("blacklisted")) {
out.print("</td><td>" + tracker.getReasonsForBlacklisting(tt.getHost()));
+ } else if (type.equals("graylisted")) {
+ out.print("</td><td>" + tracker.getReasonsForGraylisting(tt.getHost()));
}
- for(StatisticsCollector.TimeWindow window : tracker.getStatistics().
+ for (StatisticsCollector.TimeWindow window : tracker.getStatistics().
collector.DEFAULT_COLLECT_WINDOWS) {
JobTrackerStatistics.TaskTrackerStat ttStat = tracker.getStatistics().
getTaskTrackerStat(tt.getTrackerName());