You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/05/18 02:16:27 UTC

svn commit: r1104636 - in /hadoop/mapreduce/branches/MR-279: CHANGES.txt mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

Author: mahadev
Date: Wed May 18 00:16:27 2011
New Revision: 1104636

URL: http://svn.apache.org/viewvc?rev=1104636&view=rev
Log:
MAPREDUCE-2504. race in JobHistoryEventHandler stop (siddharth seth via mahadev)

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1104636&r1=1104635&r2=1104636&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Wed May 18 00:16:27 2011
@@ -3,6 +3,8 @@ Hadoop MapReduce Change Log
 Trunk (unreleased changes)
 
   MAPREDUCE-279
+   
+    MAPREDUCE-2504. race in JobHistoryEventHandler stop (siddharth seth via mahadev)
 
     MAPREDUCE-2500. PB factories are not thread safe (siddharth seth via mahadev) 
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1104636&r1=1104635&r2=1104636&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Wed May 18 00:16:27 2011
@@ -72,6 +72,7 @@ public class JobHistoryEventHandler exte
     new LinkedBlockingQueue<JobHistoryEvent>();
   private Thread eventHandlingThread;
   private volatile boolean stopped;
+  private final Object lock = new Object();
 
   private static final Log LOG = LogFactory.getLog(
       JobHistoryEventHandler.class);
@@ -140,14 +141,24 @@ public class JobHistoryEventHandler exte
       @Override
       public void run() {
         JobHistoryEvent event = null;
-        while (!stopped || !Thread.currentThread().isInterrupted()) {
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
           try {
             event = eventQueue.take();
           } catch (InterruptedException e) {
-            LOG.error("Returning, interrupted : " + e);
+            LOG.info("EventQueue take interrupted. Returning");
             return;
           }
+          // If an event has been removed from the queue, handle it.
+          // The rest of the queue is handled via stop().
+          // Clear the interrupt status (if set) before calling handleEvent,
+          // and restore it after handleEvent returns if it had been set.
+          // Interrupts received from other threads during handleEvent cannot be
+          // dealt with - Shell.runCommand() ignores them.
+          synchronized (lock) {
+            boolean isInterrupted = Thread.interrupted();
           handleEvent(event);
+            if (isInterrupted) Thread.currentThread().interrupt();
+          }
         }
       }
     });
@@ -160,7 +171,7 @@ public class JobHistoryEventHandler exte
     LOG.info("Stopping JobHistoryEventHandler");
     stopped = true;
     //do not interrupt while event handling is in progress
-    synchronized(this) {
+    synchronized(lock) {
       eventHandlingThread.interrupt();
     }
 
@@ -276,9 +287,10 @@ public class JobHistoryEventHandler exte
     }
   }
 
-  protected synchronized void handleEvent(JobHistoryEvent event) {
+  protected void handleEvent(JobHistoryEvent event) {
     // check for first event from a job
     //TODO Log a meta line with version information.
+    synchronized (lock) {
     if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
       try {
         setupEventWriter(event.getJobID());
@@ -309,6 +321,7 @@ public class JobHistoryEventHandler exte
       }
     }
   }
+  }
 
   protected void closeEventWriter(JobId jobId) throws IOException {
     final MetaInfo mi = fileMap.get(jobId);
@@ -368,7 +381,7 @@ public class JobHistoryEventHandler exte
     }
   }
 
-  private static class MetaInfo {
+  private class MetaInfo {
     private Path historyFile;
     private Path confFile;
     private EventWriter writer;
@@ -388,20 +401,24 @@ public class JobHistoryEventHandler exte
 
     JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
 
-    synchronized void closeWriter() throws IOException {
+    void closeWriter() throws IOException {
+      synchronized (lock) {
       if (writer != null) {
         writer.close();
       }
       writer = null;
     }
+    }
 
-    synchronized void writeEvent(HistoryEvent event) throws IOException {
+    void writeEvent(HistoryEvent event) throws IOException {
+      synchronized (lock) {
       if (writer != null) {
         writer.write(event);
         writer.flush();
       }
     }
   }
+  }
 
   private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
     //check if path exists, in case of retries it may not exist