You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2011/05/25 04:23:04 UTC
svn commit: r1127361 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Author: cdouglas
Date: Wed May 25 02:23:03 2011
New Revision: 1127361
URL: http://svn.apache.org/viewvc?rev=1127361&view=rev
Log:
MAPREDUCE-2495. exit() the TaskTracker when the distributed cache cleanup
thread dies. Contributed by Robert Joseph Evans
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1127361&r1=1127360&r2=1127361&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed May 25 02:23:03 2011
@@ -226,6 +226,10 @@ Trunk (unreleased changes)
MAPREDUCE-2490. Add logging to graylist and blacklist activity to aid
diagnosis of related issues. (Jonathan Eagles via cdouglas)
+ MAPREDUCE-2495. exit() the TaskTracker when the distributed cache cleanup
+ thread dies. (Robert Joseph Evans via cdouglas)
+
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=1127361&r1=1127360&r2=1127361&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Wed May 25 02:23:03 2011
@@ -81,8 +81,8 @@ public class TrackerDistributedCacheMana
private MRAsyncDiskService asyncDiskService;
- BaseDirManager baseDirManager = new BaseDirManager();
- CleanupThread cleanupThread;
+ protected BaseDirManager baseDirManager = new BaseDirManager();
+ protected CleanupThread cleanupThread;
public TrackerDistributedCacheManager(Configuration conf,
TaskController taskController) throws IOException {
@@ -658,7 +658,7 @@ public class TrackerDistributedCacheMana
/**
* A thread to check and cleanup the unused files periodically
*/
- private class CleanupThread extends Thread {
+ protected class CleanupThread extends Thread {
// How often do we check if we need to clean up cache files?
private long cleanUpCheckPeriod = 60000L; // 1 minute
public CleanupThread(Configuration conf) {
@@ -666,6 +666,7 @@ public class TrackerDistributedCacheMana
conf.getLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD,
cleanUpCheckPeriod);
}
+
private volatile boolean running = true;
public void stopRunning() {
running = false;
@@ -676,19 +677,33 @@ public class TrackerDistributedCacheMana
try {
Thread.sleep(cleanUpCheckPeriod);
baseDirManager.checkAndCleanup();
- } catch (Exception e) {
+ } catch (IOException e) {
LOG.error("Exception in DistributedCache CleanupThread.", e);
- // This thread should keep running and never crash.
+ } catch (InterruptedException e) {
+ LOG.info("Cleanup...",e);
+ //To force us to exit cleanly
+ running = false;
+ } catch (Throwable t) {
+ exitTaskTracker(t);
}
}
}
+
+ /**
+ * Exit the task tracker because of a fatal error.
+ */
+ protected void exitTaskTracker(Throwable t) {
+ LOG.fatal("Distributed Cache cleanup thread received runtime exception." +
+ " Exiting the TaskTracker", t);
+ Runtime.getRuntime().exit(-1);
+ }
}
/**
* This class holds properties of each base directories and is responsible
* for clean up unused cache files in base directories.
*/
- private class BaseDirManager {
+ protected class BaseDirManager {
private class CacheDir {
long size;
long subdirs;
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=1127361&r1=1127360&r2=1127361&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Wed May 25 02:23:03 2011
@@ -26,6 +26,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.security.auth.login.LoginException;
@@ -479,6 +480,61 @@ public class TestTrackerDistributedCache
assertEquals(group, fs.getFileStatus(p).getGroup());
}
}
+
+ public static class MyTrackerDistributedCacheManager
+ extends TrackerDistributedCacheManager {
+
+ public Throwable caught = null;
+ public CountDownLatch done = new CountDownLatch(1);
+
+ public MyTrackerDistributedCacheManager(Configuration conf,
+ TaskController controller) throws IOException {
+ super(conf, controller);
+ this.baseDirManager = new TrackerDistributedCacheManager.BaseDirManager() {
+
+ @Override
+ public void checkAndCleanup() throws IOException {
+ throw new RuntimeException("This is a test!!!!");
+ }
+ };
+
+ this.cleanupThread = new TestCleanupThread(conf);
+ }
+
+ class TestCleanupThread extends TrackerDistributedCacheManager.CleanupThread {
+
+ public TestCleanupThread(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ protected void exitTaskTracker(Throwable t) {
+ caught = t;
+ this.stopRunning();
+ done.countDown();
+ }
+ }
+ }
+
+ public void testRuntimeExceptionInCleanup() throws Exception {
+ if(!canRun()) {
+ return;
+ }
+
+ Configuration conf2 = new Configuration(conf);
+ conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
+ conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+ conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", 0); // 0 ms (Don't sleep)
+
+ refreshConf(conf2);
+ MyTrackerDistributedCacheManager manager =
+ new MyTrackerDistributedCacheManager(conf2, taskController);
+ manager.startCleanupThread();
+
+ assertTrue(manager.done.await(200l, TimeUnit.MILLISECONDS));
+ assertNotNull(manager.caught);
+ assertTrue(manager.caught instanceof RuntimeException);
+ }
protected String getJobOwnerName() throws IOException {
return UserGroupInformation.getCurrentUser().getUserName();