You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/29 22:04:18 UTC
svn commit: r501182 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/
Author: cutting
Date: Mon Jan 29 13:04:17 2007
New Revision: 501182
URL: http://svn.apache.org/viewvc?view=rev&rev=501182
Log:
HADOOP-949. svn merge -r 500410:500397, reverting a patch from HADOOP-248, which was causing MapReduce programs to hang.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jan 29 13:04:17 2007
@@ -87,8 +87,7 @@
with different versions of Lucene without worrying about CLASSPATH
order. (Milind Bhandarkar via cutting)
-27. HADOOP-248. Optimize location of map outputs to no longer use
- random probes. (Devaraj Das via cutting)
+27. [ intentionally blank ]
28. HADOOP-227. Add support for backup namenodes, which periodically
get snapshots of the namenode state. (Dhruba Borthakur via cutting)
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Mon Jan 29 13:04:17 2007
@@ -28,10 +28,11 @@
*/
interface InterTrackerProtocol extends VersionedProtocol {
/**
- * version 4 introduced that removes locateMapOutputs and instead uses
- * getTaskCompletionEvents to figure finished maps and fetch the outputs
+ * version 3 introduced to replace
+ * emitHeartbeat/pollForNewTask/pollForTaskWithClosedJob with
+ * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, short)}
*/
- public static final long versionID = 4L;
+ public static final long versionID = 3L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
@@ -61,6 +62,18 @@
boolean initialContact, boolean acceptNewTasks, short responseId)
throws IOException;
+ /** Called by a reduce task to find which map tasks are completed.
+ *
+ * @param jobId the job id
+ * @param mapTasksNeeded an array of the mapIds that we need
+ * @param partition the reduce's id
+ * @return an array of MapOutputLocation
+ */
+ MapOutputLocation[] locateMapOutputs(String jobId,
+ int[] mapTasksNeeded,
+ int partition
+ ) throws IOException;
+
/**
* The task tracker calls this once, to discern where it can find
* files referred to by the JobTracker
@@ -83,12 +96,11 @@
* Returns empty array if no events are available.
* @param jobid job id
* @param fromEventId event id to start from.
- * @param maxEvents the max number of events we want to look at
* @return array of task completion events.
* @throws IOException
*/
TaskCompletionEvent[] getTaskCompletionEvents(
- String jobid, int fromEventId, int maxEvents) throws IOException;
+ String jobid, int fromEventId) throws IOException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon Jan 29 13:04:17 2007
@@ -157,7 +157,7 @@
public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
int startFrom) throws IOException{
return jobSubmitClient.getTaskCompletionEvents(
- getJobID(), startFrom, 10);
+ getJobID(), startFrom);
}
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Jan 29 13:04:17 2007
@@ -291,40 +291,24 @@
if (state == TaskStatus.State.SUCCEEDED) {
this.taskCompletionEvents.add( new TaskCompletionEvent(
- taskCompletionEventTracker,
- status.getTaskId(),
- tip.idWithinJob(),
- status.getIsMap(),
+ taskCompletionEventTracker++,
+ status.getTaskId(),
TaskCompletionEvent.Status.SUCCEEDED,
httpTaskLogLocation ));
- tip.setSuccessEventNumber(taskCompletionEventTracker);
completedTask(tip, status, metrics);
} else if (state == TaskStatus.State.FAILED ||
state == TaskStatus.State.KILLED) {
this.taskCompletionEvents.add( new TaskCompletionEvent(
- taskCompletionEventTracker,
- status.getTaskId(),
- tip.idWithinJob(),
- status.getIsMap(),
+ taskCompletionEventTracker++,
+ status.getTaskId(),
TaskCompletionEvent.Status.FAILED,
httpTaskLogLocation ));
- // Get the event number for the (possibly) previously successful
- // task. If there exists one, then set that status to OBSOLETE
- int eventNumber;
- if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
- TaskCompletionEvent t =
- this.taskCompletionEvents.get(eventNumber);
- if (t.getTaskId().equals(status.getTaskId()))
- t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
- }
// Tell the job to fail the relevant task
failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
wasRunning, wasComplete);
}
}
- taskCompletionEventTracker++;
-
//
// Update JobInProgress status
//
@@ -794,14 +778,12 @@
return null;
}
- public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId,
- int maxEvents) {
+ public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId) {
TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
if( taskCompletionEvents.size() > fromEventId) {
- int actualMax = Math.min(maxEvents,
- (taskCompletionEvents.size() - fromEventId));
events = (TaskCompletionEvent[])taskCompletionEvents.subList(
- fromEventId, actualMax + fromEventId).toArray(events);
+ fromEventId, taskCompletionEvents.size()).
+ toArray(events);
}
return events;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Mon Jan 29 13:04:17 2007
@@ -85,12 +85,11 @@
* Get task completion events for the jobid, starting from fromEventId.
* Returns empty array if no events are available.
* @param jobid job id
- * @param fromEventId event id to start from.
- * @param maxEvents the max number of events we want to look at
+ * @param fromEventId event id to start from.
* @return array of task completion events.
* @throws IOException
*/
public TaskCompletionEvent[] getTaskCompletionEvents(
- String jobid, int fromEventId, int maxEvents) throws IOException;
+ String jobid, int fromEventId) throws IOException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jan 29 13:04:17 2007
@@ -1263,6 +1263,48 @@
}
/**
+ * A TaskTracker wants to know the physical locations of completed, but not
+ * yet closed, tasks. This exists so the reduce task thread can locate
+ * map task outputs.
+ */
+ public synchronized MapOutputLocation[]
+ locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce)
+ throws IOException {
+ // Check to make sure that the job hasn't 'completed'.
+ JobInProgress job = getJob(jobId);
+ if (job.status.getRunState() != JobStatus.RUNNING) {
+ return new MapOutputLocation[0];
+ }
+
+ ArrayList result = new ArrayList(mapTasksNeeded.length);
+ for (int i = 0; i < mapTasksNeeded.length; i++) {
+ TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
+ if (status != null) {
+ String trackerId =
+ (String) taskidToTrackerMap.get(status.getTaskId());
+ // Safety check, if we can't find the taskid in
+ // taskidToTrackerMap and job isn't 'running', then just
+ // return an empty array
+ if (trackerId == null &&
+ job.status.getRunState() != JobStatus.RUNNING) {
+ return new MapOutputLocation[0];
+ }
+
+ TaskTrackerStatus tracker;
+ synchronized (taskTrackers) {
+ tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
+ }
+ result.add(new MapOutputLocation(status.getTaskId(),
+ mapTasksNeeded[i],
+ tracker.getHost(),
+ tracker.getHttpPort()));
+ }
+ }
+ return (MapOutputLocation[])
+ result.toArray(new MapOutputLocation[result.size()]);
+ }
+
+ /**
* Grab the local fs name
*/
public synchronized String getFilesystemName() throws IOException {
@@ -1393,14 +1435,14 @@
/*
* Returns a list of TaskCompletionEvent for the given job,
* starting from fromEventId.
- * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
+ * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int)
*/
public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
- String jobid, int fromEventId, int maxEvents) throws IOException{
+ String jobid, int fromEventId) throws IOException{
TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
JobInProgress job = (JobInProgress)this.jobs.get(jobid);
if (null != job) {
- events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+ events = job.getTaskCompletionEvents(fromEventId);
}
return events;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Jan 29 13:04:17 2007
@@ -244,7 +244,7 @@
public JobStatus[] jobsToComplete() {return null;}
public TaskCompletionEvent[] getTaskCompletionEvents(
- String jobid, int fromEventId, int maxEvents) throws IOException{
+ String jobid, int fromEventId) throws IOException{
return TaskCompletionEvent.EMPTY_ARRAY;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Mon Jan 29 13:04:17 2007
@@ -122,19 +122,9 @@
/**
* the number of map output locations to poll for at one time
- */
- private int probe_sample_size = 50;
-
- /**
- * a Random used during the map output fetching
- */
- private Random randForProbing;
-
- /**
- * a hashmap from mapId to MapOutputLocation for retrials
*/
- private Map<Integer, MapOutputLocation> retryFetches = new HashMap();
-
+ private static final int PROBE_SAMPLE_SIZE = 50;
+
/** Represents the result of an attempt to copy a map output */
private class CopyResult {
@@ -378,10 +368,6 @@
Random backoff = new Random();
final Progress copyPhase = getTask().getProgress().phase();
- //tweak the probe sample size (make it a function of numCopiers)
- probe_sample_size = Math.max(numCopiers*5, 50);
- randForProbing = new Random(reduceTask.getPartition() * 100);
-
for (int i = 0; i < numOutputs; i++) {
neededOutputs.add(new Integer(i));
copyPhase.addPhase(); // add sub-phase per file
@@ -399,8 +385,6 @@
// start the clock for bandwidth measurement
long startTime = System.currentTimeMillis();
long currentTime = startTime;
- int fromEventId = 0;
-
PingTimer pingTimer = new PingTimer();
pingTimer.setName("Map output copy reporter for task " +
reduceTask.getTaskId());
@@ -417,36 +401,17 @@
LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
" map output location(s)");
try {
- // the call to queryJobTracker will modify fromEventId to a value
- // that it should be for the next call to queryJobTracker
- MapOutputLocation[] locs = queryJobTracker(fromEventId, jobClient);
+ MapOutputLocation[] locs = queryJobTracker(neededOutputs, jobClient);
// remove discovered outputs from needed list
// and put them on the known list
- int gotLocs = (locs == null ? 0 : locs.length);
for (int i=0; i < locs.length; i++) {
- // check whether we actually need an output. It could happen
- // that a map task that successfully ran earlier got lost, but
- // if we already have copied the output of that unfortunate task
- // we need not copy it again from the new TT (we will ignore
- // the event for the new rescheduled execution)
- if(neededOutputs.remove(new Integer(locs[i].getMapId()))) {
- // remove the mapId from the retryFetches hashmap since we now
- // prefer the new location instead of what we saved earlier
- retryFetches.remove(new Integer(locs[i].getMapId()));
- knownOutputs.add(locs[i]);
- gotLocs--;
- }
-
+ neededOutputs.remove(new Integer(locs[i].getMapId()));
+ knownOutputs.add(locs[i]);
}
- // now put the remaining hash entries for the failed fetches
- // and clear the hashmap
- knownOutputs.addAll(retryFetches.values());
LOG.info(reduceTask.getTaskId() +
- " Got " + gotLocs +
- " new map outputs from jobtracker and " + retryFetches.size() +
- " map outputs from previous failures");
- retryFetches.clear();
+ " Got " + (locs == null ? 0 : locs.length) +
+ " map outputs from jobtracker");
}
catch (IOException ie) {
LOG.warn(reduceTask.getTaskId() +
@@ -520,7 +485,6 @@
} else {
// this copy failed, put it back onto neededOutputs
neededOutputs.add(new Integer(cr.getMapId()));
- retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
// wait a random amount of time for next contact
currentTime = System.currentTimeMillis();
@@ -540,7 +504,6 @@
while (locIt.hasNext()) {
MapOutputLocation loc = (MapOutputLocation)locIt.next();
if (cr.getHost().equals(loc.getHost())) {
- retryFetches.put(new Integer(loc.getMapId()), loc);
locIt.remove();
neededOutputs.add(new Integer(loc.getMapId()));
}
@@ -551,7 +514,7 @@
}
// ensure we have enough to keep us busy
- if (numInFlight < lowThreshold && (numOutputs-numCopied) > probe_sample_size) {
+ if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
break;
}
}
@@ -645,16 +608,28 @@
}
/** Queries the job tracker for a set of outputs ready to be copied
- * @param fromEventId the first event ID we want to start from, this will be
- * modified by the call to this method
+ * @param neededOutputs the list of currently unknown outputs
* @param jobClient the job tracker
* @return a set of locations to copy outputs from
* @throws IOException
*/
- private MapOutputLocation[] queryJobTracker(int fromEventId,
+ private MapOutputLocation[] queryJobTracker(List neededOutputs,
InterTrackerProtocol jobClient)
throws IOException {
+ // query for just a random subset of needed segments so that we don't
+ // overwhelm jobtracker. ideally perhaps we could send a more compact
+ // representation of all needed, i.e., a bit-vector
+ int checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
+ int neededIds[] = new int[checkSize];
+
+ Collections.shuffle(neededOutputs);
+
+ ListIterator itr = neededOutputs.listIterator();
+ for (int i=0; i < checkSize; i++) {
+ neededIds[i] = ((Integer)itr.next()).intValue();
+ }
+
long currentTime = System.currentTimeMillis();
long pollTime = lastPollTime + MIN_POLL_INTERVAL;
while (currentTime < pollTime) {
@@ -665,28 +640,9 @@
}
lastPollTime = currentTime;
- TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
- reduceTask.getJobId().toString(),
- fromEventId,
- probe_sample_size);
-
- List <MapOutputLocation> mapOutputsList = new ArrayList();
- for (int i = 0; i < t.length; i++) {
- if (t[i].isMap &&
- t[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
- URI u = URI.create(t[i].getTaskTrackerHttp());
- String host = u.getHost();
- int port = u.getPort();
- String taskId = t[i].getTaskId();
- int mId = t[i].idWithinJob();
- mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
- }
- }
- Collections.shuffle(mapOutputsList, randForProbing);
- MapOutputLocation[] locations =
- new MapOutputLocation[mapOutputsList.size()];
- fromEventId += t.length;
- return mapOutputsList.toArray(locations);
+ return jobClient.locateMapOutputs(reduceTask.getJobId().toString(),
+ neededIds,
+ reduceTask.getPartition());
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Mon Jan 29 13:04:17 2007
@@ -12,14 +12,12 @@
*
*/
public class TaskCompletionEvent implements Writable{
- static public enum Status {FAILED, SUCCEEDED, OBSOLETE};
+ static public enum Status {FAILED, SUCCEEDED};
private int eventId ;
private String taskTrackerHttp ;
private String taskId ;
Status status ;
- boolean isMap = false ;
- private int idWithinJob;
public static final TaskCompletionEvent[] EMPTY_ARRAY =
new TaskCompletionEvent[0];
/**
@@ -37,15 +35,11 @@
* @param taskTrackerHttp task tracker's host:port for http.
*/
public TaskCompletionEvent(int eventId,
- String taskId,
- int idWithinJob,
- boolean isMap,
+ String taskId,
Status status,
String taskTrackerHttp){
- this.taskId = taskId ;
- this.idWithinJob = idWithinJob ;
- this.isMap = isMap ;
+ this.taskId = taskId ;
this.eventId = eventId ;
this.status =status ;
this.taskTrackerHttp = taskTrackerHttp ;
@@ -120,28 +114,17 @@
return buf.toString();
}
- public boolean isMapTask() {
- return isMap;
- }
-
- public int idWithinJob() {
- return idWithinJob;
- }
//////////////////////////////////////////////
// Writable
//////////////////////////////////////////////
public void write(DataOutput out) throws IOException {
WritableUtils.writeString(out, taskId);
- WritableUtils.writeVInt(out, idWithinJob);
- out.writeBoolean(isMap);
WritableUtils.writeEnum(out, status);
WritableUtils.writeString(out, taskTrackerHttp);
}
public void readFields(DataInput in) throws IOException {
this.taskId = WritableUtils.readString(in) ;
- this.idWithinJob = WritableUtils.readVInt(in);
- this.isMap = in.readBoolean();
this.status = WritableUtils.readEnum(in, Status.class);
this.taskTrackerHttp = WritableUtils.readString(in);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=501182&r1=501181&r2=501182
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon Jan 29 13:04:17 2007
@@ -60,7 +60,6 @@
private JobInProgress job;
// Status of the TIP
- private int successEventNumber = -1;
private int numTaskFailures = 0;
private double progress = 0;
private String state = "";
@@ -140,15 +139,6 @@
}
/**
- * Return the index of the tip within the job, so "tip_0002_m_012345"
- * would return 12345;
- * @return int the tip index
- */
- public int idWithinJob() {
- return partition;
- }
-
- /**
* Initialization common to Map and Reduce
*/
void init(String jobUniqueString) {
@@ -577,19 +567,5 @@
*/
public int getIdWithinJob() {
return partition;
- }
-
- /**
- * Set the event number that was raised for this tip
- */
- public void setSuccessEventNumber(int eventNumber) {
- successEventNumber = eventNumber;
- }
-
- /**
- * Get the event number that was raised for this tip
- */
- public int getSuccessEventNumber() {
- return successEventNumber;
}
}