You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dh...@apache.org on 2009/12/17 03:50:23 UTC

svn commit: r891524 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/filecache/ src/java/org/apache/hadoop/mapreduce/util/ src/test/mapred/org/apache/hadoop/mapreduce/util/

Author: dhruba
Date: Thu Dec 17 02:50:22 2009
New Revision: 891524

URL: http://svn.apache.org/viewvc?rev=891524&view=rev
Log:
MAPREDUCE-1213. TaskTracker restart is faster because it deletes the
distributed cache directory asynchronously. (Zheng Shao via dhruba)


Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=891524&r1=891523&r2=891524&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Dec 17 02:50:22 2009
@@ -147,6 +147,9 @@
 
     MAPREDUCE-1294. Build fails to pull latest hadoop-core-* artifacts (cos)
 
+    MAPREDUCE-1213. TaskTrackers restart is faster because it deletes
+    distributed cache directory asynchronously. (Zheng Shao via dhruba)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=891524&r1=891523&r2=891524&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Dec 17 02:50:22 2009
@@ -100,6 +100,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 
 /*******************************************************
@@ -168,6 +169,8 @@
 
   private final JobHistory jobHistory;
 
+  private MRAsyncDiskService asyncDiskService;
+  
   /**
    * A client tried to submit a job before the Job Tracker was ready.
    */
@@ -1511,7 +1514,8 @@
     }
     
     // Same with 'localDir' except it's always on the local disk.
-    jobConf.deleteLocalFiles(SUBDIR);
+    asyncDiskService = new MRAsyncDiskService(FileSystem.getLocal(conf), conf.getLocalDirs());
+    asyncDiskService.moveAndDeleteFromEachVolume(SUBDIR);
 
     // Initialize history DONE folder
     jobHistory.initDone(conf, fs);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=891524&r1=891523&r2=891524&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Dec 17 02:50:22 2009
@@ -99,6 +99,7 @@
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 
 /*******************************************************
  * TaskTracker is a process that starts and tracks MR Tasks
@@ -265,6 +266,8 @@
 
   private IndexCache indexCache;
 
+  private MRAsyncDiskService asyncDiskService;
+  
   /**
   * Handle to the specific instance of the {@link TaskController} class
   */
@@ -540,9 +543,10 @@
        fConf.get(TT_DNS_NAMESERVER,"default"));
     }
  
-    //check local disk
+    //check local disk and start async disk service
     checkLocalDirs(this.fConf.getLocalDirs());
-    fConf.deleteLocalFiles(SUBDIR);
+    asyncDiskService = new MRAsyncDiskService(FileSystem.getLocal(fConf), fConf.getLocalDirs());
+    asyncDiskService.moveAndDeleteFromEachVolume(SUBDIR);
 
     // Clear out state tables
     this.tasks.clear();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=891524&r1=891523&r2=891524&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Thu Dec 17 02:50:22 2009
