You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dh...@apache.org on 2009/12/17 03:50:23 UTC
svn commit: r891524 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapreduce/filecache/
src/java/org/apache/hadoop/mapreduce/util/
src/test/mapred/org/apache/hadoop/mapreduce/util/
Author: dhruba
Date: Thu Dec 17 02:50:22 2009
New Revision: 891524
URL: http://svn.apache.org/viewvc?rev=891524&view=rev
Log:
MAPREDUCE-1213. TaskTrackers restart is faster because it deletes
distributed cache directory asynchronously. (Zheng Shao via dhruba)
Added:
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=891524&r1=891523&r2=891524&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Dec 17 02:50:22 2009
@@ -147,6 +147,9 @@
MAPREDUCE-1294. Build fails to pull latest hadoop-core-* artifacts (cos)
+ MAPREDUCE-1213. TaskTrackers restart is faster because it deletes
+ distributed cache directory asynchronously. (Zheng Shao via dhruba)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=891524&r1=891523&r2=891524&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Dec 17 02:50:22 2009
@@ -100,6 +100,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
/*******************************************************
@@ -168,6 +169,8 @@
private final JobHistory jobHistory;
+ private MRAsyncDiskService asyncDiskService;
+
/**
* A client tried to submit a job before the Job Tracker was ready.
*/
@@ -1511,7 +1514,8 @@
}
// Same with 'localDir' except it's always on the local disk.
- jobConf.deleteLocalFiles(SUBDIR);
+ asyncDiskService = new MRAsyncDiskService(FileSystem.getLocal(conf), conf.getLocalDirs());
+ asyncDiskService.moveAndDeleteFromEachVolume(SUBDIR);
// Initialize history DONE folder
jobHistory.initDone(conf, fs);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=891524&r1=891523&r2=891524&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Dec 17 02:50:22 2009
@@ -99,6 +99,7 @@
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
/*******************************************************
* TaskTracker is a process that starts and tracks MR Tasks
@@ -265,6 +266,8 @@
private IndexCache indexCache;
+ private MRAsyncDiskService asyncDiskService;
+
/**
* Handle to the specific instance of the {@link TaskController} class
*/
@@ -540,9 +543,10 @@
fConf.get(TT_DNS_NAMESERVER,"default"));
}
- //check local disk
+ //check local disk and start async disk service
checkLocalDirs(this.fConf.getLocalDirs());
- fConf.deleteLocalFiles(SUBDIR);
+ asyncDiskService = new MRAsyncDiskService(FileSystem.getLocal(fConf), fConf.getLocalDirs());
+ asyncDiskService.moveAndDeleteFromEachVolume(SUBDIR);
// Clear out state tables
this.tasks.clear();
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=891524&r1=891523&r2=891524&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Thu Dec 17 02:50:22 2009
@@ -146,7 +146,7 @@
public void setup(LocalDirAllocator lDirAlloc, File workDir,
String cacheSubdir) throws IOException {
setupCalled = true;
-
+
if (cacheFiles.isEmpty()) {
return;
}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java?rev=891524&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java Thu Dec 17 02:50:22 2009
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.AsyncDiskService;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A container of multiple thread pools, one per volume, used to schedule
+ * asynchronous disk operations.
+ *
+ * The main operation is deletion: a path is first renamed into the
+ * {@link #SUBDIR} folder on the same volume (a fast, synchronous step) and
+ * then deleted asynchronously, so the caller does not have to wait for a
+ * potentially slow recursive delete.
+ *
+ * This class also contains the tasks executed by the thread pools.
+ */
+public class MRAsyncDiskService {
+
+  public static final Log LOG = LogFactory.getLog(MRAsyncDiskService.class);
+
+  /** Name of the per-volume directory holding paths that await deletion. */
+  public static final String SUBDIR = "toBeDeleted";
+
+  /** The underlying per-volume thread pools. */
+  AsyncDiskService asyncDiskService;
+
+  /**
+   * Timestamp format used to build unique names under SUBDIR.
+   * SimpleDateFormat is not thread-safe, so it is only used while holding
+   * the lock on this object (see moveAndDelete).
+   */
+  private final SimpleDateFormat format =
+      new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss.SSS");
+
+  private final FileSystem localFileSystem;
+
+  private final String[] volumes;
+
+  /** Suffix making generated names unique; guarded by this. */
+  private int uniqueId = 0;
+
+  /**
+   * Create an MRAsyncDiskService with a set of volumes (specified by their
+   * root directories).
+   *
+   * One thread pool is used per volume for the asynchronous disk
+   * operations, and the {@link #SUBDIR} directory is created under every
+   * volume root.
+   *
+   * @param localFileSystem The local file system used for moves and deletions.
+   * @param volumes The roots of the file system volumes.
+   * @throws IOException If SUBDIR cannot be created on some volume.
+   */
+  public MRAsyncDiskService(FileSystem localFileSystem, String[] volumes)
+      throws IOException {
+    this.localFileSystem = localFileSystem;
+    // Defensive copy: later changes to the caller's array must not change
+    // the set of volumes this service operates on.
+    this.volumes = volumes.clone();
+
+    // Create the deletion root on every volume before creating the thread
+    // pools, so a failure here does not leave an unused service behind.
+    for (String volume : this.volumes) {
+      if (!localFileSystem.mkdirs(new Path(volume, SUBDIR))) {
+        throw new IOException("Cannot create " + SUBDIR + " in " + volume);
+      }
+    }
+
+    // One thread pool per volume.
+    asyncDiskService = new AsyncDiskService(this.volumes);
+  }
+
+  /**
+   * Execute the task sometime in the future, using the thread pool of the
+   * given volume root.
+   */
+  synchronized void execute(String root, Runnable task) {
+    asyncDiskService.execute(root, task);
+  }
+
+  /**
+   * Gracefully start the shut down of all thread pools.
+   */
+  public synchronized void shutdown() {
+    asyncDiskService.shutdown();
+  }
+
+  /**
+   * Shut down all thread pools immediately.
+   *
+   * @return the list returned by the underlying
+   *         {@code AsyncDiskService.shutdownNow()}.
+   */
+  public synchronized List<Runnable> shutdownNow() {
+    return asyncDiskService.shutdownNow();
+  }
+
+  /**
+   * Wait for the termination of the thread pools.
+   *
+   * @param milliseconds The number of milliseconds to wait
+   * @return true if all thread pools terminated within the time limit
+   * @throws InterruptedException If interrupted while waiting
+   */
+  public synchronized boolean awaitTermination(long milliseconds)
+      throws InterruptedException {
+    return asyncDiskService.awaitTermination(milliseconds);
+  }
+
+  /** A task for deleting a path from a volume. */
+  class DeleteTask implements Runnable {
+
+    /** The volume that the file is on. */
+    final String volume;
+    /** The file name before the move. */
+    final String originalPath;
+    /** The file name after the move. */
+    final String pathToBeDeleted;
+
+    /**
+     * Delete a file/directory (recursively if needed).
+     * @param volume The volume that the file/dir is in.
+     * @param originalPath The original name, relative to volume root.
+     * @param pathToBeDeleted The name after the move, relative to volume root,
+     *          containing SUBDIR.
+     */
+    DeleteTask(String volume, String originalPath, String pathToBeDeleted) {
+      this.volume = volume;
+      this.originalPath = originalPath;
+      this.pathToBeDeleted = pathToBeDeleted;
+    }
+
+    @Override
+    public String toString() {
+      // Called in AsyncDiskService.execute for displaying error messages.
+      return "deletion of " + pathToBeDeleted + " on " + volume
+          + " with original name " + originalPath;
+    }
+
+    @Override
+    public void run() {
+      boolean success = false;
+      Exception e = null;
+      try {
+        Path absolutePathToBeDeleted = new Path(volume, pathToBeDeleted);
+        success = localFileSystem.delete(absolutePathToBeDeleted, true);
+      } catch (Exception ex) {
+        // This runs on a pool thread with no caller to propagate to, so
+        // the failure is recorded and logged below instead.
+        e = ex;
+      }
+
+      if (!success) {
+        if (e != null) {
+          LOG.warn("Failure in " + this + " with exception "
+              + StringUtils.stringifyException(e));
+        } else {
+          LOG.warn("Failure in " + this);
+        }
+      } else if (LOG.isDebugEnabled()) {
+        // Guarded so the message is not built when debug logging is off.
+        LOG.debug("Successfully did " + this.toString());
+      }
+    }
+  }
+
+  /**
+   * Move the path name on one volume to a temporary location and then
+   * delete it asynchronously.
+   *
+   * This function returns when the move is done, but not necessarily when
+   * the deletion is done. This is usually good enough because applications
+   * won't see the path name under the old name anyway after the move.
+   *
+   * @param volume The disk volume
+   * @param pathName The path name relative to volume.
+   * @return true if the path was moved and its deletion was scheduled;
+   *         false if the rename did not succeed (including when the path
+   *         is not found).
+   * @throws IOException If the move failed with an I/O error.
+   */
+  public boolean moveAndDelete(String volume, String pathName)
+      throws IOException {
+    // Move the file right now, so that it can be deleted later.
+    String newPathName;
+    synchronized (this) {
+      // format is not thread-safe, and each uniqueId must be used once.
+      newPathName = format.format(new Date()) + "_" + uniqueId;
+      uniqueId++;
+    }
+    newPathName = SUBDIR + Path.SEPARATOR_CHAR + newPathName;
+
+    Path source = new Path(volume, pathName);
+    Path target = new Path(volume, newPathName);
+    try {
+      if (!localFileSystem.rename(source, target)) {
+        return false;
+      }
+    } catch (FileNotFoundException e) {
+      // Return false in case that the file is not found.
+      return false;
+    }
+
+    DeleteTask task = new DeleteTask(volume, pathName, newPathName);
+    execute(volume, task);
+    return true;
+  }
+
+  /**
+   * Move the path name on each volume to a temporary location and then
+   * delete them asynchronously.
+   *
+   * This function returns when the moves are done, but not necessarily when
+   * all deletions are done. This is usually good enough because applications
+   * won't see the path name under the old name anyway after the move.
+   *
+   * A volume on which the move returns false does not stop the remaining
+   * volumes from being processed.
+   *
+   * @param pathName The path name on each volume.
+   * @return true if the path was moved on every volume; false if the move
+   *         did not succeed on at least one volume.
+   * @throws IOException If a move failed with an I/O error.
+   */
+  public boolean moveAndDeleteFromEachVolume(String pathName)
+      throws IOException {
+    boolean movedEverywhere = true;
+    for (String volume : volumes) {
+      // Non-short-circuit &= so every volume is still processed.
+      movedEverywhere &= moveAndDelete(volume, pathName);
+    }
+    return movedEverywhere;
+  }
+
+}
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java?rev=891524&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java Thu Dec 17 02:50:22 2009
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.util;
+
+import java.io.File;
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+
+/**
+ * A test for MRAsyncDiskService.
+ */
+public class TestMRAsyncDiskService extends TestCase {
+
+  private static final String TEST_ROOT_DIR = new Path(System.getProperty(
+      "test.build.data", "/tmp")).toString();
+
+  /**
+   * This test creates one empty directory, and one directory with content,
+   * and then removes them through MRAsyncDiskService.
+   * @throws Throwable
+   */
+  public void testMRAsyncDiskService() throws Throwable {
+
+    FileSystem localFileSystem = FileSystem.getLocal(new Configuration());
+    String[] vols = new String[]{TEST_ROOT_DIR + "/0",
+        TEST_ROOT_DIR + "/1"};
+    MRAsyncDiskService service = new MRAsyncDiskService(
+        localFileSystem, vols);
+
+    String a = "a";
+    String b = "b";
+    String c = "b/c";
+
+    File fa = new File(vols[0], a);
+    File fb = new File(vols[1], b);
+    File fc = new File(vols[1], c);
+
+    // Create the directories
+    fa.mkdirs();
+    fb.mkdirs();
+    fc.mkdirs();
+
+    assertTrue(fa.exists());
+    assertTrue(fb.exists());
+    assertTrue(fc.exists());
+
+    // Move and delete them; each call must report a successful move.
+    assertTrue(service.moveAndDelete(vols[0], a));
+    assertFalse(fa.exists());
+    assertTrue(service.moveAndDelete(vols[1], b));
+    assertFalse(fb.exists());
+    assertFalse(fc.exists());
+
+    // Wait at most 5 seconds to make sure the deleted items are all gone.
+    service.shutdown();
+    if (!service.awaitTermination(5000)) {
+      fail("MRAsyncDiskService is still not shutdown in 5 seconds!");
+    }
+
+    // All contents should be gone by now, on every volume.
+    for (int i = 0; i < vols.length; i++) {
+      File toBeDeletedDir = new File(vols[i], MRAsyncDiskService.SUBDIR);
+      String[] content = toBeDeletedDir.list();
+      assertNotNull("Cannot find " + toBeDeletedDir, content);
+      assertEquals("" + toBeDeletedDir + " should be empty now.",
+          0, content.length);
+    }
+  }
+}