You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:27:17 UTC
svn commit: r1076974 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/ webapps/job/
Author: omalley
Date: Fri Mar 4 03:27:17 2011
New Revision: 1076974
URL: http://svn.apache.org/viewvc?rev=1076974&view=rev
Log:
commit d4d7d3bc97e6d10dba3db7664594e77e962382e3
Author: Yahoo\! <lt...@yahoo-inc.com>
Date: Tue Aug 18 09:07:19 2009 -0700
Applyint Patch for MAPREDUCE:467 from: http://issues.apache.org/jira/secure/attachment/12413284/467_branch_0.20.patch
Added:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerStatistics.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/StatisticsCollector.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestStatisticsCollector.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1076974&r1=1076973&r2=1076974&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:27:17 2011
@@ -962,8 +962,16 @@ class JobInProgress {
if (taskEvent != null) {
this.taskCompletionEvents.add(taskEvent);
taskCompletionEventTracker++;
+ JobTrackerStatistics.TaskTrackerStat ttStat = jobtracker.
+ getStatistics().getTaskTrackerStat(tip.machineWhereTaskRan(taskid));
+ if(ttStat != null) { // ttStat can be null in case of lost tracker
+ ttStat.incrTotalTasks();
+ }
if (state == TaskStatus.State.SUCCEEDED) {
completedTask(tip, status);
+ if(ttStat != null) {
+ ttStat.incrSucceededTasks();
+ }
}
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076974&r1=1076973&r2=1076974&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:27:17 2011
@@ -391,7 +391,7 @@ public class JobTracker implements MRCon
faultyTrackers.numBlacklistedTrackers -= 1;
}
updateTaskTrackerStatus(trackerName, null);
-
+ statistics.taskTrackerRemoved(trackerName);
// remove the mapping from the hosts list
String hostname = newProfile.getHost();
hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -1795,6 +1795,8 @@ public class JobTracker implements MRCon
private FaultyTrackersInfo faultyTrackers = new FaultyTrackersInfo();
+ private JobTrackerStatistics statistics =
+ new JobTrackerStatistics();
//
// Watch and expire TaskTracker objects using these structures.
// We can map from Name->TaskTrackerStatus, or we can expire by time.
@@ -2680,6 +2682,9 @@ public class JobTracker implements MRCon
}
}
+ JobTrackerStatistics getStatistics() {
+ return statistics;
+ }
/**
* Adds a new node to the jobtracker. It involves adding it to the expiry
* thread and adding it for resolution
@@ -2705,6 +2710,7 @@ public class JobTracker implements MRCon
trackers = Collections.synchronizedSet(new HashSet<TaskTracker>());
hostnameToTaskTracker.put(hostname, trackers);
}
+ statistics.taskTrackerAdded(status.getTrackerName());
LOG.info("Adding tracker " + status.getTrackerName() + " to host "
+ hostname);
trackers.add(taskTracker);
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerStatistics.java?rev=1076974&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerStatistics.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerStatistics.java Fri Mar 4 03:27:17 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.mapred.StatisticsCollector.Stat;
+
+/**
+ * Collects the job tracker statistics.
+ *
+ */
+class JobTrackerStatistics {
+
+ final StatisticsCollector collector;
+ final Map<String, TaskTrackerStat> ttStats =
+ new HashMap<String, TaskTrackerStat>();
+
+ JobTrackerStatistics() {
+ collector = new StatisticsCollector();
+ collector.start();
+ }
+
+ synchronized void taskTrackerAdded(String name) {
+ TaskTrackerStat stat = ttStats.get(name);
+ if(stat == null) {
+ stat = new TaskTrackerStat(name);
+ ttStats.put(name, stat);
+ }
+ }
+
+ synchronized void taskTrackerRemoved(String name) {
+ TaskTrackerStat stat = ttStats.remove(name);
+ if(stat != null) {
+ stat.remove();
+ }
+ }
+
+ synchronized TaskTrackerStat getTaskTrackerStat(String name) {
+ return ttStats.get(name);
+ }
+
+ class TaskTrackerStat {
+ final String totalTasksKey;
+ final Stat totalTasksStat;
+
+ final String succeededTasksKey;
+ final Stat succeededTasksStat;
+
+ TaskTrackerStat(String trackerName) {
+ totalTasksKey = trackerName+"-"+"totalTasks";
+ totalTasksStat = collector.createStat(totalTasksKey);
+ succeededTasksKey = trackerName+"-"+"succeededTasks";
+ succeededTasksStat = collector.createStat(succeededTasksKey);
+ }
+
+ synchronized void incrTotalTasks() {
+ totalTasksStat.inc();
+ }
+
+ synchronized void incrSucceededTasks() {
+ succeededTasksStat.inc();
+ }
+
+ synchronized void remove() {
+ collector.removeStat(totalTasksKey);
+ collector.removeStat(succeededTasksKey);
+ }
+
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/StatisticsCollector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/StatisticsCollector.java?rev=1076974&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/StatisticsCollector.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/StatisticsCollector.java Fri Mar 4 03:27:17 2011
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.mapred.StatisticsCollector.Stat.TimeStat;
+
+/**
+ * Collects the statistics in time windows.
+ */
+class StatisticsCollector {
+
+ private static final int DEFAULT_PERIOD = 5;
+
+ static final TimeWindow
+ SINCE_START = new TimeWindow("Since Start", -1, -1);
+
+ static final TimeWindow
+ LAST_WEEK = new TimeWindow("Last Week", 7 * 24 * 60 * 60, 60 * 60);
+
+ static final TimeWindow
+ LAST_DAY = new TimeWindow("Last Day", 24 * 60 * 60, 60 * 60);
+
+ static final TimeWindow
+ LAST_HOUR = new TimeWindow("Last Hour", 60 * 60, 60);
+
+ static final TimeWindow
+ LAST_MINUTE = new TimeWindow("Last Minute", 60, 10);
+
+ static final TimeWindow[] DEFAULT_COLLECT_WINDOWS = {
+ StatisticsCollector.SINCE_START,
+ StatisticsCollector.LAST_DAY,
+ StatisticsCollector.LAST_HOUR
+ };
+
+ private final int period;
+ private boolean started;
+
+ private final Map<TimeWindow, StatUpdater> updaters =
+ new LinkedHashMap<TimeWindow, StatUpdater>();
+ private final Map<String, Stat> statistics = new HashMap<String, Stat>();
+
+ StatisticsCollector() {
+ this(DEFAULT_PERIOD);
+ }
+
+ StatisticsCollector(int period) {
+ this.period = period;
+ }
+
+ synchronized void start() {
+ if (started) {
+ return;
+ }
+ Timer timer = new Timer("Timer thread for monitoring ", true);
+ TimerTask task = new TimerTask() {
+ public void run() {
+ update();
+ }
+ };
+ long millis = period * 1000;
+ timer.scheduleAtFixedRate(task, millis, millis);
+ started = true;
+ }
+
+ protected synchronized void update() {
+ for (StatUpdater c : updaters.values()) {
+ c.update();
+ }
+ }
+
+ Map<TimeWindow, StatUpdater> getUpdaters() {
+ return Collections.unmodifiableMap(updaters);
+ }
+
+ Map<String, Stat> getStatistics() {
+ return Collections.unmodifiableMap(statistics);
+ }
+
+ synchronized Stat createStat(String name) {
+ return createStat(name, DEFAULT_COLLECT_WINDOWS);
+ }
+
+ synchronized Stat createStat(String name, TimeWindow[] windows) {
+ if (statistics.get(name) != null) {
+ throw new RuntimeException("Stat with name "+ name +
+ " is already defined");
+ }
+ Map<TimeWindow, TimeStat> timeStats =
+ new LinkedHashMap<TimeWindow, TimeStat>();
+ for (TimeWindow window : windows) {
+ StatUpdater collector = updaters.get(window);
+ if (collector == null) {
+ if(SINCE_START.equals(window)) {
+ collector = new StatUpdater();
+ } else {
+ collector = new TimeWindowStatUpdater(window, period);
+ }
+ updaters.put(window, collector);
+ }
+ TimeStat timeStat = new TimeStat();
+ collector.addTimeStat(name, timeStat);
+ timeStats.put(window, timeStat);
+ }
+
+ Stat stat = new Stat(name, timeStats);
+ statistics.put(name, stat);
+ return stat;
+ }
+
+ synchronized Stat removeStat(String name) {
+ Stat stat = statistics.remove(name);
+ if (stat != null) {
+ for (StatUpdater collector : updaters.values()) {
+ collector.removeTimeStat(name);
+ }
+ }
+ return stat;
+ }
+
+ static class TimeWindow {
+ final String name;
+ final int windowSize;
+ final int updateGranularity;
+ TimeWindow(String name, int windowSize, int updateGranularity) {
+ if (updateGranularity > windowSize) {
+ throw new RuntimeException(
+ "Invalid TimeWindow: updateGranularity > windowSize");
+ }
+ this.name = name;
+ this.windowSize = windowSize;
+ this.updateGranularity = updateGranularity;
+ }
+
+ public int hashCode() {
+ return name.hashCode() + updateGranularity + windowSize;
+ }
+
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ final TimeWindow other = (TimeWindow) obj;
+ if (name == null) {
+ if (other.name != null)
+ return false;
+ } else if (!name.equals(other.name))
+ return false;
+ if (updateGranularity != other.updateGranularity)
+ return false;
+ if (windowSize != other.windowSize)
+ return false;
+ return true;
+ }
+ }
+
+ static class Stat {
+ final String name;
+ private Map<TimeWindow, TimeStat> timeStats;
+
+ private Stat(String name, Map<TimeWindow, TimeStat> timeStats) {
+ this.name = name;
+ this.timeStats = timeStats;
+ }
+
+ public synchronized void inc(int incr) {
+ for (TimeStat ts : timeStats.values()) {
+ ts.inc(incr);
+ }
+ }
+
+ public synchronized void inc() {
+ inc(1);
+ }
+
+ public synchronized Map<TimeWindow, TimeStat> getValues() {
+ return Collections.unmodifiableMap(timeStats);
+ }
+
+ static class TimeStat {
+ private final LinkedList<Integer> buckets = new LinkedList<Integer>();
+ private int value;
+ private int currentValue;
+
+ public synchronized int getValue() {
+ return value;
+ }
+
+ private synchronized void inc(int i) {
+ currentValue += i;
+ }
+
+ private synchronized void addBucket() {
+ buckets.addLast(currentValue);
+ setValueToCurrent();
+ }
+
+ private synchronized void setValueToCurrent() {
+ value += currentValue;
+ currentValue = 0;
+ }
+
+ private synchronized void removeBucket() {
+ int removed = buckets.removeFirst();
+ value -= removed;
+ }
+ }
+ }
+
+ private static class StatUpdater {
+
+ protected final Map<String, TimeStat> statToCollect =
+ new HashMap<String, TimeStat>();
+
+ synchronized void addTimeStat(String name, TimeStat s) {
+ statToCollect.put(name, s);
+ }
+
+ synchronized TimeStat removeTimeStat(String name) {
+ return statToCollect.remove(name);
+ }
+
+ synchronized void update() {
+ for (TimeStat stat : statToCollect.values()) {
+ stat.setValueToCurrent();
+ }
+ }
+ }
+
+ /**
+ * Updates TimeWindow statistics in buckets.
+ *
+ */
+ private static class TimeWindowStatUpdater extends StatUpdater{
+
+ final int collectBuckets;
+ final int updatesPerBucket;
+
+ private int updates;
+ private int buckets;
+
+ TimeWindowStatUpdater(TimeWindow w, int updatePeriod) {
+ if (updatePeriod > w.updateGranularity) {
+ throw new RuntimeException(
+ "Invalid conf: updatePeriod > updateGranularity");
+ }
+ collectBuckets = w.windowSize / w.updateGranularity;
+ updatesPerBucket = w.updateGranularity / updatePeriod;
+ }
+
+ synchronized void update() {
+ updates++;
+ if (updates == updatesPerBucket) {
+ for(TimeStat stat : statToCollect.values()) {
+ stat.addBucket();
+ }
+ updates = 0;
+ buckets++;
+ if (buckets > collectBuckets) {
+ for (TimeStat stat : statToCollect.values()) {
+ stat.removeBucket();
+ }
+ buckets--;
+ }
+ }
+ }
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestStatisticsCollector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestStatisticsCollector.java?rev=1076974&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestStatisticsCollector.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestStatisticsCollector.java Fri Mar 4 03:27:17 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.mapred.StatisticsCollector.TimeWindow;
+import org.apache.hadoop.mapred.StatisticsCollector.Stat;
+
+public class TestStatisticsCollector extends TestCase{
+
+ public void testMovingWindow() throws Exception {
+ StatisticsCollector collector = new StatisticsCollector(1);
+ TimeWindow window = new TimeWindow("test", 6, 2);
+ TimeWindow sincStart = StatisticsCollector.SINCE_START;
+ TimeWindow[] windows = {sincStart, window};
+
+ Stat stat = collector.createStat("m1", windows);
+
+ stat.inc(3);
+ collector.update();
+ assertEquals(0, stat.getValues().get(window).getValue());
+ assertEquals(3, stat.getValues().get(sincStart).getValue());
+
+ stat.inc(3);
+ collector.update();
+ assertEquals((3+3), stat.getValues().get(window).getValue());
+ assertEquals(6, stat.getValues().get(sincStart).getValue());
+
+ stat.inc(10);
+ collector.update();
+ assertEquals((3+3), stat.getValues().get(window).getValue());
+ assertEquals(16, stat.getValues().get(sincStart).getValue());
+
+ stat.inc(10);
+ collector.update();
+ assertEquals((3+3+10+10), stat.getValues().get(window).getValue());
+ assertEquals(26, stat.getValues().get(sincStart).getValue());
+
+ stat.inc(10);
+ collector.update();
+ stat.inc(10);
+ collector.update();
+ assertEquals((3+3+10+10+10+10), stat.getValues().get(window).getValue());
+ assertEquals(46, stat.getValues().get(sincStart).getValue());
+
+ stat.inc(10);
+ collector.update();
+ assertEquals((3+3+10+10+10+10), stat.getValues().get(window).getValue());
+ assertEquals(56, stat.getValues().get(sincStart).getValue());
+
+ stat.inc(12);
+ collector.update();
+ assertEquals((10+10+10+10+10+12), stat.getValues().get(window).getValue());
+ assertEquals(68, stat.getValues().get(sincStart).getValue());
+
+ stat.inc(13);
+ collector.update();
+ assertEquals((10+10+10+10+10+12), stat.getValues().get(window).getValue());
+ assertEquals(81, stat.getValues().get(sincStart).getValue());
+
+ stat.inc(14);
+ collector.update();
+ assertEquals((10+10+10+12+13+14), stat.getValues().get(window).getValue());
+ assertEquals(95, stat.getValues().get(sincStart).getValue());
+ }
+
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp?rev=1076974&r1=1076973&r2=1076974&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp Fri Mar 4 03:27:17 2011
@@ -29,9 +29,10 @@
out.println("<h2>Task Trackers</h2>");
c = tracker.taskTrackers();
}
- int noCols = 9;
+ int noCols = 9 +
+ (2 * tracker.getStatistics().collector.DEFAULT_COLLECT_WINDOWS.length);
if(type.equals("blacklisted")) {
- noCols = 10;
+ noCols = noCols + 1;
}
if (c.size() == 0) {
out.print("There are currently no known " + type + " Task Trackers.");
@@ -49,6 +50,12 @@
if(type.equals("blacklisted")) {
out.print("<td><b>Reason For blacklisting</b></td>");
}
+ for(StatisticsCollector.TimeWindow window : tracker.getStatistics().
+ collector.DEFAULT_COLLECT_WINDOWS) {
+ out.println("<td><b>Total Tasks "+window.name+"</b></td>");
+ out.println("<td><b>Succeeded Tasks "+window.name+"</b></td>");
+ }
+
out.print("<td><b>Seconds since heartbeat</b></td></tr>\n");
int maxFailures = 0;
@@ -91,6 +98,16 @@
if(type.equals("blacklisted")) {
out.print("</td><td>" + tracker.getReasonsForBlacklisting(tt.getHost()));
}
+ for(StatisticsCollector.TimeWindow window : tracker.getStatistics().
+ collector.DEFAULT_COLLECT_WINDOWS) {
+ JobTrackerStatistics.TaskTrackerStat ttStat = tracker.getStatistics().
+ getTaskTrackerStat(tt.getTrackerName());
+ out.println("</td><td>" + ttStat.totalTasksStat.getValues().
+ get(window).getValue());
+ out.println("</td><td>" + ttStat.succeededTasksStat.getValues().
+ get(window).getValue());
+ }
+
out.print("</td><td>" + sinceHeartbeat + "</td></tr>\n");
}
out.print("</table>\n");