You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/10/01 01:15:01 UTC
svn commit: r700628 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/
Author: acmurthy
Date: Tue Sep 30 16:15:00 2008
New Revision: 700628
URL: http://svn.apache.org/viewvc?rev=700628&view=rev
Log:
HADOOP-4232. Fix race condition in JVM reuse when multiple slots become free. Contributed by Devaraj Das.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep 30 16:15:00 2008
@@ -813,6 +813,9 @@
HADOOP-4309. Fix eclipse-plugin compilation. (cdouglas)
+ HADOOP-4232. Fix race condition in JVM reuse when multiple slots become
+ free. (ddas via acmurthy)
+
Release 0.18.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java Tue Sep 30 16:15:00 2008
@@ -56,11 +56,10 @@
int port = Integer.parseInt(args[1]);
InetSocketAddress address = new InetSocketAddress(host, port);
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
+ final int SLEEP_LONGER_COUNT = 5;
taskid = firstTaskid;
int jvmIdInt = Integer.parseInt(args[3]);
JVMId jvmId = new JVMId(taskid.getJobID(),taskid.isMap(),jvmIdInt);
- final int MAX_SLEEP_COUNT = 600; //max idle time of 5 minutes
- int sleepCount = 0;
TaskUmbilicalProtocol umbilical =
(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
TaskUmbilicalProtocol.versionID,
@@ -75,7 +74,9 @@
while (true) {
try {
Thread.sleep(5000);
- TaskLog.syncLogs(firstTaskid, taskid);
+ if (taskid != null) {
+ TaskLog.syncLogs(firstTaskid, taskid);
+ }
} catch (InterruptedException ie) {
} catch (IOException iee) {
LOG.error("Error in syncLogs: " + iee);
@@ -93,22 +94,26 @@
//manager to use JVMId instead of TaskAttemptId
Path srcPidPath = null;
Path dstPidPath = null;
+ int idleLoopCount = 0;
try {
while (true) {
- JvmTask myTask = umbilical.getTask(jvmId, firstTaskid);
+ JvmTask myTask = umbilical.getTask(jvmId);
if (myTask.shouldDie()) {
break;
} else {
if (myTask.getTask() == null) {
- if (sleepCount == MAX_SLEEP_COUNT) {
- System.exit(0);
+ taskid = null;
+ if (++idleLoopCount >= SLEEP_LONGER_COUNT) {
+ //we sleep for a bigger interval when we don't receive
+ //tasks for a while
+ Thread.sleep(1500);
+ } else {
+ Thread.sleep(500);
}
- sleepCount++;
- Thread.sleep(500);
continue;
}
- sleepCount = 0; //got a task. reset the sleepCount
}
+ idleLoopCount = 0;
Task task = myTask.getTask();
taskid = task.getTaskID();
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Tue Sep 30 16:15:00 2008
@@ -48,10 +48,12 @@
* Version 16: adds ResourceStatus to TaskTrackerStatus for HADOOP-3759
* Version 17: Changed format of Task and TaskStatus for HADOOP-3150
* Version 18: Changed status message due to changes in TaskStatus
- * Changed heartbeat to piggyback JobTracker restart information
+ * Version 19: Changed heartbeat to piggyback JobTracker restart information
so that the TaskTracker can synchronize itself.
+ * Version 20: Changed status message due to changes in TaskStatus
+ * (HADOOP-4232)
*/
- public static final long versionID = 18L;
+ public static final long versionID = 20L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Tue Sep 30 16:15:00 2008
@@ -59,7 +59,7 @@
LOG.info("Task " + taskId + " reporting shuffle error: " + message);
}
- public JvmTask getTask(JVMId jvmId, TaskAttemptID taskId) throws IOException {
+ public JvmTask getTask(JVMId jvmId) throws IOException {
return null;
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Sep 30 16:15:00 2008
@@ -1433,8 +1433,7 @@
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
- taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
- taskStatus.getRunState() != TaskStatus.State.INITIALIZED) {
+ taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
markCompletedTaskAttempt(taskStatus.getTaskTracker(),
taskStatus.getTaskID());
}
@@ -1444,8 +1443,7 @@
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
- taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
- taskStatus.getRunState() != TaskStatus.State.INITIALIZED) {
+ taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
markCompletedTaskAttempt(taskStatus.getTaskTracker(),
taskStatus.getTaskID());
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java Tue Sep 30 16:15:00 2008
@@ -29,9 +29,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
class JvmManager {
@@ -72,22 +71,21 @@
}
}
- public void launchJvm(JobID jobId, boolean isMap, JvmEnv env) {
- if (isMap) {
- mapJvmManager.reapJvm(env, jobId, tracker);
+ public void launchJvm(TaskRunner t, JvmEnv env) {
+ if (t.getTask().isMapTask()) {
+ mapJvmManager.reapJvm(t, tracker, env);
} else {
- reduceJvmManager.reapJvm(env, jobId, tracker);
+ reduceJvmManager.reapJvm(t, tracker, env);
}
}
- public void setRunningTaskForJvm(JVMId jvmId, TaskRunner t) {
+ public TaskInProgress getTaskForJvm(JVMId jvmId) {
if (jvmId.isMapJVM()) {
- mapJvmManager.setRunningTaskForJvm(jvmId, t);
+ return mapJvmManager.getTaskForJvm(jvmId);
} else {
- reduceJvmManager.setRunningTaskForJvm(jvmId, t);
+ return reduceJvmManager.getTaskForJvm(jvmId);
}
}
-
public void taskFinished(TaskRunner tr) {
if (tr.getTask().isMapTask()) {
mapJvmManager.taskFinished(tr);
@@ -136,17 +134,18 @@
synchronized public void setRunningTaskForJvm(JVMId jvmId,
TaskRunner t) {
- if (t == null) {
- //signifies the JVM asked for a task and it
- //was not given anything.
- jvmIdToRunner.get(jvmId).setBusy(false);
- return;
- }
jvmToRunningTask.put(jvmId, t);
runningTaskToJvm.put(t,jvmId);
jvmIdToRunner.get(jvmId).setBusy(true);
}
+ synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) {
+ if (jvmToRunningTask.containsKey(jvmId)) {
+ return jvmToRunningTask.get(jvmId).getTaskInProgress();
+ }
+ return null;
+ }
+
synchronized public boolean isJvmknown(JVMId jvmId) {
return jvmIdToRunner.containsKey(jvmId);
}
@@ -187,23 +186,34 @@
jvmIdToRunner.remove(jvmId);
}
private synchronized void reapJvm(
- JvmEnv env,
- JobID jobId, TaskTracker tracker) {
+ TaskRunner t, TaskTracker tracker, JvmEnv env) {
boolean spawnNewJvm = false;
+ JobID jobId = t.getTask().getJobID();
//Check whether there is a free slot to start a new JVM.
//,or, Kill a (idle) JVM and launch a new one
+ //When this method is called, we *must*
+ // (1) spawn a new JVM (if we are below the max)
+ // (2) find an idle JVM (that belongs to the same job), or,
+ // (3) kill an idle JVM (from a different job)
+ // (these options are tried in the order listed above)
int numJvmsSpawned = jvmIdToRunner.size();
-
+ JvmRunner runnerToKill = null;
if (numJvmsSpawned >= maxJvms) {
//go through the list of JVMs for all jobs.
- //for each JVM see whether it is currently running something and
- //if not, then kill the JVM
Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter =
jvmIdToRunner.entrySet().iterator();
while (jvmIter.hasNext()) {
JvmRunner jvmRunner = jvmIter.next().getValue();
JobID jId = jvmRunner.jvmId.getJobId();
+ //look for a free JVM for this job; if one exists then just break
+ if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()){
+ setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM
+ LOG.info("No new JVM spawned for jobId/taskid: " +
+ jobId+"/"+t.getTask().getTaskID() +
+ ". Attempting to reuse: " + jvmRunner.jvmId);
+ return;
+ }
//Cases when a JVM is killed:
// (1) the JVM under consideration belongs to the same job
// (passed in the argument). In this case, kill only when
@@ -211,13 +221,12 @@
// of count).
// (2) the JVM under consideration belongs to a different job and is
// currently not busy
- //
+ //But in both the above cases, we see if we can assign the current
+ //task to an idle JVM (hence we continue the loop even on a match)
if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
(!jId.equals(jobId) && !jvmRunner.isBusy())) {
- jvmIter.remove();
- jvmRunner.kill();
+ runnerToKill = jvmRunner;
spawnNewJvm = true;
- break;
}
}
} else {
@@ -225,13 +234,42 @@
}
if (spawnNewJvm) {
- spawnNewJvm(jobId, env, tracker);
- } else {
- LOG.info("No new JVM spawned for jobId: " + jobId);
+ if (runnerToKill != null) {
+ LOG.info("Killing JVM: " + runnerToKill.jvmId);
+ runnerToKill.kill();
+ }
+ spawnNewJvm(jobId, env, tracker, t);
+ return;
}
+ //*MUST* never reach this
+ throw new RuntimeException("Inconsistent state!!! " +
+ "JVM Manager reached an unstable state " +
+ "while reaping a JVM for task: " + t.getTask().getTaskID()+
+ " " + getDetails());
+ }
+
+ private String getDetails() {
+ StringBuffer details = new StringBuffer();
+ details.append("Number of active JVMs:").
+ append(jvmIdToRunner.size());
+ Iterator<JVMId> jvmIter =
+ jvmIdToRunner.keySet().iterator();
+ while (jvmIter.hasNext()) {
+ JVMId jvmId = jvmIter.next();
+ details.append("\n JVMId ").
+ append(jvmId.toString()).
+ append(" #Tasks ran: ").
+ append(jvmIdToRunner.get(jvmId).numTasksRan).
+ append(" Currently busy? ").
+ append(jvmIdToRunner.get(jvmId).busy).
+ append(" Currently running: ").
+ append(jvmToRunningTask.get(jvmId).getTask().getTaskID().toString());
+ }
+ return details.toString();
}
- private void spawnNewJvm(JobID jobId, JvmEnv env, TaskTracker tracker) {
+ private void spawnNewJvm(JobID jobId, JvmEnv env, TaskTracker tracker,
+ TaskRunner t) {
JvmRunner jvmRunner = new JvmRunner(env,jobId);
jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
//spawn the JVM in a new thread. Note that there will be very little
@@ -247,6 +285,7 @@
TaskAttemptID.forName(env.conf.get("mapred.task.id")),
tracker.getMemoryForTask(env.conf));
}
+ setRunningTaskForJvm(jvmRunner.jvmId, t);
LOG.info(jvmRunner.getName());
jvmRunner.start();
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Tue Sep 30 16:15:00 2008
@@ -207,7 +207,7 @@
// TaskUmbilicalProtocol methods
- public JvmTask getTask(JVMId jvmId, TaskAttemptID taskId) { return null; }
+ public JvmTask getTask(JVMId jvmId) { return null; }
public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Sep 30 16:15:00 2008
@@ -111,8 +111,9 @@
}
@Override
- public TaskRunner createRunner(TaskTracker tracker) {
- return new MapTaskRunner(this, tracker, this.conf);
+ public TaskRunner createRunner(TaskTracker tracker,
+ TaskTracker.TaskInProgress tip) {
+ return new MapTaskRunner(tip, tracker, this.conf);
}
@Override
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java Tue Sep 30 16:15:00 2008
@@ -19,10 +19,12 @@
import java.io.*;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+
/** Runs a map task. */
class MapTaskRunner extends TaskRunner {
- public MapTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
+ public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) {
super(task, tracker, conf);
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Sep 30 16:15:00 2008
@@ -70,6 +70,7 @@
import org.apache.hadoop.mapred.IFile.*;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
@@ -161,8 +162,9 @@
}
@Override
- public TaskRunner createRunner(TaskTracker tracker) throws IOException {
- return new ReduceTaskRunner(this, tracker, this.conf);
+ public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip)
+ throws IOException {
+ return new ReduceTaskRunner(tip, tracker, this.conf);
}
@Override
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java Tue Sep 30 16:15:00 2008
@@ -19,10 +19,12 @@
import java.io.*;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+
/** Runs a reduce task. */
class ReduceTaskRunner extends TaskRunner {
- public ReduceTaskRunner(Task task, TaskTracker tracker,
+ public ReduceTaskRunner(TaskInProgress task, TaskTracker tracker,
JobConf conf) throws IOException {
super(task, tracker, conf);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Tue Sep 30 16:15:00 2008
@@ -303,9 +303,10 @@
throws IOException;
- /** Return an approprate thread runner for this task. */
- public abstract TaskRunner createRunner(TaskTracker tracker
- ) throws IOException;
+ /** Return an appropriate thread runner for this task.
+ * @param tip the tasktracker's TaskInProgress handed to the new runner */
+ public abstract TaskRunner createRunner(TaskTracker tracker,
+ TaskTracker.TaskInProgress tip) throws IOException;
/** The number of milliseconds between progress reports. */
public static final int PROGRESS_INTERVAL = 3000;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Tue Sep 30 16:15:00 2008
@@ -486,7 +486,6 @@
// @see {@link TaskTracker.transmitHeartbeat()}
if ((newState != TaskStatus.State.RUNNING &&
newState != TaskStatus.State.COMMIT_PENDING &&
- newState != TaskStatus.State.INITIALIZED &&
newState != TaskStatus.State.UNASSIGNED) &&
(oldState == newState)) {
LOG.warn("Recieved duplicate status update of '" + newState +
@@ -499,8 +498,7 @@
// to running. This is a spot fix, but it should be addressed more
// globally.
if ((newState == TaskStatus.State.RUNNING ||
- newState == TaskStatus.State.UNASSIGNED ||
- newState == TaskStatus.State.INITIALIZED) &&
+ newState == TaskStatus.State.UNASSIGNED) &&
(oldState == TaskStatus.State.FAILED ||
oldState == TaskStatus.State.KILLED ||
oldState == TaskStatus.State.SUCCEEDED ||
@@ -713,7 +711,6 @@
TaskStatus st = taskStatuses.get(taskId);
if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
|| st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
- st.getRunState() == TaskStatus.State.INITIALIZED ||
st.getRunState() == TaskStatus.State.UNASSIGNED)
&& tasksToKill.put(taskId, shouldFail) == null ) {
String logStr = "Request received to " + (shouldFail ? "fail" : "kill")
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Tue Sep 30 16:15:00 2008
@@ -42,6 +42,7 @@
LogFactory.getLog(TaskRunner.class);
volatile boolean killed = false;
+ private TaskTracker.TaskInProgress tip;
private Task t;
private Object lock = new Object();
private volatile boolean done = false;
@@ -58,8 +59,10 @@
*/
protected MapOutputFile mapOutputFile;
- public TaskRunner(Task t, TaskTracker tracker, JobConf conf) {
- this.t = t;
+ public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker,
+ JobConf conf) {
+ this.tip = tip;
+ this.t = tip.getTask();
this.tracker = tracker;
this.conf = conf;
this.mapOutputFile = new MapOutputFile(t.getJobID());
@@ -68,6 +71,7 @@
}
public Task getTask() { return t; }
+ public TaskTracker.TaskInProgress getTaskInProgress() { return tip; }
public TaskTracker getTracker() { return tracker; }
/** Called to assemble this task's input. This method is run in the parent
@@ -403,9 +407,7 @@
ldLibraryPath.append(oldLdLibraryPath);
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
- tracker.taskInitialized(t.getTaskID());
- LOG.info("Task ID: " + t.getTaskID() +" initialized");
- jvmManager.launchJvm(t.getJobID(), t.isMapTask(),
+ jvmManager.launchJvm(this,
jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
workDir, env, pidFile, conf));
synchronized (lock) {
@@ -537,6 +539,7 @@
public void kill() {
killed = true;
jvmManager.taskKilled(this);
+ signalDone();
}
public void signalDone() {
synchronized (lock) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Tue Sep 30 16:15:00 2008
@@ -41,7 +41,7 @@
// what state is the task in?
public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
- COMMIT_PENDING, INITIALIZED}
+ COMMIT_PENDING}
private TaskAttemptID taskid;
private float progress;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Sep 30 16:15:00 2008
@@ -72,6 +72,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.JobClient.TaskStatusFilter;
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.pipes.Submitter;
import org.apache.hadoop.metrics.MetricsContext;
@@ -1186,7 +1187,6 @@
for (TaskStatus taskStatus : status.getTaskReports()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
- taskStatus.getRunState() != TaskStatus.State.INITIALIZED &&
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
if (taskStatus.getIsMap()) {
mapTotal--;
@@ -1250,7 +1250,6 @@
// still occupied and hence memory of the task should be
// accounted in used memory.
if ((tip.getRunState() == TaskStatus.State.RUNNING)
- || (tip.getRunState() == TaskStatus.State.INITIALIZED)
|| (tip.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
maxMemoryUsed += getMemoryForTask(tip.getJobConf());
}
@@ -1304,7 +1303,6 @@
long now = System.currentTimeMillis();
for (TaskInProgress tip: runningTasks.values()) {
if (tip.getRunState() == TaskStatus.State.RUNNING ||
- tip.getRunState() == TaskStatus.State.INITIALIZED ||
tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
// Check the per-job timeout interval for tasks;
// an interval of '0' implies it is never timed-out
@@ -1611,7 +1609,15 @@
numFreeSlots.set(numFreeSlots.get() - 1);
assert (numFreeSlots.get() >= 0);
}
-
+ synchronized (tip) {
+ //to make sure that there is no kill task action for this task
+ if (tip.getRunState() != TaskStatus.State.UNASSIGNED) {
+ //got killed externally while still in the launcher queue
+ addFreeSlot();
+ continue;
+ }
+ tip.slotTaken = true;
+ }
//got a free slot. launch the task
startNewTask(tip);
} catch (InterruptedException e) {
@@ -1740,6 +1746,7 @@
private TaskStatus taskStatus;
private long taskTimeout;
private String debugCommand;
+ private volatile boolean slotTaken = false;
/**
*/
@@ -1833,7 +1840,7 @@
alwaysKeepTaskFiles = false;
}
if (debugCommand != null || localJobConf.getProfileEnabled() ||
- alwaysKeepTaskFiles) {
+ alwaysKeepTaskFiles || keepFailedTaskFiles) {
//disable jvm reuse
localJobConf.setNumTasksToExecutePerJvm(1);
}
@@ -1885,9 +1892,16 @@
* Kick off the task execution
*/
public synchronized void launchTask() throws IOException {
- localizeTask(task);
- this.runner = task.createRunner(TaskTracker.this);
- this.runner.start();
+ if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+ localizeTask(task);
+ this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+ this.runner = task.createRunner(TaskTracker.this, this);
+ this.runner.start();
+ this.taskStatus.setStartTime(System.currentTimeMillis());
+ } else {
+ LOG.info("Not launching task: " + task.getTaskID() +
+ " since it's state is " + this.taskStatus.getRunState());
+ }
}
/**
@@ -2095,13 +2109,6 @@
}
- synchronized void taskInitialized() {
- if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
- //one-way state change to INITIALIZED
- this.taskStatus.setRunState(TaskStatus.State.INITIALIZED);
- }
- }
-
/**
* Runs the script given in args
@@ -2193,7 +2200,6 @@
synchronized(this){
if (getRunState() == TaskStatus.State.RUNNING ||
getRunState() == TaskStatus.State.UNASSIGNED ||
- getRunState() == TaskStatus.State.INITIALIZED ||
getRunState() == TaskStatus.State.COMMIT_PENDING) {
kill(wasFailure);
}
@@ -2209,12 +2215,12 @@
*/
public synchronized void kill(boolean wasFailure) throws IOException {
if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
- taskStatus.getRunState() == TaskStatus.State.INITIALIZED ||
taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
wasKilled = true;
if (wasFailure) {
failures += 1;
}
+ runner.kill();
taskStatus.setRunState((wasFailure) ?
TaskStatus.State.FAILED :
TaskStatus.State.KILLED);
@@ -2225,15 +2231,12 @@
} else {
taskStatus.setRunState(TaskStatus.State.KILLED);
}
- }
- if (runner != null) {
- runner.kill();
- runner.signalDone();
- } else {
- if (task.isMapTask()) {
- addFreeMapSlot();
- } else {
- addFreeReduceSlot();
+ if (slotTaken) {
+ if (task.isMapTask()) {
+ addFreeMapSlot();
+ } else {
+ addFreeReduceSlot();
+ }
}
}
}
@@ -2344,7 +2347,7 @@
/**
* Called upon startup by the child process, to fetch Task data.
*/
- public synchronized JvmTask getTask(JVMId jvmId, TaskAttemptID firstTaskId)
+ public synchronized JvmTask getTask(JVMId jvmId)
throws IOException {
LOG.debug("JVM with ID : " + jvmId + " asked for a task");
if (!jvmManager.isJvmKnown(jvmId)) {
@@ -2353,41 +2356,24 @@
}
RunningJob rjob = runningJobs.get(jvmId.getJobId());
if (rjob == null) { //kill the JVM since the job is dead
+ LOG.info("Killing JVM " + jvmId + " since job " + jvmId.getJobId() +
+ " is dead");
jvmManager.killJvm(jvmId);
return new JvmTask(null, true);
}
- TaskInProgress t = runningTasks.get(firstTaskId);
- //if we can give the JVM the task it is asking for, well and good;
- //if not, we give it some other task from the same job (note that some
- //other JVM might have run this task while this JVM was init'ing)
- if (t == null || t.getStatus().getRunState() !=
- TaskStatus.State.INITIALIZED) {
- boolean isMap = jvmId.isMapJVM();
- synchronized (rjob) {
- for (TaskInProgress tip : runningTasks.values()) {
- synchronized (tip) {
- if (tip.getTask().getJobID().equals(jvmId.getJobId()) &&
- tip.getRunState() == TaskStatus.State.INITIALIZED
- && ((isMap && tip.getTask().isMapTask()) ||
- (!isMap && !tip.getTask().isMapTask()))) {
- t = tip;
- }
- }
- }
- }
- }
- //now the task could be null or we could have got a task that already
- //ran earlier (the firstTaskId case)
- if (t == null || t.getRunState() != TaskStatus.State.INITIALIZED) {
- jvmManager.setRunningTaskForJvm(jvmId, null);
+ TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);
+ if (tip == null) {
return new JvmTask(null, false);
}
- t.getStatus().setRunState(TaskStatus.State.RUNNING);
- t.getStatus().setStartTime(System.currentTimeMillis());
- jvmManager.setRunningTaskForJvm(jvmId,t.getTaskRunner());
- LOG.info("JVM with ID: " + jvmId + " given task: " +
- t.getTask().getTaskID().toString());
- return new JvmTask(t.getTask(), false);
+ if (tasks.get(tip.getTask().getTaskID()) != null) { //is task still present
+ LOG.info("JVM with ID: " + jvmId + " given task: " +
+ tip.getTask().getTaskID());
+ return new JvmTask(tip.getTask(), false);
+ } else {
+ LOG.info("Killing JVM with ID: " + jvmId + " since scheduled task: " +
+ tip.getTask().getTaskID() + " is " + tip.taskStatus.getRunState());
+ return new JvmTask(null, true);
+ }
}
/**
@@ -2543,12 +2529,6 @@
}
}
- synchronized void taskInitialized(TaskAttemptID taskid) {
- TaskInProgress tip = tasks.get(taskid);
- if (tip != null) {
- tip.taskInitialized();
- }
- }
/**
* A completed map task's output has been lost.
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Sep 30 16:15:00 2008
@@ -207,8 +207,7 @@
TaskStatus.State state = ts.getRunState();
if (ts.getIsMap() &&
((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED) ||
- (state == TaskStatus.State.INITIALIZED))) {
+ (state == TaskStatus.State.UNASSIGNED))) {
mapCount++;
}
}
@@ -225,8 +224,7 @@
TaskStatus.State state = ts.getRunState();
if ((!ts.getIsMap()) &&
((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED) ||
- (state == TaskStatus.State.INITIALIZED))) {
+ (state == TaskStatus.State.UNASSIGNED))) {
reduceCount++;
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Sep 30 16:15:00 2008
@@ -51,18 +51,18 @@
* stale or not. Hence the return type is a class that
* encapsulates the events and whether to reset events index.
* Version 13 changed the getTask method signature for HADOOP-249
+ * Version 14 changed the getTask method signature for HADOOP-4232
* */
- public static final long versionID = 13L;
+ public static final long versionID = 14L;
/**
* Called when a child task process starts, to get its task.
* @param jvmId the ID of this JVM w.r.t the tasktracker that launched it
- * @param taskid the first taskid that the JVM runs
* @return Task object
* @throws IOException
*/
- JvmTask getTask(JVMId jvmId, TaskAttemptID taskid) throws IOException;
+ JvmTask getTask(JVMId jvmId) throws IOException;
/**
* Report child's progress to parent.