You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2011/05/25 04:23:04 UTC

svn commit: r1127361 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Author: cdouglas
Date: Wed May 25 02:23:03 2011
New Revision: 1127361

URL: http://svn.apache.org/viewvc?rev=1127361&view=rev
Log:
MAPREDUCE-2495. exit() the TaskTracker when the distributed cache cleanup
thread dies. Contributed by Robert Joseph Evans

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1127361&r1=1127360&r2=1127361&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed May 25 02:23:03 2011
@@ -226,6 +226,10 @@ Trunk (unreleased changes)
     MAPREDUCE-2490. Add logging to graylist and blacklist activity to aid
     diagnosis of related issues. (Jonathan Eagles via cdouglas)
 
+    MAPREDUCE-2495. exit() the TaskTracker when the distributed cache cleanup
+    thread dies. (Robert Joseph Evans via cdouglas)
+
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=1127361&r1=1127360&r2=1127361&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Wed May 25 02:23:03 2011
@@ -81,8 +81,8 @@ public class TrackerDistributedCacheMana
 
   private MRAsyncDiskService asyncDiskService;
 
-  BaseDirManager baseDirManager = new BaseDirManager();
-  CleanupThread cleanupThread;
+  protected BaseDirManager baseDirManager = new BaseDirManager();
+  protected CleanupThread cleanupThread;
 
   public TrackerDistributedCacheManager(Configuration conf,
       TaskController taskController) throws IOException {
@@ -658,7 +658,7 @@ public class TrackerDistributedCacheMana
   /**
    * A thread to check and cleanup the unused files periodically
    */
-  private class CleanupThread extends Thread {
+  protected class CleanupThread extends Thread {
     // How often do we check if we need to clean up cache files?
     private long cleanUpCheckPeriod = 60000L; // 1 minute
     public CleanupThread(Configuration conf) {
@@ -666,6 +666,7 @@ public class TrackerDistributedCacheMana
         conf.getLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD,
                             cleanUpCheckPeriod);
     }
+
     private volatile boolean running = true;
     public void stopRunning() {
       running = false;
@@ -676,19 +677,33 @@ public class TrackerDistributedCacheMana
         try {
           Thread.sleep(cleanUpCheckPeriod);
           baseDirManager.checkAndCleanup();
-        } catch (Exception e) {
+        } catch (IOException e) {
           LOG.error("Exception in DistributedCache CleanupThread.", e);
-          // This thread should keep running and never crash.
+        } catch (InterruptedException e) {
+          LOG.info("Cleanup...",e);
+          //To force us to exit cleanly
+          running = false;
+        } catch (Throwable t) {
+          exitTaskTracker(t);
         }
       }
     }
+
+    /**
+     * Logs {@code t} as fatal and exits the JVM; t can be any Throwable, not just a RuntimeException.
+     */
+    protected void exitTaskTracker(Throwable t) {
+      LOG.fatal("Distributed Cache cleanup thread received runtime exception." +
+          " Exiting the TaskTracker", t);
+      Runtime.getRuntime().exit(-1); // runs shutdown hooks; status -1 is reported as 255 on POSIX
+    }
   }
 
   /**
    * This class holds properties of each base directories and is responsible
    * for clean up unused cache files in base directories.
    */
-  private class BaseDirManager {
+  protected class BaseDirManager {
     private class CacheDir {
       long size;
       long subdirs;

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=1127361&r1=1127360&r2=1127361&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Wed May 25 02:23:03 2011
@@ -26,6 +26,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import javax.security.auth.login.LoginException;
@@ -479,6 +480,61 @@ public class TestTrackerDistributedCache
       assertEquals(group, fs.getFileStatus(p).getGroup());
     }
   }
+
+  public static class MyTrackerDistributedCacheManager 
+  extends TrackerDistributedCacheManager {
+    // Written by the cleanup thread before done.countDown(); the latch makes them visible after await().
+    public Throwable caught = null;
+    public CountDownLatch done = new CountDownLatch(1);
+    // Installs a BaseDirManager whose cleanup always throws and a cleanup thread that never exits the JVM.
+    public MyTrackerDistributedCacheManager(Configuration conf,
+        TaskController controller) throws IOException {
+      super(conf, controller);
+      this.baseDirManager = new TrackerDistributedCacheManager.BaseDirManager() {
+        // Simulates an unexpected runtime failure during the periodic cleanup pass.
+        @Override
+        public void checkAndCleanup() throws IOException {
+          throw new RuntimeException("This is a test!!!!");
+        }
+      };
+      // Swap in a recording thread (assumes startCleanupThread() starts this.cleanupThread; confirm).
+      this.cleanupThread = new TestCleanupThread(conf);
+    }
+    // Non-static inner class so exitTaskTracker() can update the enclosing caught/done fields.
+    class TestCleanupThread extends TrackerDistributedCacheManager.CleanupThread {
+
+      public TestCleanupThread(Configuration conf) {
+        super(conf);
+      }
+      // Records the failure and stops the cleanup loop instead of calling Runtime.exit().
+      @Override
+      protected void exitTaskTracker(Throwable t) {
+        caught = t;
+        this.stopRunning();
+        done.countDown();
+      }        
+    }
+  }
+
+  public void testRuntimeExceptionInCleanup() throws Exception {
+    if (!canRun()) {
+      return;
+    }
+    // A RuntimeException from checkAndCleanup() must be routed to exitTaskTracker().
+    Configuration conf2 = new Configuration(conf);
+    conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
+    conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+    conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", 0); // 0 ms (Don't sleep)
+    refreshConf(conf2);
+    MyTrackerDistributedCacheManager manager =
+      new MyTrackerDistributedCacheManager(conf2, taskController);
+    manager.startCleanupThread();
+    // await() returns as soon as the latch fires; the generous bound only avoids CI flakiness.
+    boolean fired = manager.done.await(10, TimeUnit.SECONDS);
+    manager.cleanupThread.stopRunning(); // on timeout, don't leave a 0 ms polling thread behind
+    assertTrue("cleanup thread did not report the failure", fired);
+    assertTrue("unexpected: " + manager.caught, manager.caught instanceof RuntimeException);
+  }
   
   protected String getJobOwnerName() throws IOException {
     return UserGroupInformation.getCurrentUser().getUserName();