You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/09/19 09:31:42 UTC
svn commit: r696957 [2/2] - in /hadoop/core/trunk: ./ conf/
src/core/org/apache/hadoop/filecache/ src/mapred/org/apache/hadoop/mapred/
src/test/org/apache/hadoop/mapred/
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Sep 19 00:31:41 2008
@@ -18,6 +18,9 @@
package org.apache.hadoop.mapred;
import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -32,11 +35,13 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
+import java.util.LinkedHashMap;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -62,6 +67,7 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -184,6 +190,8 @@
private MapEventsFetcherThread mapEventsFetcher;
int workerThreads;
private CleanupQueue directoryCleanupThread;
+ volatile JvmManager jvmManager;
+
private TaskMemoryManagerThread taskMemoryManager;
private boolean taskMemoryManagerEnabled = false;
private long maxVirtualMemoryForTasks
@@ -389,7 +397,7 @@
// Clear out state tables
this.tasks.clear();
- this.runningTasks = new TreeMap<TaskAttemptID, TaskInProgress>();
+ this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
this.runningJobs = new TreeMap<JobID, RunningJob>();
this.mapTotal = 0;
this.reduceTotal = 0;
@@ -422,6 +430,8 @@
InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
String bindAddress = socAddr.getHostName();
int tmpPort = socAddr.getPort();
+
+ this.jvmManager = new JvmManager(this);
// RPC initialization
int max = maxCurrentMapTasks > maxCurrentReduceTasks ?
@@ -472,6 +482,10 @@
taskMemoryManager.setDaemon(true);
taskMemoryManager.start();
}
+ mapLauncher = new TaskLauncher(maxCurrentMapTasks);
+ reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
+ mapLauncher.start();
+ reduceLauncher.start();
this.running = true;
}
@@ -675,7 +689,6 @@
private void localizeJob(TaskInProgress tip) throws IOException {
Path localJarFile = null;
Task t = tip.getTask();
-
JobID jobId = t.getJobID();
Path jobFile = new Path(t.getJobFile());
// Get sizes of JobFile and JarFile
@@ -840,6 +853,14 @@
// Shutdown the fetcher thread
this.mapEventsFetcher.interrupt();
+ //stop the launchers
+ mapLauncher.cleanTaskQueue();
+ reduceLauncher.cleanTaskQueue();
+ this.mapLauncher.interrupt();
+ this.reduceLauncher.interrupt();
+
+ jvmManager.stop();
+
// shutdown RPC connections
RPC.stopProxy(jobClient);
}
@@ -1037,7 +1058,7 @@
if (actions != null){
for(TaskTrackerAction action: actions) {
if (action instanceof LaunchTaskAction) {
- startNewTask((LaunchTaskAction) action);
+ addToTaskQueue((LaunchTaskAction)action);
} else if (action instanceof CommitTaskAction) {
CommitTaskAction commitAction = (CommitTaskAction)action;
if (!commitResponses.contains(commitAction.getTaskID())) {
@@ -1130,8 +1151,8 @@
boolean askForNewTask;
long localMinSpaceStart;
synchronized (this) {
- askForNewTask = (mapTotal < maxCurrentMapTasks ||
- reduceTotal < maxCurrentReduceTasks) &&
+ askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
+ status.countReduceTasks() < maxCurrentReduceTasks) &&
acceptNewTasks;
localMinSpaceStart = minSpaceStart;
}
@@ -1161,6 +1182,8 @@
synchronized (this) {
for (TaskStatus taskStatus : status.getTaskReports()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+ taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
+ taskStatus.getRunState() != TaskStatus.State.INITIALIZED &&
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
if (taskStatus.getIsMap()) {
mapTotal--;
@@ -1192,7 +1215,7 @@
/**
* Return the maximum amount of memory available for all tasks on
* this tracker
- * @return maximum amount of virtual memory in kilobytes
+ * @return maximum amount of virtual memory
*/
long getMaxVirtualMemoryForTasks() {
return maxVirtualMemoryForTasks;
@@ -1208,7 +1231,7 @@
* and the total amount of maximum virtual memory that can be
* used by all currently running tasks.
*
- * @return amount of free virtual memory in kilobytes that can be assured for
+ * @return amount of free virtual memory that can be assured for
* new tasks
*/
private synchronized long findFreeVirtualMemory() {
@@ -1224,9 +1247,9 @@
// still occupied and hence memory of the task should be
// accounted in used memory.
if ((tip.getRunState() == TaskStatus.State.RUNNING)
- || (tip.getRunState() == TaskStatus.State.UNASSIGNED)
+ || (tip.getRunState() == TaskStatus.State.INITIALIZED)
|| (tip.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
- maxMemoryUsed += getMemoryForTask(tip);
+ maxMemoryUsed += getMemoryForTask(tip.getJobConf());
}
}
@@ -1239,11 +1262,11 @@
* If the TIP's job has a configured value for the max memory that is
* returned. Else, the default memory that would be assigned for the
* task is returned.
- * @param tip The TaskInProgress
+ * @param conf
* @return the memory allocated for the TIP.
*/
- private long getMemoryForTask(TaskInProgress tip) {
- long memForTask = tip.getJobConf().getMaxVirtualMemoryForTask();
+ public long getMemoryForTask(JobConf conf) {
+ long memForTask = conf.getMaxVirtualMemoryForTask();
if (memForTask == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
memForTask = this.getDefaultMemoryPerTask();
}
@@ -1278,6 +1301,7 @@
long now = System.currentTimeMillis();
for (TaskInProgress tip: runningTasks.values()) {
if (tip.getRunState() == TaskStatus.State.RUNNING ||
+ tip.getRunState() == TaskStatus.State.INITIALIZED ||
tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
// Check the per-job timeout interval for tasks;
// an interval of '0' implies it is never timed-out
@@ -1497,15 +1521,101 @@
return -1;
}
}
+
+ private TaskLauncher mapLauncher;
+ private TaskLauncher reduceLauncher;
+
+ public JvmManager getJvmManagerInstance() {
+ return jvmManager;
+ }
+
+ public void addFreeMapSlot() {
+ mapLauncher.addFreeSlot();
+ }
- /**
- * Start a new task.
- * All exceptions are handled locally, so that we don't mess up the
- * task tracker.
- */
- private void startNewTask(LaunchTaskAction action) {
+ public void addFreeReduceSlot() {
+ reduceLauncher.addFreeSlot();
+ }
+
+ private void addToTaskQueue(LaunchTaskAction action) {
+ if (action.getTask().isMapTask()) {
+ mapLauncher.addToTaskQueue(action);
+ } else {
+ reduceLauncher.addToTaskQueue(action);
+ }
+ }
+
+ private class TaskLauncher extends Thread {
+ private IntWritable numFreeSlots;
+ private final int maxSlots;
+ private List<TaskInProgress> tasksToLaunch;
+
+ public TaskLauncher(int numSlots) {
+ this.maxSlots = numSlots;
+ this.numFreeSlots = new IntWritable(numSlots);
+ this.tasksToLaunch = new LinkedList<TaskInProgress>();
+ setDaemon(true);
+ setName("TaskLauncher for task");
+ }
+
+ public void addToTaskQueue(LaunchTaskAction action) {
+ synchronized (tasksToLaunch) {
+ TaskInProgress tip = registerTask(action);
+ tasksToLaunch.add(tip);
+ tasksToLaunch.notifyAll();
+ }
+ }
+
+ public void cleanTaskQueue() {
+ tasksToLaunch.clear();
+ }
+
+ public void addFreeSlot() {
+ synchronized (numFreeSlots) {
+ numFreeSlots.set(numFreeSlots.get() + 1);
+ assert (numFreeSlots.get() <= maxSlots);
+ LOG.info("addFreeSlot : current free slots : " + numFreeSlots.get());
+ numFreeSlots.notifyAll();
+ }
+ }
+
+ public void run() {
+ while (!Thread.interrupted()) {
+ try {
+ TaskInProgress tip;
+ synchronized (tasksToLaunch) {
+ while (tasksToLaunch.isEmpty()) {
+ tasksToLaunch.wait();
+ }
+ //get the TIP
+ tip = tasksToLaunch.remove(0);
+ LOG.info("Trying to launch : " + tip.getTask().getTaskID());
+ }
+ //wait for a slot to run
+ synchronized (numFreeSlots) {
+ while (numFreeSlots.get() == 0) {
+ numFreeSlots.wait();
+ }
+ LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
+ " and trying to launch "+tip.getTask().getTaskID());
+ numFreeSlots.set(numFreeSlots.get() - 1);
+ assert (numFreeSlots.get() >= 0);
+ }
+
+ //got a free slot. launch the task
+ startNewTask(tip);
+ } catch (InterruptedException e) {
+ return; // ALL DONE
+ } catch (Throwable th) {
+ LOG.error("TaskLauncher error " +
+ StringUtils.stringifyException(th));
+ }
+ }
+ }
+ }
+ private TaskInProgress registerTask(LaunchTaskAction action) {
Task t = action.getTask();
- LOG.info("LaunchTaskAction: " + t.getTaskID());
+ LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
TaskInProgress tip = new TaskInProgress(t, this.fConf);
synchronized (this) {
tasks.put(t.getTaskID(), tip);
@@ -1517,10 +1627,19 @@
reduceTotal++;
}
}
+ return tip;
+ }
+ /**
+ * Start a new task.
+ * All exceptions are handled locally, so that we don't mess up the
+ * task tracker.
+ */
+ private void startNewTask(TaskInProgress tip) {
try {
localizeJob(tip);
if (isTaskMemoryManagerEnabled()) {
- taskMemoryManager.addTask(t.getTaskID(), getMemoryForTask(tip));
+ taskMemoryManager.addTask(tip.getTask().getTaskID(),
+ getMemoryForTask(tip.getJobConf()));
}
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
@@ -1691,13 +1810,11 @@
}
localJobConf.set("hadoop.net.static.resolutions", str.toString());
}
- OutputStream out = localFs.create(localTaskFile);
- try {
- localJobConf.write(out);
- } finally {
- out.close();
+ if (task.isMapTask()) {
+ debugCommand = localJobConf.getMapDebugScript();
+ } else {
+ debugCommand = localJobConf.getReduceDebugScript();
}
- task.setConf(localJobConf);
String keepPattern = localJobConf.getKeepTaskFilesPattern();
if (keepPattern != null) {
alwaysKeepTaskFiles =
@@ -1705,11 +1822,21 @@
} else {
alwaysKeepTaskFiles = false;
}
- if (task.isMapTask()) {
- debugCommand = localJobConf.getMapDebugScript();
- } else {
- debugCommand = localJobConf.getReduceDebugScript();
+ if (debugCommand != null || localJobConf.getProfileEnabled() ||
+ alwaysKeepTaskFiles) {
+ //disable jvm reuse
+ localJobConf.setNumTasksToExecutePerJvm(1);
}
+ if (isTaskMemoryManagerEnabled()) {
+ localJobConf.setBoolean("task.memory.mgmt.enabled", true);
+ }
+ OutputStream out = localFs.create(localTaskFile);
+ try {
+ localJobConf.write(out);
+ } finally {
+ out.close();
+ }
+ task.setConf(localJobConf);
}
/**
@@ -1717,6 +1844,10 @@
public Task getTask() {
return task;
}
+
+ public TaskRunner getTaskRunner() {
+ return runner;
+ }
public synchronized void setJobConf(JobConf lconf){
this.localJobConf = lconf;
@@ -1745,10 +1876,8 @@
*/
public synchronized void launchTask() throws IOException {
localizeTask(task);
- this.taskStatus.setRunState(TaskStatus.State.RUNNING);
this.runner = task.createRunner(TaskTracker.this);
this.runner.start();
- this.taskStatus.setStartTime(System.currentTimeMillis());
}
/**
@@ -1816,7 +1945,8 @@
this.taskStatus.setProgress(1.0f);
this.taskStatus.setFinishTime(System.currentTimeMillis());
this.done = true;
-
+ jvmManager.taskFinished(runner);
+ runner.signalDone();
LOG.info("Task " + task.getTaskID() + " is done.");
LOG.info("reported output size for " + task.getTaskID() + " was " + taskStatus.getOutputSize());
@@ -1857,13 +1987,16 @@
String jobConf = task.getJobFile();
try {
// get task's stdout file
- taskStdout = FileUtil.makeShellPath(TaskLog.getTaskLogFile
+ taskStdout = FileUtil.makeShellPath(
+ TaskLog.getRealTaskLogFileLocation
(task.getTaskID(), TaskLog.LogName.STDOUT));
// get task's stderr file
- taskStderr = FileUtil.makeShellPath(TaskLog.getTaskLogFile
+ taskStderr = FileUtil.makeShellPath(
+ TaskLog.getRealTaskLogFileLocation
(task.getTaskID(), TaskLog.LogName.STDERR));
// get task's syslog file
- taskSyslog = FileUtil.makeShellPath(TaskLog.getTaskLogFile
+ taskSyslog = FileUtil.makeShellPath(
+ TaskLog.getRealTaskLogFileLocation
(task.getTaskID(), TaskLog.LogName.SYSLOG));
} catch(IOException e){
LOG.warn("Exception finding task's stdout/err/syslog files");
@@ -1882,8 +2015,8 @@
StringUtils.stringifyException(e));
}
// Build the command
- File stdout = TaskLog.getTaskLogFile(task.getTaskID(),
- TaskLog.LogName.DEBUGOUT);
+ File stdout = TaskLog.getRealTaskLogFileLocation(
+ task.getTaskID(), TaskLog.LogName.DEBUGOUT);
// add pipes program as argument if it exists.
String program ="";
String executable = Submitter.getExecutable(localJobConf);
@@ -1951,6 +2084,13 @@
}
}
+
+ synchronized void taskInitialized() {
+ if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+ //one-way state change to INITIALIZED
+ this.taskStatus.setRunState(TaskStatus.State.INITIALIZED);
+ }
+ }
/**
@@ -2042,6 +2182,8 @@
// Kill the task if it is still running
synchronized(this){
if (getRunState() == TaskStatus.State.RUNNING ||
+ getRunState() == TaskStatus.State.UNASSIGNED ||
+ getRunState() == TaskStatus.State.INITIALIZED ||
getRunState() == TaskStatus.State.COMMIT_PENDING) {
kill(wasFailure);
}
@@ -2057,12 +2199,12 @@
*/
public synchronized void kill(boolean wasFailure) throws IOException {
if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
+ taskStatus.getRunState() == TaskStatus.State.INITIALIZED ||
taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
wasKilled = true;
if (wasFailure) {
failures += 1;
}
- runner.kill();
taskStatus.setRunState((wasFailure) ?
TaskStatus.State.FAILED :
TaskStatus.State.KILLED);
@@ -2074,6 +2216,16 @@
taskStatus.setRunState(TaskStatus.State.KILLED);
}
}
+ if (runner != null) {
+ runner.kill();
+ runner.signalDone();
+ } else {
+ if (task.isMapTask()) {
+ addFreeMapSlot();
+ } else {
+ addFreeReduceSlot();
+ }
+ }
}
/**
@@ -2112,13 +2264,6 @@
TaskAttemptID taskId = task.getTaskID();
LOG.debug("Cleaning up " + taskId);
- // Remove the associated pid-file, if any
- if (TaskTracker.this.isTaskMemoryManagerEnabled()) {
- Path pidFilePath = taskMemoryManager.getPidFilePath(taskId);
- if (pidFilePath != null) {
- directoryCleanupThread.addToQueue(pidFilePath);
- }
- }
synchronized (TaskTracker.this) {
if (needCleanup) {
@@ -2138,13 +2283,28 @@
+ task.getJobID() + Path.SEPARATOR + taskId;
if (needCleanup) {
if (runner != null) {
+ //cleans up the output directory of the task (where map outputs
+ //and reduce inputs get stored)
runner.close();
}
- directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
- taskDir));
+ //We don't delete the workdir
+ //since some other task (running in the same JVM)
+ //might be using the dir. The JVM running the tasks would clean
+ //the workdir per a task in the task process itself.
+ if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+ taskDir));
+ }
+
+ else {
+ directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+ taskDir+"/job.xml"));
+ }
} else {
- directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
- taskDir + Path.SEPARATOR + MRConstants.WORKDIR));
+ if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+ taskDir+"/work"));
+ }
}
} catch (Throwable ie) {
LOG.info("Error cleaning up task runner: " +
@@ -2170,16 +2330,54 @@
// ///////////////////////////////////////////////////////////////
// TaskUmbilicalProtocol
/////////////////////////////////////////////////////////////////
+
/**
* Called upon startup by the child process, to fetch Task data.
*/
- public synchronized Task getTask(TaskAttemptID taskid) throws IOException {
- TaskInProgress tip = tasks.get(taskid);
- if (tip != null) {
- return tip.getTask();
- } else {
- return null;
+ public synchronized JvmTask getTask(JVMId jvmId, TaskAttemptID firstTaskId)
+ throws IOException {
+ LOG.debug("JVM with ID : " + jvmId + " asked for a task");
+ if (!jvmManager.isJvmKnown(jvmId)) {
+ LOG.info("Killing unknown JVM " + jvmId);
+ return new JvmTask(null, true);
+ }
+ RunningJob rjob = runningJobs.get(jvmId.getJobId());
+ if (rjob == null) { //kill the JVM since the job is dead
+ jvmManager.killJvm(jvmId);
+ return new JvmTask(null, true);
+ }
+ TaskInProgress t = runningTasks.get(firstTaskId);
+ //if we can give the JVM the task it is asking for, well and good;
+ //if not, we give it some other task from the same job (note that some
+ //other JVM might have run this task while this JVM was init'ing)
+ if (t == null || t.getStatus().getRunState() !=
+ TaskStatus.State.INITIALIZED) {
+ boolean isMap = jvmId.isMapJVM();
+ synchronized (rjob) {
+ for (TaskInProgress tip : runningTasks.values()) {
+ synchronized (tip) {
+ if (tip.getTask().getJobID().equals(jvmId.getJobId()) &&
+ tip.getRunState() == TaskStatus.State.INITIALIZED
+ && ((isMap && tip.getTask().isMapTask()) ||
+ (!isMap && !tip.getTask().isMapTask()))) {
+ t = tip;
+ }
+ }
+ }
+ }
}
+ //now the task could be null or we could have got a task that already
+ //ran earlier (the firstTaskId case)
+ if (t == null || t.getRunState() != TaskStatus.State.INITIALIZED) {
+ jvmManager.setRunningTaskForJvm(jvmId, null);
+ return new JvmTask(null, false);
+ }
+ t.getStatus().setRunState(TaskStatus.State.RUNNING);
+ t.getStatus().setStartTime(System.currentTimeMillis());
+ jvmManager.setRunningTaskForJvm(jvmId,t.getTaskRunner());
+ LOG.info("JVM with ID: " + jvmId + " given task: " +
+ t.getTask().getTaskID().toString());
+ return new JvmTask(t.getTask(), false);
}
/**
@@ -2334,6 +2532,13 @@
taskMemoryManager.removeTask(taskid);
}
}
+
+ synchronized void taskInitialized(TaskAttemptID taskid) {
+ TaskInProgress tip = tasks.get(taskid);
+ if (tip != null) {
+ tip.taskInitialized();
+ }
+ }
/**
* A completed map task's output has been lost.
@@ -2355,7 +2560,7 @@
private JobID jobid;
private Path jobFile;
// keep this for later use
- Set<TaskInProgress> tasks;
+ volatile Set<TaskInProgress> tasks;
boolean localized;
boolean keepJobFiles;
FetchStatus f;
@@ -2384,61 +2589,6 @@
}
}
- /**
- * The main() for child processes.
- */
- public static class Child {
-
- public static void main(String[] args) throws Throwable {
- //LogFactory.showTime(false);
- LOG.debug("Child starting");
-
- JobConf defaultConf = new JobConf();
- String host = args[0];
- int port = Integer.parseInt(args[1]);
- InetSocketAddress address = new InetSocketAddress(host, port);
- TaskAttemptID taskid = TaskAttemptID.forName(args[2]);
- TaskUmbilicalProtocol umbilical =
- (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
- TaskUmbilicalProtocol.versionID,
- address,
- defaultConf);
-
- Task task = umbilical.getTask(taskid);
- JobConf job = new JobConf(task.getJobFile());
- TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
- task.setConf(job);
-
- defaultConf.addResource(new Path(task.getJobFile()));
-
- // Initiate Java VM metrics
- JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
-
- try {
- // use job-specified working directory
- FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
- task.run(job, umbilical); // run the task
- } catch (FSError e) {
- LOG.fatal("FSError from child", e);
- umbilical.fsError(taskid, e.getMessage());
- } catch (Throwable throwable) {
- LOG.warn("Error running child", throwable);
- // Report back any failures, for diagnostic purposes
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- throwable.printStackTrace(new PrintStream(baos));
- umbilical.reportDiagnosticInfo(taskid, baos.toString());
- } finally {
- RPC.stopProxy(umbilical);
- MetricsContext metricsContext = MetricsUtil.getContext("mapred");
- metricsContext.close();
- // Shutting down log4j of the child-vm...
- // This assumes that on return from Task.run()
- // there is no more logging done.
- LogManager.shutdown();
- }
- }
- }
-
/**
* Get the name for this task tracker.
* @return the string like "tracker_mymachine:50010"
@@ -2804,9 +2954,13 @@
* Is the TaskMemoryManager Enabled on this system?
* @return true if enabled, false otherwise.
*/
- boolean isTaskMemoryManagerEnabled() {
+ public boolean isTaskMemoryManagerEnabled() {
return taskMemoryManagerEnabled;
}
+
+ public TaskMemoryManagerThread getTaskMemoryManager() {
+ return taskMemoryManager;
+ }
private void setTaskMemoryManagerEnabledFlag() {
if (!ProcfsBasedProcessTree.isAvailable()) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Fri Sep 19 00:31:41 2008
@@ -208,7 +208,7 @@
if (ts.getIsMap() &&
((state == TaskStatus.State.RUNNING) ||
(state == TaskStatus.State.UNASSIGNED) ||
- (state == TaskStatus.State.COMMIT_PENDING))) {
+ (state == TaskStatus.State.INITIALIZED))) {
mapCount++;
}
}
@@ -226,7 +226,7 @@
if ((!ts.getIsMap()) &&
((state == TaskStatus.State.RUNNING) ||
(state == TaskStatus.State.UNASSIGNED) ||
- (state == TaskStatus.State.COMMIT_PENDING))) {
+ (state == TaskStatus.State.INITIALIZED))) {
reduceCount++;
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Sep 19 00:31:41 2008
@@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapred.JvmTask;
/** Protocol that task child process uses to contact its parent process. The
* parent is a daemon which which polls the central master for a new map or
@@ -49,12 +50,19 @@
* Version 12 getMapCompletionEvents() now also indicates if the events are
* stale or not. Hence the return type is a class that
* encapsulates the events and whether to reset events index.
+ * Version 13 changed the getTask method signature for HADOOP-249
* */
- public static final long versionID = 11L;
+ public static final long versionID = 13L;
- /** Called when a child task process starts, to get its task.*/
- Task getTask(TaskAttemptID taskid) throws IOException;
+ /**
+ * Called when a child task process starts, to get its task.
+ * @param jvmId the ID of this JVM w.r.t the tasktracker that launched it
+ * @param taskid the first taskid that the JVM runs
+ * @return Task object
+ * @throws IOException
+ */
+ JvmTask getTask(JVMId jvmId, TaskAttemptID taskid) throws IOException;
/**
* Report child's progress to parent.
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Fri Sep 19 00:31:41 2008
@@ -72,14 +72,12 @@
// Run Sort-Validator
assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0);
}
-
+ Configuration conf = new Configuration();
public void testMapReduceSort() throws Exception {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fileSys = null;
try {
- Configuration conf = new Configuration();
-
// set io.sort.mb and fsinmemory.size.mb to lower value in test
conf.setInt("io.sort.mb", 5);
conf.setInt("fs.inmemory.size.mb", 20);
@@ -103,5 +101,8 @@
}
}
}
-
+ public void testMapReduceSortWithJvmReuse() throws Exception {
+ conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
+ testMapReduceSort();
+ }
}