You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/05/18 02:16:27 UTC
svn commit: r1104636 - in /hadoop/mapreduce/branches/MR-279: CHANGES.txt
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
Author: mahadev
Date: Wed May 18 00:16:27 2011
New Revision: 1104636
URL: http://svn.apache.org/viewvc?rev=1104636&view=rev
Log:
MAPREDUCE-2504. race in JobHistoryEventHandler stop (siddharth seth via mahadev)
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1104636&r1=1104635&r2=1104636&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Wed May 18 00:16:27 2011
@@ -3,6 +3,8 @@ Hadoop MapReduce Change Log
Trunk (unreleased changes)
MAPREDUCE-279
+
+ MAPREDUCE-2504. race in JobHistoryEventHandler stop (siddharth seth via mahadev)
MAPREDUCE-2500. PB factories are not thread safe (siddharth seth via mahadev)
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1104636&r1=1104635&r2=1104636&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Wed May 18 00:16:27 2011
@@ -72,6 +72,7 @@ public class JobHistoryEventHandler exte
new LinkedBlockingQueue<JobHistoryEvent>();
private Thread eventHandlingThread;
private volatile boolean stopped;
+ private final Object lock = new Object();
private static final Log LOG = LogFactory.getLog(
JobHistoryEventHandler.class);
@@ -140,14 +141,24 @@ public class JobHistoryEventHandler exte
@Override
public void run() {
JobHistoryEvent event = null;
- while (!stopped || !Thread.currentThread().isInterrupted()) {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
- LOG.error("Returning, interrupted : " + e);
+ LOG.info("EventQueue take interrupted. Returning");
return;
}
+ // If an event has been removed from the queue, handle it.
+ // The rest of the queue is handled via stop()
+ // Clear the interrupt status (if set) before calling handleEvent,
+ // and restore it after handleEvent returns if it had been set.
+ // Interrupts received from other threads during handleEvent cannot be
+ // dealt with - Shell.runCommand() ignores them.
+ synchronized (lock) {
+ boolean isInterrupted = Thread.interrupted();
handleEvent(event);
+ if (isInterrupted) Thread.currentThread().interrupt();
+ }
}
}
});
@@ -160,7 +171,7 @@ public class JobHistoryEventHandler exte
LOG.info("Stopping JobHistoryEventHandler");
stopped = true;
//do not interrupt while event handling is in progress
- synchronized(this) {
+ synchronized(lock) {
eventHandlingThread.interrupt();
}
@@ -276,9 +287,10 @@ public class JobHistoryEventHandler exte
}
}
- protected synchronized void handleEvent(JobHistoryEvent event) {
+ protected void handleEvent(JobHistoryEvent event) {
// check for first event from a job
//TODO Log a meta line with version information.
+ synchronized (lock) {
if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
try {
setupEventWriter(event.getJobID());
@@ -309,6 +321,7 @@ public class JobHistoryEventHandler exte
}
}
}
+ }
protected void closeEventWriter(JobId jobId) throws IOException {
final MetaInfo mi = fileMap.get(jobId);
@@ -368,7 +381,7 @@ public class JobHistoryEventHandler exte
}
}
- private static class MetaInfo {
+ private class MetaInfo {
private Path historyFile;
private Path confFile;
private EventWriter writer;
@@ -388,20 +401,24 @@ public class JobHistoryEventHandler exte
JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
- synchronized void closeWriter() throws IOException {
+ void closeWriter() throws IOException {
+ synchronized (lock) {
if (writer != null) {
writer.close();
}
writer = null;
}
+ }
- synchronized void writeEvent(HistoryEvent event) throws IOException {
+ void writeEvent(HistoryEvent event) throws IOException {
+ synchronized (lock) {
if (writer != null) {
writer.write(event);
writer.flush();
}
}
}
+ }
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
//check if path exists, in case of retries it may not exist