You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) is not preserved in this text version.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/11/11 05:50:10 UTC
svn commit: r712941 - in /hadoop/core/branches/branch-0.19: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/JvmManager.java
src/mapred/org/apache/hadoop/mapred/TaskRunner.java
src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Author: ddas
Date: Mon Nov 10 20:50:09 2008
New Revision: 712941
URL: http://svn.apache.org/viewvc?rev=712941&view=rev
Log:
Merge -r 712939:712940 from trunk onto 0.19 branch. Fixes HADOOP-4595.
Modified:
hadoop/core/branches/branch-0.19/CHANGES.txt
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=712941&r1=712940&r2=712941&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Mon Nov 10 20:50:09 2008
@@ -968,6 +968,9 @@
HADOOP-4282. Some user facing URLs are not filtered by user filters.
(szetszwo)
+ HADOOP-4595. Fixes two race conditions - one to do with updating free slot count,
+ and another to do with starting the MapEventsFetcher thread. (ddas)
+
Release 0.18.3 - Unreleased
BUG FIXES
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=712941&r1=712940&r2=712941&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JvmManager.java Mon Nov 10 20:50:09 2008
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -177,7 +178,14 @@
}
synchronized public void stop() {
- for (JvmRunner jvm : jvmIdToRunner.values()) {
+ //since the kill() method invoked later on would remove
+ //an entry from the jvmIdToRunner map, we create a
+ //copy of the values and iterate over it (if we don't
+ //make a copy, we will encounter concurrentModification
+ //exception
+ List <JvmRunner> list = new ArrayList<JvmRunner>();
+ list.addAll(jvmIdToRunner.values());
+ for (JvmRunner jvm : list) {
jvm.kill();
}
}
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=712941&r1=712940&r2=712941&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Mon Nov 10 20:50:09 2008
@@ -459,11 +459,6 @@
LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
}
tracker.reportTaskFinished(t.getTaskID(), false);
- if (t.isMapTask()) {
- tracker.addFreeMapSlot();
- } else {
- tracker.addFreeReduceSlot();
- }
}
}
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=712941&r1=712940&r2=712941&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon Nov 10 20:50:09 2008
@@ -461,7 +461,7 @@
RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
jobTrackAddr, this.fConf);
-
+ this.running = true;
// start the thread that will fetch map task completion events
this.mapEventsFetcher = new MapEventsFetcherThread();
mapEventsFetcher.setDaemon(true);
@@ -484,7 +484,6 @@
reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
mapLauncher.start();
reduceLauncher.start();
- this.running = true;
}
public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
@@ -781,20 +780,8 @@
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
synchronized (tip) {
- try {
- tip.setJobConf(jobConf);
- tip.launchTask();
- } catch (Throwable ie) {
- tip.taskStatus.setRunState(TaskStatus.State.FAILED);
- try {
- tip.cleanup(true);
- } catch (Throwable ie2) {
- // Ignore it, we are just trying to cleanup.
- }
- String error = StringUtils.stringifyException(ie);
- tip.reportDiagnosticInfo(error);
- LOG.info(error);
- }
+ tip.setJobConf(jobConf);
+ tip.launchTask();
}
}
@@ -851,8 +838,6 @@
this.mapEventsFetcher.interrupt();
//stop the launchers
- mapLauncher.cleanTaskQueue();
- reduceLauncher.cleanTaskQueue();
this.mapLauncher.interrupt();
this.reduceLauncher.interrupt();
@@ -1539,14 +1524,6 @@
public JvmManager getJvmManagerInstance() {
return jvmManager;
}
-
- public void addFreeMapSlot() {
- mapLauncher.addFreeSlot();
- }
-
- public void addFreeReduceSlot() {
- reduceLauncher.addFreeSlot();
- }
private void addToTaskQueue(LaunchTaskAction action) {
if (action.getTask().isMapTask()) {
@@ -1571,7 +1548,7 @@
public void addToTaskQueue(LaunchTaskAction action) {
synchronized (tasksToLaunch) {
- TaskInProgress tip = registerTask(action);
+ TaskInProgress tip = registerTask(action, this);
tasksToLaunch.add(tip);
tasksToLaunch.notifyAll();
}
@@ -1632,10 +1609,11 @@
}
}
}
- private TaskInProgress registerTask(LaunchTaskAction action) {
+ private TaskInProgress registerTask(LaunchTaskAction action,
+ TaskLauncher launcher) {
Task t = action.getTask();
LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
- TaskInProgress tip = new TaskInProgress(t, this.fConf);
+ TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
synchronized (this) {
tasks.put(t.getTaskID(), tip);
runningTasks.put(t.getTaskID(), tip);
@@ -1667,6 +1645,7 @@
tip.reportDiagnosticInfo(msg);
try {
tip.kill(true);
+ tip.cleanup(true);
} catch (IOException ie2) {
LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
StringUtils.stringifyException(ie2));
@@ -1750,11 +1729,17 @@
private long taskTimeout;
private String debugCommand;
private volatile boolean slotTaken = false;
+ private TaskLauncher launcher;
/**
*/
public TaskInProgress(Task task, JobConf conf) {
+ this(task, conf, null);
+ }
+
+ public TaskInProgress(Task task, JobConf conf, TaskLauncher launcher) {
this.task = task;
+ this.launcher = launcher;
this.lastProgressReport = System.currentTimeMillis();
this.defaultJobConf = conf;
localJobConf = null;
@@ -2234,13 +2219,16 @@
} else {
taskStatus.setRunState(TaskStatus.State.KILLED);
}
- if (slotTaken) {
- if (task.isMapTask()) {
- addFreeMapSlot();
- } else {
- addFreeReduceSlot();
- }
+ }
+ releaseSlot();
+ }
+
+ private synchronized void releaseSlot() {
+ if (slotTaken) {
+ if (launcher != null) {
+ launcher.addFreeSlot();
}
+ slotTaken = false;
}
}
@@ -2518,6 +2506,11 @@
if (tip != null) {
if (!commitPending) {
tip.taskFinished();
+ // Remove the entry from taskMemoryManagerThread's data structures.
+ if (isTaskMemoryManagerEnabled()) {
+ taskMemoryManager.removeTask(taskid);
+ }
+ tip.releaseSlot();
}
synchronized(finishedCount) {
finishedCount[0]++;
@@ -2526,10 +2519,6 @@
} else {
LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
}
- // Remove the entry from taskMemoryManagerThread's data structures.
- if (isTaskMemoryManagerEnabled()) {
- taskMemoryManager.removeTask(taskid);
- }
}