You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2009/10/27 16:44:06 UTC
svn commit: r830230 [6/9] - in /hadoop/mapreduce/branches/HDFS-641: ./
.eclipse.templates/ conf/ ivy/ lib/ src/c++/ src/contrib/
src/contrib/capacity-scheduler/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-sche...
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Oct 27 15:43:58 2009
@@ -20,7 +20,6 @@
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
@@ -131,18 +130,27 @@
// The maximum number of blacklists for a tracker after which the
// tracker could be blacklisted across all jobs
private int MAX_BLACKLISTS_PER_TRACKER = 4;
+
// Approximate number of heartbeats that could arrive JobTracker
// in a second
- private int NUM_HEARTBEATS_IN_SECOND = 100;
+ private int NUM_HEARTBEATS_IN_SECOND;
+ private final int DEFAULT_NUM_HEARTBEATS_IN_SECOND = 100;
+ private final int MIN_NUM_HEARTBEATS_IN_SECOND = 1;
+
+ // Scaling factor for heartbeats, used for testing only
+ private float HEARTBEATS_SCALING_FACTOR;
+ private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
+ private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
+
public static enum State { INITIALIZING, RUNNING }
State state = State.INITIALIZING;
private static final int FS_ACCESS_RETRY_PERIOD = 10000;
private DNSToSwitchMapping dnsToSwitchMapping;
- private NetworkTopology clusterMap = new NetworkTopology();
+ NetworkTopology clusterMap = new NetworkTopology();
private int numTaskCacheLevels; // the max level to which we cache tasks
private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
- private final TaskScheduler taskScheduler;
+ final TaskScheduler taskScheduler;
private final List<JobInProgressListener> jobInProgressListeners =
new CopyOnWriteArrayList<JobInProgressListener>();
@@ -158,7 +166,7 @@
static final Clock DEFAULT_CLOCK = new Clock();
- private JobHistory jobHistory = null;
+ private final JobHistory jobHistory;
/**
* A client tried to submit a job before the Job Tracker was ready.
@@ -416,10 +424,11 @@
// tracker is lost, and if it is blacklisted, remove
// it from the count of blacklisted trackers in the cluster
if (isBlacklisted(trackerName)) {
- faultyTrackers.numBlacklistedTrackers -= 1;
+ faultyTrackers.decrBlackListedTrackers(1);
}
updateTaskTrackerStatus(trackerName, null);
statistics.taskTrackerRemoved(trackerName);
+ getInstrumentation().decTrackers(1);
// remove the mapping from the hosts list
String hostname = newProfile.getHost();
hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -641,7 +650,16 @@
}
}
-
+ private void incrBlackListedTrackers(int count) {
+ numBlacklistedTrackers += count;
+ getInstrumentation().addBlackListedTrackers(count);
+ }
+
+ private void decrBlackListedTrackers(int count) {
+ numBlacklistedTrackers -= count;
+ getInstrumentation().decBlackListedTrackers(count);
+ }
+
private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) {
FaultInfo fi = getFaultInfo(hostName, true);
boolean blackListed = fi.isBlacklisted();
@@ -800,7 +818,7 @@
getInstrumentation().addBlackListedReduceSlots(
reduceSlots);
}
- numBlacklistedTrackers += uniqueHostsMap.remove(hostName);
+ incrBlackListedTrackers(uniqueHostsMap.remove(hostName));
}
}
@@ -820,7 +838,7 @@
}
uniqueHostsMap.put(hostName,
numTrackersOnHost);
- numBlacklistedTrackers -= numTrackersOnHost;
+ decrBlackListedTrackers(numTrackersOnHost);
}
}
@@ -1126,12 +1144,12 @@
////////////////////////////////////////////////////////////////
int port;
String localMachine;
- private String trackerIdentifier;
+ private final String trackerIdentifier;
long startTime;
int totalSubmissions = 0;
private int totalMapTaskCapacity;
private int totalReduceTaskCapacity;
- private HostsFileReader hostsReader;
+ private final HostsFileReader hostsReader;
// JobTracker recovery variables
private volatile boolean hasRecovered = false;
@@ -1206,6 +1224,10 @@
//
int totalMaps = 0;
int totalReduces = 0;
+ private int occupiedMapSlots = 0;
+ private int occupiedReduceSlots = 0;
+ private int reservedMapSlots = 0;
+ private int reservedReduceSlots = 0;
private HashMap<String, TaskTracker> taskTrackers =
new HashMap<String, TaskTracker>();
Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
@@ -1217,9 +1239,9 @@
Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
"expireLaunchingTasks");
- CompletedJobStatusStore completedJobStatusStore = null;
+ final CompletedJobStatusStore completedJobStatusStore;
Thread completedJobsStoreThread = null;
- RecoveryManager recoveryManager;
+ final RecoveryManager recoveryManager;
/**
* It might seem like a bug to maintain a TreeSet of tasktracker objects,
@@ -1256,7 +1278,7 @@
static final String SUBDIR = "jobTracker";
FileSystem fs = null;
Path systemDir = null;
- private JobConf conf;
+ JobConf conf;
private final UserGroupInformation mrOwner;
private final String supergroup;
@@ -1265,7 +1287,7 @@
long memSizeForMapSlotOnJT;
long memSizeForReduceSlotOnJT;
- private QueueManager queueManager;
+ private final QueueManager queueManager;
JobTracker(JobConf conf)
throws IOException,InterruptedException, LoginException {
@@ -1294,8 +1316,21 @@
tasktrackerExpiryInterval =
conf.getLong(JT_TRACKER_EXPIRY_INTERVAL, 10 * 60 * 1000);
retiredJobsCacheSize = conf.getInt(JT_RETIREJOB_CACHE_SIZE, 1000);
- MAX_BLACKLISTS_PER_TRACKER = conf.getInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 4);
- NUM_HEARTBEATS_IN_SECOND = conf.getInt(JT_HEARTBEATS_IN_SECOND, 100);
+ MAX_BLACKLISTS_PER_TRACKER =
+ conf.getInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 4);
+
+ NUM_HEARTBEATS_IN_SECOND =
+ conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND);
+ if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) {
+ NUM_HEARTBEATS_IN_SECOND = DEFAULT_NUM_HEARTBEATS_IN_SECOND;
+ }
+
+ HEARTBEATS_SCALING_FACTOR =
+ conf.getFloat(JT_HEARTBEATS_SCALING_FACTOR,
+ DEFAULT_HEARTBEATS_SCALING_FACTOR);
+ if (HEARTBEATS_SCALING_FACTOR < MIN_HEARTBEATS_SCALING_FACTOR) {
+ HEARTBEATS_SCALING_FACTOR = DEFAULT_HEARTBEATS_SCALING_FACTOR;
+ }
//This configuration is there solely for tuning purposes and
//once this feature has been tested in real clusters and an appropriate
@@ -1790,7 +1825,7 @@
*
* @param taskTracker tasktracker whose 'non-running' tasks are to be purged
*/
- private void removeMarkedTasks(String taskTracker) {
+ void removeMarkedTasks(String taskTracker) {
// Purge all the 'marked' tasks which were running at taskTracker
Set<TaskAttemptID> markedTaskSet =
trackerToMarkedTasksMap.get(taskTracker);
@@ -2086,7 +2121,7 @@
*
* @param status Task Tracker's status
*/
- private void addNewTracker(TaskTracker taskTracker) {
+ void addNewTracker(TaskTracker taskTracker) {
TaskTrackerStatus status = taskTracker.getStatus();
trackerExpiryQueue.add(status);
@@ -2104,6 +2139,7 @@
hostnameToTaskTracker.put(hostname, trackers);
}
statistics.taskTrackerAdded(status.getTrackerName());
+ getInstrumentation().addTrackers(1);
LOG.info("Adding tracker " + status.getTrackerName() + " to host "
+ hostname);
trackers.add(taskTracker);
@@ -2183,7 +2219,7 @@
// Update the listeners about the job
// Assuming JobTracker is locked on entry.
- private void updateJobInProgressListeners(JobChangeEvent event) {
+ void updateJobInProgressListeners(JobChangeEvent event) {
for (JobInProgressListener listener : jobInProgressListeners) {
listener.jobUpdated(event);
}
@@ -2347,15 +2383,16 @@
/**
* Calculates next heartbeat interval using cluster size.
- * Heartbeat interval is incremented 1second for every 50 nodes.
+ * Heartbeat interval is incremented by 1 second for every 100 nodes by default.
* @return next heartbeat interval.
*/
public int getNextHeartbeatInterval() {
// get the no of task trackers
int clusterSize = getClusterStatus().getTaskTrackers();
int heartbeatInterval = Math.max(
- (int)(1000 * Math.ceil((double)clusterSize /
- NUM_HEARTBEATS_IN_SECOND)),
+ (int)(1000 * HEARTBEATS_SCALING_FACTOR *
+ Math.ceil((double)clusterSize /
+ NUM_HEARTBEATS_IN_SECOND)),
HEARTBEAT_INTERVAL_MIN) ;
return heartbeatInterval;
}
@@ -2393,13 +2430,17 @@
* @param status The new status for the task tracker
* @return Was an old status found?
*/
- private boolean updateTaskTrackerStatus(String trackerName,
+ boolean updateTaskTrackerStatus(String trackerName,
TaskTrackerStatus status) {
TaskTracker tt = getTaskTracker(trackerName);
TaskTrackerStatus oldStatus = (tt == null) ? null : tt.getStatus();
if (oldStatus != null) {
totalMaps -= oldStatus.countMapTasks();
totalReduces -= oldStatus.countReduceTasks();
+ occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
+ occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
+ getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
+ getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
int mapSlots = oldStatus.getMaxMapSlots();
totalMapTaskCapacity -= mapSlots;
@@ -2422,6 +2463,10 @@
if (status != null) {
totalMaps += status.countMapTasks();
totalReduces += status.countReduceTasks();
+ occupiedMapSlots += status.countOccupiedMapSlots();
+ occupiedReduceSlots += status.countOccupiedReduceSlots();
+ getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
+ getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
if (!faultyTrackers.isBlacklisted(status.getHost())) {
int mapSlots = status.getMaxMapSlots();
totalMapTaskCapacity += mapSlots;
@@ -2489,6 +2534,25 @@
return oldStatus != null;
}
+ // Increment the number of reserved slots in the cluster.
+ // This method assumes the caller holds the JobTracker lock.
+ void incrementReservations(TaskType type, int reservedSlots) {
+ if (type.equals(TaskType.MAP)) {
+ reservedMapSlots += reservedSlots;
+ } else if (type.equals(TaskType.REDUCE)) {
+ reservedReduceSlots += reservedSlots;
+ }
+ }
+
+ // Decrement the number of reserved slots in the cluster.
+ // This method assumes the caller holds the JobTracker lock.
+ void decrementReservations(TaskType type, int reservedSlots) {
+ if (type.equals(TaskType.MAP)) {
+ reservedMapSlots -= reservedSlots;
+ } else if (type.equals(TaskType.REDUCE)) {
+ reservedReduceSlots -= reservedSlots;
+ }
+ }
private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
@@ -2501,7 +2565,7 @@
/**
* Process incoming heartbeat messages from the task trackers.
*/
- private synchronized boolean processHeartbeat(
+ synchronized boolean processHeartbeat(
TaskTrackerStatus trackerStatus,
boolean initialContact) {
String trackerName = trackerStatus.getTrackerName();
@@ -2530,7 +2594,7 @@
// if this is lost tracker that came back now, and if it blacklisted
// increment the count of blacklisted trackers in the cluster
if (isBlacklisted(trackerName)) {
- faultyTrackers.numBlacklistedTrackers += 1;
+ faultyTrackers.incrBlackListedTrackers(1);
}
addNewTracker(taskTracker);
}
@@ -2547,8 +2611,7 @@
* A tracker wants to know if any of its Tasks have been
* closed (because the job completed, whether successfully or not)
*/
- private synchronized List<TaskTrackerAction> getTasksToKill(
- String taskTracker) {
+ synchronized List<TaskTrackerAction> getTasksToKill(String taskTracker) {
Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
@@ -2625,7 +2688,7 @@
/**
* A tracker wants to know if any of its Tasks can be committed
*/
- private synchronized List<TaskTrackerAction> getTasksToSave(
+ synchronized List<TaskTrackerAction> getTasksToSave(
TaskTrackerStatus tts) {
List<TaskStatus> taskStatuses = tts.getTaskReports();
if (taskStatuses != null) {
@@ -2928,9 +2991,12 @@
}
public synchronized ClusterMetrics getClusterMetrics() {
- return new ClusterMetrics(totalMaps, totalReduces, totalMapTaskCapacity,
- totalReduceTaskCapacity, taskTrackers.size() -
- getBlacklistedTrackerCount(),
+ return new ClusterMetrics(totalMaps,
+ totalReduces, occupiedMapSlots, occupiedReduceSlots,
+ reservedMapSlots, reservedReduceSlots,
+ totalMapTaskCapacity, totalReduceTaskCapacity,
+ totalSubmissions,
+ taskTrackers.size() - getBlacklistedTrackerCount(),
getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
}
@@ -3744,12 +3810,13 @@
}
// main decommission
- private synchronized void decommissionNodes(Set<String> hosts)
+ synchronized void decommissionNodes(Set<String> hosts)
throws IOException {
LOG.info("Decommissioning " + hosts.size() + " nodes");
// create a list of tracker hostnames
synchronized (taskTrackers) {
synchronized (trackerExpiryQueue) {
+ int trackersDecommissioned = 0;
for (String host : hosts) {
LOG.info("Decommissioning host " + host);
Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
@@ -3758,11 +3825,14 @@
LOG.info("Decommission: Losing tracker " + tracker +
" on host " + host);
lostTaskTracker(tracker); // lose the tracker
- updateTaskTrackerStatus(tracker.getStatus().getTrackerName(), null);
+ updateTaskTrackerStatus(
+ tracker.getStatus().getTrackerName(), null);
}
+ trackersDecommissioned += trackers.size();
}
LOG.info("Host " + host + " is ready for decommissioning");
}
+ getInstrumentation().setDecommissionedTrackers(trackersDecommissioned);
}
}
}
@@ -4090,7 +4160,108 @@
void incrementFaults(String hostName) {
faultyTrackers.incrementFaults(hostName);
}
-
+
+ JobTracker(JobConf conf, Clock clock, boolean ignoredForSimulation)
+ throws IOException {
+ this.clock = clock;
+ this.conf = conf;
+ trackerIdentifier = getDateFormat().format(new Date());
+
+ if (fs == null) {
+ fs = FileSystem.get(conf);
+ }
+
+ tasktrackerExpiryInterval =
+ conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
+ retiredJobsCacheSize =
+ conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
+
+ // tracker blacklist limit and heartbeat rate (NOTE(review): HEARTBEATS_SCALING_FACTOR is left unset here)
+ MAX_BLACKLISTS_PER_TRACKER =
+ conf.getInt("mapred.max.tracker.blacklists", 4);
+ NUM_HEARTBEATS_IN_SECOND =
+ conf.getInt("mapred.heartbeats.in.second", 100);
+
+ try {
+ mrOwner = UnixUserGroupInformation.login(conf);
+ } catch (LoginException e) {
+ throw new IOException(StringUtils.stringifyException(e));
+ }
+ supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
+
+ this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
+ conf.get("mapred.hosts.exclude", ""));
+ // queue manager
+ Configuration queuesConf = new Configuration(this.conf);
+ queueManager = new QueueManager(queuesConf);
+
+ // Create the scheduler
+ Class<? extends TaskScheduler> schedulerClass
+ = conf.getClass("mapred.jobtracker.taskScheduler",
+ JobQueueTaskScheduler.class, TaskScheduler.class);
+ taskScheduler =
+ (TaskScheduler)ReflectionUtils.newInstance(schedulerClass, conf);
+
+ // Set ports, start RPC servers, setup security policy etc.
+ InetSocketAddress addr = getAddress(conf);
+ this.localMachine = addr.getHostName();
+ this.port = addr.getPort();
+
+ // Create the jetty server
+ InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(
+ conf.get("mapred.job.tracker.http.address", "0.0.0.0:50030"));
+ String infoBindAddress = infoSocAddr.getHostName();
+ int tmpInfoPort = infoSocAddr.getPort();
+ this.startTime = clock.getTime();
+ infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort,
+ tmpInfoPort == 0, conf);
+ infoServer.setAttribute("job.tracker", this);
+
+ // initialize history parameters.
+ String historyLogDir = null;
+ FileSystem historyFS = null;
+
+ jobHistory = new JobHistory();
+ jobHistory.init(this, conf, this.localMachine, this.startTime);
+ jobHistory.initDone(conf, fs);
+ historyLogDir = jobHistory.getCompletedJobHistoryLocation().toString();
+ infoServer.setAttribute("historyLogDir", historyLogDir);
+ historyFS = new Path(historyLogDir).getFileSystem(conf);
+
+ infoServer.setAttribute("fileSys", historyFS);
+ infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
+ infoServer.start();
+ this.infoPort = this.infoServer.getPort();
+
+ // Initialize instrumentation
+ JobTrackerInstrumentation tmp;
+ Class<? extends JobTrackerInstrumentation> metricsInst =
+ getInstrumentationClass(conf);
+ try {
+ java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+ metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
+ tmp = c.newInstance(this, conf);
+ } catch(Exception e) {
+ //Reflection can throw lots of exceptions -- handle them all by
+ //falling back on the default.
+ LOG.error("failed to initialize job tracker metrics", e);
+ tmp = new JobTrackerMetricsInst(this, conf);
+ }
+ myInstrumentation = tmp;
+
+ // start the recovery manager
+ recoveryManager = new RecoveryManager();
+
+ this.dnsToSwitchMapping = ReflectionUtils.newInstance(
+ conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+ DNSToSwitchMapping.class), conf);
+ this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels",
+ NetworkTopology.DEFAULT_HOST_LEVEL);
+
+ //initializes the job status store
+ completedJobStatusStore = new CompletedJobStatusStore(conf);
+ }
+
/**
* Get the path of the locally stored job file
* @param jobId id of the job
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Tue Oct 27 15:43:58 2009
@@ -84,4 +84,80 @@
public void decBlackListedReduceSlots(int slots)
{ }
+
+ public void addReservedMapSlots(int slots)
+ { }
+
+ public void decReservedMapSlots(int slots)
+ { }
+
+ public void addReservedReduceSlots(int slots)
+ { }
+
+ public void decReservedReduceSlots(int slots)
+ { }
+
+ public void addOccupiedMapSlots(int slots)
+ { }
+
+ public void decOccupiedMapSlots(int slots)
+ { }
+
+ public void addOccupiedReduceSlots(int slots)
+ { }
+
+ public void decOccupiedReduceSlots(int slots)
+ { }
+
+ public void failedJob(JobConf conf, JobID id)
+ { }
+
+ public void killedJob(JobConf conf, JobID id)
+ { }
+
+ public void addPrepJob(JobConf conf, JobID id)
+ { }
+
+ public void decPrepJob(JobConf conf, JobID id)
+ { }
+
+ public void addRunningJob(JobConf conf, JobID id)
+ { }
+
+ public void decRunningJob(JobConf conf, JobID id)
+ { }
+
+ public void addRunningMaps(JobID id, int task)
+ { }
+
+ public void decRunningMaps(JobID id, int task)
+ { }
+
+ public void addRunningReduces(JobID id, int task)
+ { }
+
+ public void decRunningReduces(JobID id, int task)
+ { }
+
+ public void killedMap(TaskAttemptID taskAttemptID)
+ { }
+
+ public void killedReduce(TaskAttemptID taskAttemptID)
+ { }
+
+ public void addTrackers(int trackers)
+ { }
+
+ public void decTrackers(int trackers)
+ { }
+
+ public void addBlackListedTrackers(int trackers)
+ { }
+
+ public void decBlackListedTrackers(int trackers)
+ { }
+
+ public void setDecommissionedTrackers(int trackers)
+ { }
+
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Tue Oct 27 15:43:58 2009
@@ -22,8 +22,6 @@
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
-import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater {
private final MetricsRecord metricsRecord;
@@ -45,6 +43,27 @@
private int numBlackListedMapSlots = 0;
private int numBlackListedReduceSlots = 0;
+ private int numReservedMapSlots = 0;
+ private int numReservedReduceSlots = 0;
+ private int numOccupiedMapSlots = 0;
+ private int numOccupiedReduceSlots = 0;
+
+ private int numJobsFailed = 0;
+ private int numJobsKilled = 0;
+
+ private int numJobsPreparing = 0;
+ private int numJobsRunning = 0;
+
+ private int numRunningMaps = 0;
+ private int numRunningReduces = 0;
+
+ private int numMapTasksKilled = 0;
+ private int numReduceTasksKilled = 0;
+
+ private int numTrackers = 0;
+ private int numTrackersBlackListed = 0;
+ private int numTrackersDecommissioned = 0;
+
public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
super(tracker, conf);
String sessionId = conf.getSessionId();
@@ -78,6 +97,28 @@
metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+
+ metricsRecord.incrMetric("reserved_map_slots", numReservedMapSlots);
+ metricsRecord.incrMetric("reserved_reduce_slots", numReservedReduceSlots);
+ metricsRecord.incrMetric("occupied_map_slots", numOccupiedMapSlots);
+ metricsRecord.incrMetric("occupied_reduce_slots", numOccupiedReduceSlots);
+
+ metricsRecord.incrMetric("jobs_failed", numJobsFailed);
+ metricsRecord.incrMetric("jobs_killed", numJobsKilled);
+
+ metricsRecord.incrMetric("jobs_preparing", numJobsPreparing);
+ metricsRecord.incrMetric("jobs_running", numJobsRunning);
+
+ metricsRecord.incrMetric("running_maps", numRunningMaps);
+ metricsRecord.incrMetric("running_reduces", numRunningReduces);
+
+ metricsRecord.incrMetric("maps_killed", numMapTasksKilled);
+ metricsRecord.incrMetric("reduces_killed", numReduceTasksKilled);
+
+ metricsRecord.incrMetric("trackers", numTrackers);
+ metricsRecord.incrMetric("trackers_blacklisted", numTrackersBlackListed);
+ metricsRecord.setMetric("trackers_decommissioned",
+ numTrackersDecommissioned);
numMapTasksLaunched = 0;
numMapTasksCompleted = 0;
@@ -91,6 +132,26 @@
numWaitingReduces = 0;
numBlackListedMapSlots = 0;
numBlackListedReduceSlots = 0;
+
+ numReservedMapSlots = 0;
+ numReservedReduceSlots = 0;
+ numOccupiedMapSlots = 0;
+ numOccupiedReduceSlots = 0;
+
+ numJobsFailed = 0;
+ numJobsKilled = 0;
+
+ numJobsPreparing = 0;
+ numJobsRunning = 0;
+
+ numRunningMaps = 0;
+ numRunningReduces = 0;
+
+ numMapTasksKilled = 0;
+ numReduceTasksKilled = 0;
+
+ numTrackers = 0;
+ numTrackersBlackListed = 0;
}
metricsRecord.update();
@@ -166,12 +227,12 @@
}
@Override
- public void setMapSlots(int slots) {
+ public synchronized void setMapSlots(int slots) {
numMapSlots = slots;
}
@Override
- public void setReduceSlots(int slots) {
+ public synchronized void setReduceSlots(int slots) {
numReduceSlots = slots;
}
@@ -194,4 +255,154 @@
public synchronized void decBlackListedReduceSlots(int slots){
numBlackListedReduceSlots -= slots;
}
+
+ @Override
+ public synchronized void addReservedMapSlots(int slots)
+ {
+ numReservedMapSlots += slots;
+ }
+
+ @Override
+ public synchronized void decReservedMapSlots(int slots)
+ {
+ numReservedMapSlots -= slots;
+ }
+
+ @Override
+ public synchronized void addReservedReduceSlots(int slots)
+ {
+ numReservedReduceSlots += slots;
+ }
+
+ @Override
+ public synchronized void decReservedReduceSlots(int slots)
+ {
+ numReservedReduceSlots -= slots;
+ }
+
+ @Override
+ public synchronized void addOccupiedMapSlots(int slots)
+ {
+ numOccupiedMapSlots += slots;
+ }
+
+ @Override
+ public synchronized void decOccupiedMapSlots(int slots)
+ {
+ numOccupiedMapSlots -= slots;
+ }
+
+ @Override
+ public synchronized void addOccupiedReduceSlots(int slots)
+ {
+ numOccupiedReduceSlots += slots;
+ }
+
+ @Override
+ public synchronized void decOccupiedReduceSlots(int slots)
+ {
+ numOccupiedReduceSlots -= slots;
+ }
+
+ @Override
+ public synchronized void failedJob(JobConf conf, JobID id)
+ {
+ numJobsFailed++;
+ }
+
+ @Override
+ public synchronized void killedJob(JobConf conf, JobID id)
+ {
+ numJobsKilled++;
+ }
+
+ @Override
+ public synchronized void addPrepJob(JobConf conf, JobID id)
+ {
+ numJobsPreparing++;
+ }
+
+ @Override
+ public synchronized void decPrepJob(JobConf conf, JobID id)
+ {
+ numJobsPreparing--;
+ }
+
+ @Override
+ public synchronized void addRunningJob(JobConf conf, JobID id)
+ {
+ numJobsRunning++;
+ }
+
+ @Override
+ public synchronized void decRunningJob(JobConf conf, JobID id)
+ {
+ numJobsRunning--;
+ }
+
+ @Override
+ public synchronized void addRunningMaps(JobID id, int task)
+ {
+ numRunningMaps += task;
+ }
+
+ @Override
+ public synchronized void decRunningMaps(JobID id, int task)
+ {
+ numRunningMaps -= task;
+ }
+
+ @Override
+ public synchronized void addRunningReduces(JobID id, int task)
+ {
+ numRunningReduces += task;
+ }
+
+ @Override
+ public synchronized void decRunningReduces(JobID id, int task)
+ {
+ numRunningReduces -= task;
+ }
+
+ @Override
+ public synchronized void killedMap(TaskAttemptID taskAttemptID)
+ {
+ numMapTasksKilled++;
+ }
+
+ @Override
+ public synchronized void killedReduce(TaskAttemptID taskAttemptID)
+ {
+ numReduceTasksKilled++;
+ }
+
+ @Override
+ public synchronized void addTrackers(int trackers)
+ {
+ numTrackers += trackers;
+ }
+
+ @Override
+ public synchronized void decTrackers(int trackers)
+ {
+ numTrackers -= trackers;
+ }
+
+ @Override
+ public synchronized void addBlackListedTrackers(int trackers)
+ {
+ numTrackersBlackListed += trackers;
+ }
+
+ @Override
+ public synchronized void decBlackListedTrackers(int trackers)
+ {
+ numTrackersBlackListed -= trackers;
+ }
+
+ @Override
+ public synchronized void setDecommissionedTrackers(int trackers)
+ {
+ numTrackersDecommissioned = trackers;
+ }
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Oct 27 15:43:58 2009
@@ -69,6 +69,7 @@
return ClientProtocol.versionID;
}
+ @SuppressWarnings("unchecked")
static RawSplit[] getRawSplits(JobContext jContext, JobConf job)
throws Exception {
JobConf jobConf = jContext.getJobConf();
@@ -309,7 +310,7 @@
}
}
// delete the temporary directory in output directory
- outputCommitter.cleanupJob(jContext);
+ outputCommitter.commitJob(jContext);
status.setCleanupProgress(1.0f);
if (killed) {
@@ -322,7 +323,8 @@
} catch (Throwable t) {
try {
- outputCommitter.cleanupJob(jContext);
+ outputCommitter.abortJob(jContext,
+ org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
} catch (IOException ioe) {
LOG.info("Error cleaning up job:" + id);
}
@@ -505,7 +507,8 @@
}
public ClusterMetrics getClusterMetrics() {
- return new ClusterMetrics(map_tasks, reduce_tasks, 1, 1, 1, 0, 0);
+ return new ClusterMetrics(map_tasks, reduce_tasks, map_tasks, reduce_tasks,
+ 0, 0, 1, 1, jobs.size(), 1, 0, 0);
}
public State getJobTrackerState() throws IOException, InterruptedException {
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputCommitter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputCommitter.java Tue Oct 27 15:43:58 2009
@@ -71,10 +71,38 @@
*
* @param jobContext Context of the job whose output is being written.
* @throws IOException
+ * @deprecated Use {@link #commitJob(JobContext)} or
+ * {@link #abortJob(JobContext, int)} instead.
*/
- public abstract void cleanupJob(JobContext jobContext) throws IOException;
+ @Deprecated
+ public void cleanupJob(JobContext jobContext) throws IOException { }
/**
+ * Commits the job's output after successful job completion. Note that this
+ * is invoked for jobs whose final run state is SUCCESSFUL.
+ *
+ * @param jobContext Context of the job whose output is being written.
+ * @throws IOException
+ */
+ public void commitJob(JobContext jobContext) throws IOException {
+ cleanupJob(jobContext);
+ }
+
+ /**
+ * Aborts an unsuccessful job's output. Note that this is invoked for
+ * jobs whose final run state is {@link JobStatus#FAILED} or
+ * {@link JobStatus#KILLED}.
+ *
+ * @param jobContext Context of the job whose output is being written.
+ * @param status final runstate of the job
+ * @throws IOException
+ */
+ public void abortJob(JobContext jobContext, int status)
+ throws IOException {
+ cleanupJob(jobContext);
+ }
+
+ /**
* Sets up output for the task.
*
* @param taskContext Context of the task whose output is being written.
@@ -128,8 +156,12 @@
* This method implements the new interface by calling the old method. Note
* that the input types are different between the new and old apis and this
* is a bridge between the two.
+ * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
+ * or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}
+ * instead.
*/
@Override
+ @Deprecated
public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
) throws IOException {
cleanupJob((JobContext) context);
@@ -141,6 +173,33 @@
* is a bridge between the two.
*/
@Override
+ public final void commitJob(org.apache.hadoop.mapreduce.JobContext context
+ ) throws IOException {
+ commitJob((JobContext) context);
+ }
+
+ /**
+ * This method implements the new interface by calling the old method. Note
+ * that the input types are different between the new and old apis and this
+ * is a bridge between the two.
+ */
+ @Override
+ public final void abortJob(org.apache.hadoop.mapreduce.JobContext context,
+ org.apache.hadoop.mapreduce.JobStatus.State runState)
+ throws IOException {
+ int state = JobStatus.getOldNewJobRunState(runState);
+ if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
+ throw new IOException ("Invalid job run state : " + runState.name());
+ }
+ abortJob((JobContext) context, state);
+ }
+
+ /**
+ * This method implements the new interface by calling the old method. Note
+ * that the input types are different between the new and old apis and this
+ * is a bridge between the two.
+ */
+ @Override
public final
void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
) throws IOException {
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputLogFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputLogFilter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputLogFilter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputLogFilter.java Tue Oct 27 15:43:58 2009
@@ -27,9 +27,14 @@
* This can be used to list paths of output directory as follows:
* Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
* new OutputLogFilter()));
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputLogFilter}
+ * instead.
*/
public class OutputLogFilter implements PathFilter {
+ private static final PathFilter LOG_FILTER =
+ new Utils.OutputFileUtils.OutputLogFilter();
public boolean accept(Path path) {
- return !(path.toString().contains("_logs"));
+ return LOG_FILTER.accept(path);
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/Task.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/Task.java Tue Oct 27 15:43:58 2009
@@ -42,9 +42,11 @@
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
@@ -129,6 +131,7 @@
private TaskAttemptID taskId; // unique, includes job id
private int partition; // id within job
TaskStatus taskStatus; // current status of the task
+ protected JobStatus.State jobRunStateForCleanup;
protected boolean jobCleanup = false;
protected boolean jobSetup = false;
protected boolean taskCleanup = false;
@@ -322,6 +325,14 @@
return jobCleanup;
}
+ boolean isJobAbortTask() {
+ // the task is an abort task if it's marked for cleanup and the final
+ // expected state is either failed or killed.
+ return isJobCleanupTask()
+ && (jobRunStateForCleanup == JobStatus.State.KILLED
+ || jobRunStateForCleanup == JobStatus.State.FAILED);
+ }
+
boolean isJobSetupTask() {
return jobSetup;
}
@@ -334,6 +345,14 @@
jobCleanup = true;
}
+ /**
+ * Sets the final job runstate that the job-cleanup task acts on (commit or abort).
+ * @param status the final runstate of the job.
+ */
+ void setJobCleanupTaskState(JobStatus.State status) {
+ jobRunStateForCleanup = status;
+ }
+
boolean isMapOrReduce() {
return !jobSetup && !jobCleanup && !taskCleanup;
}
@@ -362,6 +381,9 @@
skipRanges.write(out);
out.writeBoolean(skipping);
out.writeBoolean(jobCleanup);
+ if (jobCleanup) {
+ WritableUtils.writeEnum(out, jobRunStateForCleanup);
+ }
out.writeBoolean(jobSetup);
out.writeBoolean(writeSkipRecs);
out.writeBoolean(taskCleanup);
@@ -379,6 +401,10 @@
currentRecStartIndex = currentRecIndexIterator.next();
skipping = in.readBoolean();
jobCleanup = in.readBoolean();
+ if (jobCleanup) {
+ jobRunStateForCleanup =
+ WritableUtils.readEnum(in, JobStatus.State.class);
+ }
jobSetup = in.readBoolean();
writeSkipRecs = in.readBoolean();
taskCleanup = in.readBoolean();
@@ -872,7 +898,27 @@
getProgress().setStatus("cleanup");
statusUpdate(umbilical);
// do the cleanup
- committer.cleanupJob(jobContext);
+ LOG.info("Cleaning up job");
+ if (jobRunStateForCleanup == JobStatus.State.FAILED
+ || jobRunStateForCleanup == JobStatus.State.KILLED) {
+ LOG.info("Aborting job with runstate : " + jobRunStateForCleanup.name());
+ if (conf.getUseNewMapper()) {
+ committer.abortJob(jobContext, jobRunStateForCleanup);
+ } else {
+ org.apache.hadoop.mapred.OutputCommitter oldCommitter =
+ (org.apache.hadoop.mapred.OutputCommitter)committer;
+ oldCommitter.abortJob(jobContext, jobRunStateForCleanup);
+ }
+ } else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){
+ LOG.info("Committing job");
+ committer.commitJob(jobContext);
+ } else {
+ throw new IOException("Invalid state of the job for cleanup. State found "
+ + jobRunStateForCleanup + " expecting "
+ + JobStatus.State.SUCCEEDED + ", "
+ + JobStatus.State.FAILED + " or "
+ + JobStatus.State.KILLED);
+ }
done(umbilical, reporter);
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Oct 27 15:43:58 2009
@@ -113,7 +113,7 @@
/**
* Map from taskId -> TaskStatus
*/
- private TreeMap<TaskAttemptID,TaskStatus> taskStatuses =
+ TreeMap<TaskAttemptID,TaskStatus> taskStatuses =
new TreeMap<TaskAttemptID,TaskStatus>();
// Map from taskId -> TaskTracker Id,
@@ -985,6 +985,8 @@
public Task addRunningTask(TaskAttemptID taskid,
String taskTracker,
boolean taskCleanup) {
+ // 1 slot is enough for taskCleanup task
+ int numSlotsNeeded = taskCleanup ? 1 : numSlotsRequired;
// create the task
Task t = null;
if (isMapTask()) {
@@ -999,9 +1001,9 @@
split = new BytesWritable();
}
t = new MapTask(jobFile, taskid, partition, splitClass, split,
- numSlotsRequired);
+ numSlotsNeeded);
} else {
- t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsRequired);
+ t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
}
if (jobCleanup) {
t.setJobCleanupTask();
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Oct 27 15:43:58 2009
@@ -59,13 +59,9 @@
private static final File LOG_DIR =
new File(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();
+ // localFS is set in (and used by) writeToIndexFile()
static LocalFileSystem localFS = null;
static {
- try {
- localFS = FileSystem.getLocal(new Configuration());
- } catch (IOException ioe) {
- LOG.warn("Getting local file system failed.");
- }
if (!LOG_DIR.exists()) {
boolean b = LOG_DIR.mkdirs();
if (!b) {
@@ -200,6 +196,10 @@
File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
Path indexFilePath = new Path(indexFile.getAbsolutePath());
Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
+
+ if (localFS == null) {// set localFS once
+ localFS = FileSystem.getLocal(new Configuration());
+ }
localFS.rename (tmpIndexFilePath, indexFilePath);
}
private static void resetPrevLengths(TaskAttemptID firstTaskid) {
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue Oct 27 15:43:58 2009
@@ -51,6 +51,10 @@
private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
private List<TaskAttemptID> tasksToBeRemoved;
+ private static final String MEMORY_USAGE_STRING =
+ "Memory usage of ProcessTree %s for task-id %s : %d bytes, " +
+ "limit : %d bytes";
+
public TaskMemoryManagerThread(TaskTracker taskTracker) {
this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
taskTracker.getJobConf().getLong(
@@ -209,8 +213,8 @@
// are processes more than 1 iteration old.
long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
long limit = ptInfo.getMemLimit();
- LOG.info("Memory usage of ProcessTree " + pId + " :"
- + currentMemUsage + "bytes. Limit : " + limit + "bytes");
+ LOG.info(String.format(MEMORY_USAGE_STRING,
+ pId, tid.toString(), currentMemUsage, limit));
if (isProcessTreeOverLimit(tid.toString(), currentMemUsage,
curMemUsageOfAgedProcesses, limit)) {
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Oct 27 15:43:58 2009
@@ -143,6 +143,7 @@
* set via {@link JobConf#MAPRED_MAP_TASK_ENV} or
* {@link JobConf#MAPRED_REDUCE_TASK_ENV}
*/
+ @Deprecated
public String getChildEnv(JobConf jobConf) {
return jobConf.get(JobConf.MAPRED_TASK_ENV);
}
@@ -209,7 +210,8 @@
stderr);
Map<String, String> env = new HashMap<String, String>();
- errorInfo = getVMEnvironment(errorInfo, workDir, conf, env);
+ errorInfo = getVMEnvironment(errorInfo, workDir, conf, env,
+ taskid, logSize);
jvmManager.launchJvm(this,
jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
@@ -253,7 +255,12 @@
}catch(IOException ie){
LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
}
- tip.reportTaskFinished();
+
+ // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
+ // *false* since the task has either
+ // a) SUCCEEDED - which means commit has been done
+ // b) FAILED - which means we do not need to commit
+ tip.reportTaskFinished(false);
}
}
@@ -478,6 +485,7 @@
}
/**
+ * sets the environment variables needed for task jvm and its children.
* @param errorInfo
* @param workDir
* @param env
@@ -485,7 +493,7 @@
* @throws Throwable
*/
private String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
- Map<String, String> env)
+ Map<String, String> env, TaskAttemptID taskid, long logSize)
throws Throwable {
StringBuffer ldLibraryPath = new StringBuffer();
ldLibraryPath.append(workDir.toString());
@@ -497,6 +505,18 @@
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+ // for the child of task jvm, set hadoop.root.logger
+ env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");
+ String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+ if (hadoopClientOpts == null) {
+ hadoopClientOpts = "";
+ } else {
+ hadoopClientOpts = hadoopClientOpts + " ";
+ }
+ hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+ + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+ env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
+
// add the env variables passed by the user
String mapredChildEnv = getChildEnv(conf);
if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Oct 27 15:43:58 2009
@@ -27,6 +27,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringUtils;
/**************************************************
* Describes the current status of a task. This is
* not intended to be a comprehensive piece of data.
@@ -132,11 +133,21 @@
}
/**
- * Sets finishTime.
+ * Sets finishTime for the task status if and only if the
+ * start time is set and passed finish time is greater than
+ * zero.
+ *
* @param finishTime finish time of task.
*/
void setFinishTime(long finishTime) {
- this.finishTime = finishTime;
+ if(this.getStartTime() > 0 && finishTime > 0) {
+ this.finishTime = finishTime;
+ } else {
+ //Using String utils to get the stack trace.
+ LOG.error("Trying to set finish time for task " + taskid +
+ " when no start time is set, stackTrace is : " +
+ StringUtils.stringifyException(new Exception()));
+ }
}
/**
* Get shuffle finish time for the task. If shuffle finish time was
@@ -201,11 +212,20 @@
}
/**
- * Set startTime of the task.
+ * Set startTime of the task if start time is greater than zero.
* @param startTime start time
*/
void setStartTime(long startTime) {
- this.startTime = startTime;
+ //Make explicit the assumption that the passed startTime is a
+ //positive long value.
+ if (startTime > 0) {
+ this.startTime = startTime;
+ } else {
+ //Using String utils to get the stack trace.
+ LOG.error("Trying to set illegal startTime for task : " + taskid +
+ ".Stack trace is : " +
+ StringUtils.stringifyException(new Exception()));
+ }
}
/**
* Get current phase of this task. Phase.Map in case of map tasks,
@@ -326,11 +346,11 @@
setDiagnosticInfo(status.getDiagnosticInfo());
- if (status.getStartTime() != 0) {
- this.startTime = status.getStartTime();
+ if (status.getStartTime() > 0) {
+ this.setStartTime(status.getStartTime());
}
- if (status.getFinishTime() != 0) {
- this.finishTime = status.getFinishTime();
+ if (status.getFinishTime() > 0) {
+ this.setFinishTime(status.getFinishTime());
}
this.phase = status.getPhase();
@@ -359,8 +379,8 @@
setProgress(progress);
setStateString(state);
setPhase(phase);
- if (finishTime != 0) {
- this.finishTime = finishTime;
+ if (finishTime > 0) {
+ setFinishTime(finishTime);
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Oct 27 15:43:58 2009
@@ -225,6 +225,14 @@
private int maxMapSlots;
private int maxReduceSlots;
private int failures;
+
+ // Performance-related config knob to send an out-of-band heartbeat
+ // on task completion
+ private volatile boolean oobHeartbeatOnTaskCompletion;
+
+ // Track number of completed tasks to send an out-of-band heartbeat
+ private IntWritable finishedCount = new IntWritable(0);
+
private MapEventsFetcherThread mapEventsFetcher;
int workerThreads;
CleanupQueue directoryCleanupThread;
@@ -636,6 +644,9 @@
if (shouldStartHealthMonitor(this.fConf)) {
startHealthMonitor(this.fConf);
}
+
+ oobHeartbeatOnTaskCompletion =
+ fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
}
public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
@@ -1176,8 +1187,14 @@
long waitTime = heartbeatInterval - (now - lastHeartbeat);
if (waitTime > 0) {
- // sleeps for the wait time
- Thread.sleep(waitTime);
+ // sleeps for the wait time or
+ // until there are empty slots to schedule tasks
+ synchronized (finishedCount) {
+ if (finishedCount.get() == 0) {
+ finishedCount.wait(waitTime);
+ }
+ finishedCount.set(0);
+ }
}
// If the TaskTracker is just starting up:
@@ -1867,6 +1884,19 @@
}
}
+ /**
+ * Notify the tasktracker to send an out-of-band heartbeat.
+ */
+ private void notifyTTAboutTaskCompletion() {
+ if (oobHeartbeatOnTaskCompletion) {
+ synchronized (finishedCount) {
+ int value = finishedCount.get();
+ finishedCount.set(value+1);
+ finishedCount.notify();
+ }
+ }
+ }
+
/**
* The server retry loop.
* This while-loop attempts to connect to the JobTracker. It only
@@ -2185,9 +2215,21 @@
return wasKilled;
}
- void reportTaskFinished() {
- taskFinished();
- releaseSlot();
+ /**
+ * A task is reporting in as 'done'.
+ *
+ * We need to notify the tasktracker to send an out-of-band heartbeat.
+ * If it isn't <code>commitPending</code>, we need to finalize the task
+ * and release the slot it occupied.
+ *
+ * @param commitPending is the task-commit pending?
+ */
+ void reportTaskFinished(boolean commitPending) {
+ if (!commitPending) {
+ taskFinished();
+ releaseSlot();
+ }
+ notifyTTAboutTaskCompletion();
}
/* State changes:
@@ -2487,8 +2529,10 @@
taskStatus.setRunState(TaskStatus.State.KILLED);
}
}
+ taskStatus.setFinishTime(System.currentTimeMillis());
removeFromMemoryManager(task.getTaskID());
releaseSlot();
+ notifyTTAboutTaskCompletion();
}
private synchronized void releaseSlot() {
@@ -2817,9 +2861,7 @@
tip = tasks.get(taskid);
}
if (tip != null) {
- if (!commitPending) {
- tip.reportTaskFinished();
- }
+ tip.reportTaskFinished(commitPending);
} else {
LOG.warn("Unknown child task finished: "+taskid+". Ignored.");
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/pipes/Submitter.java Tue Oct 27 15:43:58 2009
@@ -382,7 +382,7 @@
JobConf conf,
Class<InterfaceType> cls
) throws ClassNotFoundException {
- return conf.getClassByName((String) cl.getOptionValue(key)).asSubclass(cls);
+ return conf.getClassByName(cl.getOptionValue(key)).asSubclass(cls);
}
@Override
@@ -420,15 +420,14 @@
JobConf job = new JobConf(getConf());
if (results.hasOption("input")) {
- FileInputFormat.setInputPaths(job,
- (String) results.getOptionValue("input"));
+ FileInputFormat.setInputPaths(job, results.getOptionValue("input"));
}
if (results.hasOption("output")) {
FileOutputFormat.setOutputPath(job,
- new Path((String) results.getOptionValue("output")));
+ new Path(results.getOptionValue("output")));
}
if (results.hasOption("jar")) {
- job.setJar((String) results.getOptionValue("jar"));
+ job.setJar(results.getOptionValue("jar"));
}
if (results.hasOption("inputformat")) {
setIsJavaRecordReader(job, true);
@@ -451,7 +450,7 @@
job.setReducerClass(getClass(results, "reduce", job, Reducer.class));
}
if (results.hasOption("reduces")) {
- job.setNumReduceTasks(Integer.parseInt((String)
+ job.setNumReduceTasks(Integer.parseInt(
results.getOptionValue("reduces")));
}
if (results.hasOption("writer")) {
@@ -461,18 +460,18 @@
}
if (results.hasOption("lazyOutput")) {
- if (Boolean.parseBoolean((String)results.getOptionValue("lazyOutput"))) {
+ if (Boolean.parseBoolean(results.getOptionValue("lazyOutput"))) {
LazyOutputFormat.setOutputFormatClass(job,
job.getOutputFormat().getClass());
}
}
if (results.hasOption("program")) {
- setExecutable(job, (String) results.getOptionValue("program"));
+ setExecutable(job, results.getOptionValue("program"));
}
if (results.hasOption("jobconf")) {
LOG.warn("-jobconf option is deprecated, please use -D instead.");
- String options = (String)results.getOptionValue("jobconf");
+ String options = results.getOptionValue("jobconf");
StringTokenizer tokenizer = new StringTokenizer(options, ",");
while (tokenizer.hasMoreTokens()) {
String keyVal = tokenizer.nextToken().trim();
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java Tue Oct 27 15:43:58 2009
@@ -35,11 +35,17 @@
* Number of blacklisted and decommissioned trackers.
* </li>
* <li>
- * Task capacity of the cluster.
+ * Slot capacity of the cluster.
+ * </li>
+ * <li>
+ * The number of currently occupied/reserved map & reduce slots.
* </li>
* <li>
* The number of currently running map & reduce tasks.
* </li>
+ * <li>
+ * The number of job submissions.
+ * </li>
* </ol></p>
*
* <p>Clients can query for the latest <code>ClusterMetrics</code>, via
@@ -48,53 +54,147 @@
* @see Cluster
*/
public class ClusterMetrics implements Writable {
- int runningMaps;
- int runningReduces;
- int mapSlots;
- int reduceSlots;
- int numTrackers;
- int numBlacklistedTrackers;
- int numDecommissionedTrackers;
+ private int runningMaps;
+ private int runningReduces;
+ private int occupiedMapSlots;
+ private int occupiedReduceSlots;
+ private int reservedMapSlots;
+ private int reservedReduceSlots;
+ private int totalMapSlots;
+ private int totalReduceSlots;
+ private int totalJobSubmissions;
+ private int numTrackers;
+ private int numBlacklistedTrackers;
+ private int numDecommissionedTrackers;
public ClusterMetrics() {
}
- public ClusterMetrics(int runningMaps, int runningReduces, int mapSlots,
- int reduceSlots, int numTrackers, int numBlacklistedTrackers,
- int numDecommisionedNodes) {
+ public ClusterMetrics(int runningMaps, int runningReduces,
+ int occupiedMapSlots, int occupiedReduceSlots,
+ int reservedMapSlots, int reservedReduceSlots,
+ int mapSlots, int reduceSlots,
+ int totalJobSubmissions,
+ int numTrackers, int numBlacklistedTrackers,
+ int numDecommissionedNodes) {
this.runningMaps = runningMaps;
this.runningReduces = runningReduces;
- this.mapSlots = mapSlots;
- this.reduceSlots = reduceSlots;
+ this.occupiedMapSlots = occupiedMapSlots;
+ this.occupiedReduceSlots = occupiedReduceSlots;
+ this.reservedMapSlots = reservedMapSlots;
+ this.reservedReduceSlots = reservedReduceSlots;
+ this.totalMapSlots = mapSlots;
+ this.totalReduceSlots = reduceSlots;
+ this.totalJobSubmissions = totalJobSubmissions;
this.numTrackers = numTrackers;
this.numBlacklistedTrackers = numBlacklistedTrackers;
- this.numDecommissionedTrackers = numDecommisionedNodes;
+ this.numDecommissionedTrackers = numDecommissionedNodes;
+ }
+
+ /**
+ * Get the number of running map tasks in the cluster.
+ *
+ * @return running maps
+ */
+ public int getRunningMaps() {
+ return runningMaps;
}
+ /**
+ * Get the number of running reduce tasks in the cluster.
+ *
+ * @return running reduces
+ */
+ public int getRunningReduces() {
+ return runningReduces;
+ }
+
+ /**
+ * Get number of occupied map slots in the cluster.
+ *
+ * @return occupied map slot count
+ */
public int getOccupiedMapSlots() {
- return runningMaps;
+ return occupiedMapSlots;
}
+ /**
+ * Get the number of occupied reduce slots in the cluster.
+ *
+ * @return occupied reduce slot count
+ */
public int getOccupiedReduceSlots() {
- return runningReduces;
+ return occupiedReduceSlots;
}
-
+
+ /**
+ * Get number of reserved map slots in the cluster.
+ *
+ * @return reserved map slot count
+ */
+ public int getReservedMapSlots() {
+ return reservedMapSlots;
+ }
+
+ /**
+ * Get the number of reserved reduce slots in the cluster.
+ *
+ * @return reserved reduce slot count
+ */
+ public int getReservedReduceSlots() {
+ return reservedReduceSlots;
+ }
+
+ /**
+ * Get the total number of map slots in the cluster.
+ *
+ * @return map slot capacity
+ */
public int getMapSlotCapacity() {
- return mapSlots;
+ return totalMapSlots;
}
+ /**
+ * Get the total number of reduce slots in the cluster.
+ *
+ * @return reduce slot capacity
+ */
public int getReduceSlotCapacity() {
- return reduceSlots;
+ return totalReduceSlots;
}
+ /**
+ * Get the total number of job submissions in the cluster.
+ *
+ * @return total number of job submissions
+ */
+ public int getTotalJobSubmissions() {
+ return totalJobSubmissions;
+ }
+
+ /**
+ * Get the number of active trackers in the cluster.
+ *
+ * @return active tracker count.
+ */
public int getTaskTrackerCount() {
return numTrackers;
}
+ /**
+ * Get the number of blacklisted trackers in the cluster.
+ *
+ * @return blacklisted tracker count
+ */
public int getBlackListedTaskTrackerCount() {
return numBlacklistedTrackers;
}
+ /**
+ * Get the number of decommissioned trackers in the cluster.
+ *
+ * @return decommissioned tracker count
+ */
public int getDecommissionedTaskTrackerCount() {
return numDecommissionedTrackers;
}
@@ -103,8 +203,13 @@
public void readFields(DataInput in) throws IOException {
runningMaps = in.readInt();
runningReduces = in.readInt();
- mapSlots = in.readInt();
- reduceSlots = in.readInt();
+ occupiedMapSlots = in.readInt();
+ occupiedReduceSlots = in.readInt();
+ reservedMapSlots = in.readInt();
+ reservedReduceSlots = in.readInt();
+ totalMapSlots = in.readInt();
+ totalReduceSlots = in.readInt();
+ totalJobSubmissions = in.readInt();
numTrackers = in.readInt();
numBlacklistedTrackers = in.readInt();
numDecommissionedTrackers = in.readInt();
@@ -114,8 +219,13 @@
public void write(DataOutput out) throws IOException {
out.writeInt(runningMaps);
out.writeInt(runningReduces);
- out.writeInt(mapSlots);
- out.writeInt(reduceSlots);
+ out.writeInt(occupiedMapSlots);
+ out.writeInt(occupiedReduceSlots);
+ out.writeInt(reservedMapSlots);
+ out.writeInt(reservedReduceSlots);
+ out.writeInt(totalMapSlots);
+ out.writeInt(totalReduceSlots);
+ out.writeInt(totalJobSubmissions);
out.writeInt(numTrackers);
out.writeInt(numBlacklistedTrackers);
out.writeInt(numDecommissionedTrackers);
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/JobContext.java Tue Oct 27 15:43:58 2009
@@ -21,6 +21,8 @@
import java.io.IOException;
import java.net.URI;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.Path;
@@ -31,6 +33,8 @@
* A read-only view of the job that is provided to the tasks while they
* are running.
*/
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
public interface JobContext {
// Put all of the attribute names in here so that Job and JobContext are
// consistent.
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/MapContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/MapContext.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/MapContext.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/MapContext.java Tue Oct 27 15:43:58 2009
@@ -18,6 +18,9 @@
package org.apache.hadoop.mapreduce;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
/**
* The context that is given to the {@link Mapper}.
* @param <KEYIN> the key input type to the Mapper
@@ -25,6 +28,8 @@
* @param <KEYOUT> the key output type from the Mapper
* @param <VALUEOUT> the value output type from the Mapper
*/
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
public interface MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java Tue Oct 27 15:43:58 2009
@@ -19,7 +19,6 @@
package org.apache.hadoop.mapreduce;
import java.io.IOException;
-
/**
* <code>OutputCommitter</code> describes the commit of task output for a
* Map-Reduce job.
@@ -69,9 +68,38 @@
*
* @param jobContext Context of the job whose output is being written.
* @throws IOException
+ * @deprecated Use {@link #commitJob(JobContext)} or
+ * {@link #abortJob(JobContext, JobStatus.State)} instead.
+ */
+ @Deprecated
+ public void cleanupJob(JobContext jobContext) throws IOException { }
+
+ /**
+ * For committing job's output after successful job completion. Note that this
+ * is invoked for jobs with final runstate as {@link JobStatus.State#SUCCEEDED}.
+ *
+ * @param jobContext Context of the job whose output is being written.
+ * @throws IOException
*/
- public abstract void cleanupJob(JobContext jobContext) throws IOException;
+ public void commitJob(JobContext jobContext) throws IOException {
+ cleanupJob(jobContext);
+ }
+
+ /**
+ * For aborting an unsuccessful job's output. Note that this is invoked for
+ * jobs with final runstate as {@link JobStatus.State#FAILED} or
+ * {@link JobStatus.State#KILLED}.
+ *
+ * @param jobContext Context of the job whose output is being written.
+ * @param state final runstate of the job
+ * @throws IOException
+ */
+ public void abortJob(JobContext jobContext, JobStatus.State state)
+ throws IOException {
+ cleanupJob(jobContext);
+ }
+
/**
* Sets up output for the task.
*
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ReduceContext.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ReduceContext.java Tue Oct 27 15:43:58 2009
@@ -21,6 +21,9 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
/**
* The context passed to the {@link Reducer}.
* @param <KEYIN> the class of the input keys
@@ -28,6 +31,8 @@
* @param <KEYOUT> the class of the output keys
* @param <VALUEOUT> the class of the output values
*/
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
public interface ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java Tue Oct 27 15:43:58 2009
@@ -18,11 +18,15 @@
package org.apache.hadoop.mapreduce;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.Progressable;
/**
* The context for task attempts.
*/
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
public interface TaskAttemptContext extends JobContext, Progressable {
/**
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java Tue Oct 27 15:43:58 2009
@@ -20,6 +20,9 @@
import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
/**
* A context object that allows input and output from the task. It is only
* supplied to the {@link Mapper} or {@link Reducer}.
@@ -28,6 +31,8 @@
* @param <KEYOUT> the output key type for the task
* @param <VALUEOUT> the output value type for the task
*/
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
public interface TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskAttemptContext {
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Tue Oct 27 15:43:58 2009
@@ -199,7 +199,7 @@
Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
- baseDir, fileStatus, isArchive, confFileStamp, currentWorkDir,
+ baseDir.toString(), fileStatus, isArchive, confFileStamp, currentWorkDir,
honorSymLinkConf);
}
@@ -250,7 +250,51 @@
@Deprecated
public static void releaseCache(URI cache, Configuration conf)
throws IOException {
- new TrackerDistributedCacheManager(conf).releaseCache(cache, conf);
+ // Look up the timestamp recorded for this uri in the job configuration;
+ // it is passed on to the tracker-level manager when releasing the cache.
+ URI[] archives = DistributedCache.getCacheArchives(conf);
+ URI[] files = DistributedCache.getCacheFiles(conf);
+ String[] archivesTimestamps =
+ DistributedCache.getArchiveTimestamps(conf);
+ String[] filesTimestamps =
+ DistributedCache.getFileTimestamps(conf);
+ String timestamp = null;
+ // Archives are searched first. Each scan is bounded by both arrays so a
+ // missing or shorter timestamp list cannot cause a NullPointerException
+ // or an ArrayIndexOutOfBoundsException.
+ if (archives != null && archivesTimestamps != null) {
+ int count = Math.min(archives.length, archivesTimestamps.length);
+ for (int i = 0; i < count; i++) {
+ if (archives[i].equals(cache)) {
+ timestamp = archivesTimestamps[i];
+ break;
+ }
+ }
+ }
+ if (timestamp == null && files != null && filesTimestamps != null) {
+ int count = Math.min(files.length, filesTimestamps.length);
+ for (int i = 0; i < count; i++) {
+ if (files[i].equals(cache)) {
+ timestamp = filesTimestamps[i];
+ break;
+ }
+ }
+ }
+ if (timestamp == null) {
+ throw new IOException("TimeStamp of the uri " + cache
+ + " could not be found");
+ }
+ long parsedTimestamp;
+ try {
+ parsedTimestamp = Long.parseLong(timestamp);
+ } catch (NumberFormatException nfe) {
+ // Report a malformed configuration value through the declared
+ // IOException instead of letting an unchecked exception escape.
+ throw new IOException("Invalid timestamp '" + timestamp
+ + "' for the uri " + cache, nfe);
+ }
+ new TrackerDistributedCacheManager(conf).releaseCache(cache, conf,
+ parsedTimestamp);
}
/**
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Tue Oct 27 15:43:58 2009
@@ -151,23 +151,9 @@
URI uri = cacheFile.uri;
FileSystem fileSystem = FileSystem.get(uri, taskConf);
FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
- String cacheId = this.distributedCacheManager.makeRelative(uri, taskConf);
- String cachePath = cacheSubdir + Path.SEPARATOR + cacheId;
- // Get the local path if the cacheFile is already localized or create one
- // if it doesn't
- Path localPath;
- try {
- localPath = lDirAlloc.getLocalPathToRead(cachePath, taskConf);
- } catch (DiskErrorException de) {
- localPath =
- lDirAlloc.getLocalPathForWrite(cachePath, fileStatus.getLen(),
- taskConf);
- }
-
- String baseDir = localPath.toString().replace(cacheId, "");
Path p = distributedCacheManager.getLocalCache(uri, taskConf,
- new Path(baseDir), fileStatus,
+ cacheSubdir, fileStatus,
cacheFile.type == CacheFile.FileType.ARCHIVE,
cacheFile.timestamp, workdirPath, false);
@@ -224,7 +210,7 @@
*/
public void release() throws IOException {
for (CacheFile c : cacheFiles) { // release every entry in cacheFiles, one releaseCache call per file
- distributedCacheManager.releaseCache(c.uri, taskConf);
+ distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
}
}