@@ -146,7 +146,7 @@
   public void setup(LocalDirAllocator lDirAlloc, File workDir, 
       String cacheSubdir) throws IOException {
     setupCalled = true;
-    
+      
     if (cacheFiles.isEmpty()) {
       return;
     }

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java?rev=891524&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java Thu Dec 17 02:50:22 2009
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.AsyncDiskService;
+import org.apache.hadoop.util.StringUtils;
+
+/*
+ * This class is a container of multiple thread pools, each for a volume,
+ * so that we can schedule async disk operations easily.
+ * 
+ * Examples of async disk operations are deletion of files.
+ * We can move the files to a "TO_BE_DELETED" folder before asychronously
+ * deleting it, to make sure the caller can run it faster.
+ * 
+ * This class also contains all operations that will be performed by the
+ * thread pools. 
+ */
+public class MRAsyncDiskService {
+  
+  public static final Log LOG = LogFactory.getLog(MRAsyncDiskService.class);
+  
+  AsyncDiskService asyncDiskService;
+  
+  /**
+   * Create a AsyncDiskServices with a set of volumes (specified by their
+   * root directories).
+   * 
+   * The AsyncDiskServices uses one ThreadPool per volume to do the async
+   * disk operations.
+   * 
+   * @param localFileSystem The localFileSystem used for deletions.
+   * @param volumes The roots of the file system volumes.
+   */
+  public MRAsyncDiskService(FileSystem localFileSystem, String[] volumes) throws IOException {
+    
+    asyncDiskService = new AsyncDiskService(volumes);
+    
+    this.localFileSystem = localFileSystem;
+    this.volumes = volumes;
+    
+    // Create one ThreadPool per volume
+    for (int v = 0 ; v < volumes.length; v++) {
+      // Create the root for file deletion
+      if (!localFileSystem.mkdirs(new Path(volumes[v], SUBDIR))) {
+        throw new IOException("Cannot create " + SUBDIR + " in " + volumes[v]);
+      }
+    }
+    
+  }
+  
+  /**
+   * Execute the task sometime in the future, using ThreadPools.
+   */
+  synchronized void execute(String root, Runnable task) {
+    asyncDiskService.execute(root, task);
+  }
+  
+  /**
+   * Gracefully start the shut down of all ThreadPools.
+   */
+  synchronized void shutdown() {
+    asyncDiskService.shutdown();
+  }
+
+  /**
+   * Shut down all ThreadPools immediately.
+   */
+  public synchronized List<Runnable> shutdownNow() {
+    return asyncDiskService.shutdownNow();
+  }
+  
+  /**
+   * Wait for the termination of the thread pools.
+   * 
+   * @param milliseconds  The number of milliseconds to wait
+   * @return   true if all thread pools are terminated without time limit
+   * @throws InterruptedException 
+   */
+  public synchronized boolean awaitTermination(long milliseconds) 
+      throws InterruptedException {
+    return asyncDiskService.awaitTermination(milliseconds);
+  }
+  
+  public static final String SUBDIR = "toBeDeleted";
+  
+  private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss.SSS");
+  
+  private FileSystem localFileSystem;
+  
+  private String[] volumes; 
+                 
+  private int uniqueId = 0;
+  
+  /** A task for deleting a pathName from a volume.
+   */
+  class DeleteTask implements Runnable {
+
+    /** The volume that the file is on*/
+    String volume;
+    /** The file name before the move */
+    String originalPath;
+    /** The file name after the move */
+    String pathToBeDeleted;
+    
+    /**
+     * Delete a file/directory (recursively if needed).
+     * @param volume        The volume that the file/dir is in.
+     * @param originalPath  The original name, relative to volume root.
+     * @param pathToBeDeleted  The name after the move, relative to volume root,
+     *                         containing SUBDIR.
+     */
+    DeleteTask(String volume, String originalPath, String pathToBeDeleted) {
+      this.volume = volume;
+      this.originalPath = originalPath;
+      this.pathToBeDeleted = pathToBeDeleted;
+    }
+    
+    @Override
+    public String toString() {
+      // Called in AsyncDiskService.execute for displaying error messages.
+      return "deletion of " + pathToBeDeleted + " on " + volume
+          + " with original name " + originalPath;
+    }
+
+    @Override
+    public void run() {
+      boolean success = false;
+      Exception e = null;
+      try {
+        Path absolutePathToBeDeleted = new Path(volume, pathToBeDeleted);
+        success = localFileSystem.delete(absolutePathToBeDeleted, true);
+      } catch (Exception ex) {
+        e = ex;
+      }
+      
+      if (!success) {
+        if (e != null) {
+          LOG.warn("Failure in " + this + " with exception " + StringUtils.stringifyException(e));
+        } else {
+          LOG.warn("Failure in " + this);
+        }
+      } else {
+        LOG.debug("Successfully did " + this.toString());
+      }
+    }
+  };
+  
+  
+  /**
+   * Move the path name on one volume to a temporary location and then 
+   * delete them.
+   * 
+   * This functions returns when the moves are done, but not necessarily all
+   * deletions are done. This is usually good enough because applications 
+   * won't see the path name under the old name anyway after the move. 
+   * 
+   * @param volume       The disk volume
+   * @param pathName     The path name relative to volume.
+   * @throws IOException If the move failed 
+   */
+  public boolean moveAndDelete(String volume, String pathName) throws IOException {
+    // Move the file right now, so that it can be deleted later
+    String newPathName;
+    synchronized (this) {
+      newPathName = format.format(new Date()) + "_" + uniqueId;
+      uniqueId ++;
+    }
+    newPathName = SUBDIR + Path.SEPARATOR_CHAR + newPathName;
+    
+    Path source = new Path(volume, pathName);
+    Path target = new Path(volume, newPathName); 
+    try {
+      if (!localFileSystem.rename(source, target)) {
+        return false;
+      }
+    } catch (FileNotFoundException e) {
+      // Return false in case that the file is not found.  
+      return false;
+    }
+
+    DeleteTask task = new DeleteTask(volume, pathName, newPathName);
+    execute(volume, task);
+    return true;
+  }
+
+  /**
+   * Move the path name on each volume to a temporary location and then 
+   * delete them.
+   * 
+   * This functions returns when the moves are done, but not necessarily all
+   * deletions are done. This is usually good enough because applications 
+   * won't see the path name under the old name anyway after the move. 
+   * 
+   * @param pathName     The path name on each volume.
+   * @throws IOException If the move failed 
+   */
+  public void moveAndDeleteFromEachVolume(String pathName) throws IOException {
+    for (int i = 0; i < volumes.length; i++) {
+      moveAndDelete(volumes[i], pathName);
+    }
+  }
+  
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java?rev=891524&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java Thu Dec 17 02:50:22 2009
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.util;
+
+import java.io.File;
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+
+/**
+ * A test for {@link MRAsyncDiskService}.
+ */
+public class TestMRAsyncDiskService extends TestCase {
+  
+  private static final String TEST_ROOT_DIR = new Path(System.getProperty(
+      "test.build.data", "/tmp")).toString();
+
+  /**
+   * Creates one empty directory and one directory with content on two
+   * volumes, removes them through MRAsyncDiskService, and verifies that the
+   * toBeDeleted directory of every volume is empty after shutdown.
+   */
+  public void testMRAsyncDiskService() throws Throwable {
+    FileSystem localFileSystem = FileSystem.getLocal(new Configuration());
+    String[] vols = new String[]{TEST_ROOT_DIR + "/0",
+        TEST_ROOT_DIR + "/1"};
+    MRAsyncDiskService service = new MRAsyncDiskService(
+        localFileSystem, vols);
+    
+    String a = "a";
+    String b = "b";
+    String c = "b/c";
+    
+    File fa = new File(vols[0], a);
+    File fb = new File(vols[1], b);
+    File fc = new File(vols[1], c);
+    
+    // Create the directories ("b" is non-empty because it contains "b/c").
+    fa.mkdirs();
+    fb.mkdirs();
+    fc.mkdirs();
+    
+    assertTrue(fa.exists());
+    assertTrue(fb.exists());
+    assertTrue(fc.exists());
+    
+    // Move and delete them. The rename happens before moveAndDelete
+    // returns, so the original names must already be gone.
+    assertTrue(service.moveAndDelete(vols[0], a));
+    assertFalse(fa.exists());
+    assertTrue(service.moveAndDelete(vols[1], b));
+    assertFalse(fb.exists());
+    assertFalse(fc.exists());
+    
+    // Wait at most 5 seconds for the pending deletions to finish.
+    service.shutdown();
+    if (!service.awaitTermination(5000)) {
+      fail("MRAsyncDiskService is still not shutdown in 5 seconds!");
+    }
+    
+    // All contents should be gone by now; check every volume.
+    for (int i = 0; i < vols.length; i++) {
+      File toBeDeletedDir = new File(vols[i], MRAsyncDiskService.SUBDIR);
+      String[] content = toBeDeletedDir.list();
+      assertNotNull("Cannot find " + toBeDeletedDir, content);
+      assertEquals("" + toBeDeletedDir + " should be empty now.", 
+          0, content.length);
+    }
+  }
+}