You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/08/25 12:27:54 UTC
svn commit: r807543 [1/2] - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapreduce/filecache/ src/test/
src/test/mapred/org/apache/hadoop/mapred/
src/test/mapred/org/apache/hadoop/mapreduce/filecache/
Author: tomwhite
Date: Tue Aug 25 10:27:53 2009
New Revision: 807543
URL: http://svn.apache.org/viewvc?rev=807543&view=rev
Log:
MAPREDUCE-476. Extend DistributedCache to work locally (LocalJobRunner). Contributed by Philip Zeyliger.
Added:
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Removed:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
hadoop/mapreduce/trunk/src/test/commit-tests
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Aug 25 10:27:53 2009
@@ -253,6 +253,9 @@
in Configuration for dumping in JSON format from Hudson trunk build #68.
(yhemanth)
+ MAPREDUCE-476. Extend DistributedCache to work locally (LocalJobRunner).
+ (Philip Zeyliger via tomwhite)
+
BUG FIXES
MAPREDUCE-878. Rename fair scheduler design doc to
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Tue Aug 25 10:27:53 2009
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapred;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
@@ -27,7 +28,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.JvmTask;
import org.apache.hadoop.mapreduce.TaskType;
@@ -146,7 +146,7 @@
//setupWorkDir actually sets up the symlinks for the distributed
//cache. After a task exits we wipe the workdir clean, and hence
//the symlinks have to be rebuilt.
- TaskRunner.setupWorkDir(job);
+ TaskRunner.setupWorkDir(job, new File(".").getAbsoluteFile());
numTasksToExecute = job.getNumTasksToExecutePerJvm();
assert(numTasksToExecute != 0);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Aug 25 10:27:53 2009
@@ -48,6 +48,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -567,15 +568,12 @@
"Applications should implement Tool for the same.");
}
- // get all the command line arguments into the
- // jobconf passed in by the user conf
- String files = null;
- String libjars = null;
- String archives = null;
-
- files = job.get("tmpfiles");
- libjars = job.get("tmpjars");
- archives = job.get("tmparchives");
+ // Retrieve command line arguments placed into the JobConf
+ // by GenericOptionsParser.
+ String files = job.get("tmpfiles");
+ String libjars = job.get("tmpjars");
+ String archives = job.get("tmparchives");
+
/*
* set this user's id in job configuration, so later job files can be
* accessed using this user's id
@@ -651,27 +649,7 @@
}
// set the timestamps of the archives and files
- URI[] tarchives = DistributedCache.getCacheArchives(job);
- if (tarchives != null) {
- StringBuffer archiveTimestamps =
- new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tarchives[0])));
- for (int i = 1; i < tarchives.length; i++) {
- archiveTimestamps.append(",");
- archiveTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tarchives[i])));
- }
- DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
- }
-
- URI[] tfiles = DistributedCache.getCacheFiles(job);
- if (tfiles != null) {
- StringBuffer fileTimestamps =
- new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tfiles[0])));
- for (int i = 1; i < tfiles.length; i++) {
- fileTimestamps.append(",");
- fileTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tfiles[i])));
- }
- DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
- }
+ TrackerDistributedCacheManager.determineTimestamps(job);
String originalJarPath = job.getJar();
@@ -700,6 +678,7 @@
}
+
private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
UnixUserGroupInformation ugi = null;
try {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Aug 25 10:27:53 2009
@@ -18,7 +18,9 @@
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -26,15 +28,17 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.JobTrackerMetricsInst;
-import org.apache.hadoop.mapred.JvmTask;
import org.apache.hadoop.mapred.JobClient.RawSplit;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.util.ReflectionUtils;
/** Implements MapReduce locally, in-process, for debugging. */
@@ -82,9 +86,16 @@
return rawSplits;
}
- private class Job extends Thread
- implements TaskUmbilicalProtocol {
- private Path file;
+ private class Job extends Thread implements TaskUmbilicalProtocol {
+ // The job directory on the system: JobClient places job configurations here.
+ // This is analogous to JobTracker's system directory.
+ private Path systemJobDir;
+ private Path systemJobFile;
+
+ // The job directory for the task. Analogous to a task's job directory.
+ private Path localJobDir;
+ private Path localJobFile;
+
private JobID id;
private JobConf job;
@@ -92,10 +103,12 @@
private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
private JobProfile profile;
- private Path localFile;
private FileSystem localFs;
boolean killed = false;
+ private TrackerDistributedCacheManager trackerDistributerdCacheManager;
+ private TaskDistributedCacheManager taskDistributedCacheManager;
+
// Counters summed over all the map/reduce tasks which
// have successfully completed
private Counters completedTaskCounters = new Counters();
@@ -108,15 +121,55 @@
}
public Job(JobID jobid, JobConf conf) throws IOException {
- this.file = new Path(getSystemDir(), jobid + "/job.xml");
+ this.systemJobDir = new Path(getSystemDir(), jobid.toString());
+ this.systemJobFile = new Path(systemJobDir, "job.xml");
this.id = jobid;
-
- this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
this.localFs = FileSystem.getLocal(conf);
+ this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
+ this.localJobFile = new Path(this.localJobDir, id + ".xml");
- fs.copyToLocalFile(file, localFile);
- this.job = new JobConf(localFile);
- profile = new JobProfile(job.getUser(), id, file.toString(),
+ // Manage the distributed cache. If there are files to be copied,
+ // this will trigger localJobFile to be re-written.
+ this.trackerDistributerdCacheManager =
+ new TrackerDistributedCacheManager(conf);
+ this.taskDistributedCacheManager =
+ trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
+ taskDistributedCacheManager.setup(
+ new LocalDirAllocator("mapred.local.dir"),
+ new File(systemJobDir.toString()),
+ "archive");
+
+ if (DistributedCache.getSymlink(conf)) {
+ // This is not supported largely because,
+ // for a Child subprocess, the cwd in LocalJobRunner
+ // is not a fresh slate, but rather the user's working directory.
+ // This is further complicated because the logic in
+ // setupWorkDir only creates symlinks if there's a jarfile
+ // in the configuration.
+ LOG.warn("LocalJobRunner does not support " +
+ "symlinking into current working dir.");
+ }
+ // Setup the symlinks for the distributed cache.
+ TaskRunner.setupWorkDir(conf, new File(localJobDir.toUri()).getAbsoluteFile());
+
+ // Write out configuration file. Instead of copying it from
+ // systemJobFile, we re-write it, since setup(), above, may have
+ // updated it.
+ OutputStream out = localFs.create(localJobFile);
+ try {
+ conf.writeXml(out);
+ } finally {
+ out.close();
+ }
+ this.job = new JobConf(localJobFile);
+
+ // Job (the current object) is a Thread, so we wrap its class loader.
+ if (!taskDistributedCacheManager.getClassPaths().isEmpty()) {
+ setContextClassLoader(taskDistributedCacheManager.makeClassLoader(
+ getContextClassLoader()));
+ }
+
+ profile = new JobProfile(job.getUser(), id, systemJobFile.toString(),
"http://localhost:8080/", job.getJobName());
status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING,
profile.getUser(), profile.getJobName(), profile.getJobFile(),
@@ -174,7 +227,7 @@
TaskAttemptID mapId = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, i),0);
mapIds.add(mapId);
- MapTask map = new MapTask(file.toString(),
+ MapTask map = new MapTask(systemJobFile.toString(),
mapId, i,
rawSplits[i].getClassName(),
rawSplits[i].getBytes(), 1);
@@ -185,7 +238,7 @@
mapOutput.setConf(localConf);
mapOutputFiles.put(mapId, mapOutput);
- map.setJobFile(localFile.toString());
+ map.setJobFile(localJobFile.toString());
map.localizeConfiguration(localConf);
map.setConf(localConf);
map_tasks += 1;
@@ -202,7 +255,7 @@
new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
try {
if (numReduceTasks > 0) {
- ReduceTask reduce = new ReduceTask(file.toString(),
+ ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
reduceId, 0, mapIds.size(), 1);
JobConf localConf = new JobConf(job);
TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
@@ -227,7 +280,7 @@
}
}
if (!this.isInterrupted()) {
- reduce.setJobFile(localFile.toString());
+ reduce.setJobFile(localJobFile.toString());
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
reduce_tasks += 1;
@@ -275,8 +328,11 @@
} finally {
try {
- fs.delete(file.getParent(), true); // delete submit dir
- localFs.delete(localFile, true); // delete local copy
+ fs.delete(systemJobFile.getParent(), true); // delete submit dir
+ localFs.delete(localJobFile, true); // delete local copy
+ // Cleanup distributed cache
+ taskDistributedCacheManager.release();
+ trackerDistributerdCacheManager.purgeCache();
} catch (IOException e) {
LOG.warn("Error cleaning up "+id+": "+e);
}
@@ -489,5 +545,5 @@
@Override
public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException{
return null;
-}
+ }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Aug 25 10:27:53 2009
@@ -33,8 +33,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.fs.FSError;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -64,6 +65,7 @@
private TaskTracker tracker;
+ private TaskDistributedCacheManager taskDistributedCacheManager;
protected JobConf conf;
JvmManager jvmManager;
@@ -101,18 +103,6 @@
*/
public void close() throws IOException {}
- private static String stringifyPathArray(Path[] p){
- if (p == null){
- return null;
- }
- StringBuffer str = new StringBuffer(p[0].toString());
- for (int i = 1; i < p.length; i++){
- str.append(",");
- str.append(p[i].toString());
- }
- return str.toString();
- }
-
/**
* Get the java command line options for the child map/reduce tasks.
@@ -173,11 +163,12 @@
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
- URI[] archives = DistributedCache.getCacheArchives(conf);
- URI[] files = DistributedCache.getCacheFiles(conf);
// We don't create any symlinks yet, so presence/absence of workDir
// actually on the file system doesn't matter.
- setupDistributedCache(lDirAlloc, workDir, archives, files);
+ taskDistributedCacheManager = tracker.getTrackerDistributedCacheManager()
+ .newTaskDistributedCacheManager(conf);
+ taskDistributedCacheManager.setup(
+ lDirAlloc, workDir, TaskTracker.getDistributedCacheDir());
// Set up the child task's configuration. After this call, no localization
// of files should happen in the TaskTracker's process space. Any changes to
@@ -189,7 +180,8 @@
}
// Build classpath
- List<String> classPaths = getClassPaths(conf, workDir, archives, files);
+ List<String> classPaths =
+ getClassPaths(conf, workDir, taskDistributedCacheManager);
long logSize = TaskLog.getTaskLogLength(conf);
@@ -249,18 +241,8 @@
}
} finally {
try{
- URI[] archives = DistributedCache.getCacheArchives(conf);
- URI[] files = DistributedCache.getCacheFiles(conf);
- if (archives != null){
- for (int i = 0; i < archives.length; i++){
- DistributedCache.releaseCache(archives[i], conf);
- }
- }
- if (files != null){
- for(int i = 0; i < files.length; i++){
- DistributedCache.releaseCache(files[i], conf);
- }
- }
+ taskDistributedCacheManager.release();
+
}catch(IOException ie){
LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
}
@@ -470,7 +452,7 @@
/**
*/
private static List<String> getClassPaths(JobConf conf, File workDir,
- URI[] archives, URI[] files)
+ TaskDistributedCacheManager taskDistributedCacheManager)
throws IOException {
// Accumulates class paths for child.
List<String> classPaths = new ArrayList<String>();
@@ -481,7 +463,7 @@
appendJobJarClasspaths(conf.getJar(), classPaths);
// Distributed cache paths
- appendDistributedCacheClasspaths(conf, archives, files, classPaths);
+ classPaths.addAll(taskDistributedCacheManager.getClassPaths());
// Include the working dir too
classPaths.add(workDir.toString());
@@ -600,105 +582,6 @@
return new File(workDir.toString());
}
- private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
- URI[] archives, URI[] files) throws IOException {
- FileStatus fileStatus;
- FileSystem fileSystem;
- Path localPath;
- String baseDir;
- if ((archives != null) || (files != null)) {
- if (archives != null) {
- String[] archivesTimestamps =
- DistributedCache.getArchiveTimestamps(conf);
- Path[] p = new Path[archives.length];
- for (int i = 0; i < archives.length;i++){
- fileSystem = FileSystem.get(archives[i], conf);
- fileStatus = fileSystem.getFileStatus(
- new Path(archives[i].getPath()));
- String cacheId = DistributedCache.makeRelative(archives[i],conf);
- String cachePath = TaskTracker.getDistributedCacheDir() +
- Path.SEPARATOR + cacheId;
-
- localPath = lDirAlloc.getLocalPathForWrite(cachePath,
- fileStatus.getLen(), conf);
- baseDir = localPath.toString().replace(cacheId, "");
- p[i] = DistributedCache.getLocalCache(archives[i], conf,
- new Path(baseDir),
- fileStatus,
- true, Long.parseLong(
- archivesTimestamps[i]),
- new Path(workDir.
- getAbsolutePath()),
- false);
-
- }
- DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
- }
- if ((files != null)) {
- String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
- Path[] p = new Path[files.length];
- for (int i = 0; i < files.length;i++){
- fileSystem = FileSystem.get(files[i], conf);
- fileStatus = fileSystem.getFileStatus(
- new Path(files[i].getPath()));
- String cacheId = DistributedCache.makeRelative(files[i], conf);
- String cachePath = TaskTracker.getDistributedCacheDir() +
- Path.SEPARATOR + cacheId;
-
- localPath = lDirAlloc.getLocalPathForWrite(cachePath,
- fileStatus.getLen(), conf);
- baseDir = localPath.toString().replace(cacheId, "");
- p[i] = DistributedCache.getLocalCache(files[i], conf,
- new Path(baseDir),
- fileStatus,
- false, Long.parseLong(
- fileTimestamps[i]),
- new Path(workDir.
- getAbsolutePath()),
- false);
- }
- DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
- }
- }
- }
-
- private static void appendDistributedCacheClasspaths(JobConf conf,
- URI[] archives, URI[] files, List<String> classPaths)
- throws IOException {
- // Archive paths
- Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
- if (archiveClasspaths != null && archives != null) {
- Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
- if (localArchives != null){
- for (int i=0;i<archives.length;i++){
- for(int j=0;j<archiveClasspaths.length;j++){
- if (archives[i].getPath().equals(
- archiveClasspaths[j].toString())){
- classPaths.add(localArchives[i].toString());
- }
- }
- }
- }
- }
-
- //file paths
- Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
- if (fileClasspaths!=null && files != null) {
- Path[] localFiles = DistributedCache
- .getLocalCacheFiles(conf);
- if (localFiles != null) {
- for (int i = 0; i < files.length; i++) {
- for (int j = 0; j < fileClasspaths.length; j++) {
- if (files[i].getPath().equals(
- fileClasspaths[j].toString())) {
- classPaths.add(localFiles[i].toString());
- }
- }
- }
- }
- }
- }
-
private static void appendSystemClasspaths(List<String> classPaths) {
for (String c : System.getProperty("java.class.path").split(
SYSTEM_PATH_SEPARATOR)) {
@@ -732,12 +615,22 @@
classPaths.add(jobCacheDir.toString());
}
- //Mostly for setting up the symlinks. Note that when we setup the distributed
- //cache, we didn't create the symlinks. This is done on a per task basis
- //by the currently executing task.
- public static void setupWorkDir(JobConf conf) throws IOException {
- File workDir = new File(".").getAbsoluteFile();
+ /**
+ * Creates distributed cache symlinks and tmp directory, as appropriate.
+ * Note that when we setup the distributed
+ * cache, we didn't create the symlinks. This is done on a per task basis
+ * by the currently executing task.
+ *
+ * @param conf The job configuration.
+ * @param workDir Working directory, which is completely deleted.
+ */
+ public static void setupWorkDir(JobConf conf, File workDir) throws IOException {
+ LOG.debug("Fully deleting and re-creating" + workDir);
FileUtil.fullyDelete(workDir);
+ if (!workDir.mkdir()) {
+ LOG.debug("Did not recreate " + workDir);
+ }
+
if (DistributedCache.getSymlink(conf)) {
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
@@ -746,48 +639,58 @@
if (archives != null) {
for (int i = 0; i < archives.length; i++) {
String link = archives[i].getFragment();
- if (link != null) {
- link = workDir.toString() + Path.SEPARATOR + link;
- File flink = new File(link);
- if (!flink.exists()) {
- FileUtil.symLink(localArchives[i].toString(), link);
- }
- }
+ String target = localArchives[i].toString();
+ symlink(workDir, target, link);
}
}
if (files != null) {
for (int i = 0; i < files.length; i++) {
String link = files[i].getFragment();
- if (link != null) {
- link = workDir.toString() + Path.SEPARATOR + link;
- File flink = new File(link);
- if (!flink.exists()) {
- FileUtil.symLink(localFiles[i].toString(), link);
- }
- }
+ String target = localFiles[i].toString();
+ symlink(workDir, target, link);
}
}
}
- File jobCacheDir = null;
+
+ // For streaming, create extra symlinks (for all the files
+ // in the job cache dir) in the current working directory.
+ // Note that this is only executed if the configuration
+ // points to a jar file.
if (conf.getJar() != null) {
- jobCacheDir = new File(
+ File jobCacheDir = new File(
new Path(conf.getJar()).getParent().toString());
- }
-
- // create symlinks for all the files in job cache dir in current
- // workingdir for streaming
- try{
- DistributedCache.createAllSymlink(conf, jobCacheDir,
- workDir);
- } catch(IOException ie){
- // Do not exit even if symlinks have not been created.
- LOG.warn(StringUtils.stringifyException(ie));
+ try{
+ TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir,
+ workDir);
+ } catch(IOException ie){
+ // Do not exit even if symlinks have not been created.
+ LOG.warn(StringUtils.stringifyException(ie));
+ }
}
createChildTmpDir(workDir, conf);
}
/**
+ * Utility method for creating a symlink and warning on errors.
+ *
+ * If link is null, does nothing.
+ */
+ private static void symlink(File workDir, String target, String link)
+ throws IOException {
+ if (link != null) {
+ link = workDir.toString() + Path.SEPARATOR + link;
+ File flink = new File(link);
+ if (!flink.exists()) {
+ LOG.info(String.format("Creating symlink: %s <- %s", target, link));
+ if (0 != FileUtil.symLink(target, link)) {
+ LOG.warn(String.format("Failed to create symlink: %s <- %s", target, link));
+ }
+ }
+ }
+ }
+
+ /**
* Kill the child process
*/
public void kill() {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Aug 25 10:27:53 2009
@@ -51,7 +51,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -70,6 +69,7 @@
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapred.pipes.Submitter;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsException;
import org.apache.hadoop.metrics.MetricsRecord;
@@ -147,6 +147,8 @@
Server taskReportServer = null;
InterTrackerProtocol jobClient;
+
+ private TrackerDistributedCacheManager distributedCacheManager;
// last heartbeat response recieved
short heartbeatResponseId = -1;
@@ -567,8 +569,11 @@
this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
LOG.info("Starting tracker " + taskTrackerName);
- // Clear out temporary files that might be lying around
- DistributedCache.purgeCache(this.fConf);
+ // Initialize DistributedCache and
+ // clear out temporary files that might be lying around
+ this.distributedCacheManager =
+ new TrackerDistributedCacheManager(this.fConf);
+ this.distributedCacheManager.purgeCache();
cleanupStorage();
this.jobClient = (InterTrackerProtocol)
@@ -3511,4 +3516,8 @@
healthChecker = new NodeHealthCheckerService(conf);
healthChecker.start();
}
+
+ TrackerDistributedCacheManager getTrackerDistributedCacheManager() {
+ return distributedCacheManager;
+ }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Tue Aug 25 10:27:53 2009
@@ -18,12 +18,12 @@
package org.apache.hadoop.mapreduce.filecache;
-import org.apache.commons.logging.*;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
import java.net.URI;
@@ -35,7 +35,7 @@
* </p>
*
* <p>Applications specify the files, via urls (hdfs:// or http://) to be cached
- * via the org.apache.hadoop.mapred.JobConf. The
+ * via the {@link org.apache.hadoop.mapred.JobConf}. The
* <code>DistributedCache</code> assumes that the files specified via urls are
* already present on the {@link FileSystem} at the path specified by the url
* and are accessible by every machine in the cluster.</p>
@@ -82,8 +82,8 @@
* DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
* DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
*
- * 3. Use the cached files in the org.apache.hadoop.mapred.Mapper
- * or org.apache.hadoop.mapred.Reducer:
+ * 3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper}
+ * or {@link org.apache.hadoop.mapred.Reducer}:
*
* public static class MapClass extends MapReduceBase
* implements Mapper<K, V, K, V> {
@@ -108,20 +108,24 @@
* }
*
* </pre></blockquote></p>
- *
+ *
+ * It is also very common to use the DistributedCache by using
+ * {@link org.apache.hadoop.util.GenericOptionsParser}.
+ *
+ * This class includes methods that should be used by users
+ * (specifically those mentioned in the example above, as well
+ * as {@link DistributedCache#addArchiveToClassPath(Path, Configuration)}),
+ * as well as methods intended for use by the MapReduce framework
+ * (e.g., {@link org.apache.hadoop.mapred.JobClient}). For implementation
+ * details, see {@link TrackerDistributedCacheManager} and
+ * {@link TaskDistributedCacheManager}.
+ *
+ * @see TrackerDistributedCacheManager
+ * @see TaskDistributedCacheManager
+ * @see org.apache.hadoop.mapred.JobConf
+ * @see org.apache.hadoop.mapred.JobClient
*/
public class DistributedCache {
- // cacheID to cacheStatus mapping
- private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
-
- private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
-
- // default total cache size
- private static final long DEFAULT_CACHE_SIZE = 10737418240L;
-
- private static final Log LOG =
- LogFactory.getLog(DistributedCache.class);
-
/**
* Get the locally cached file or archive; it could either be
* previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -144,15 +148,18 @@
* @return the path to directory where the archives are unjarred in case of archives,
* the path to the file where the file is copied locally
* @throws IOException
+ * @deprecated Internal to MapReduce framework. Use
+ * {@link TrackerDistributedCacheManager} instead.
*/
public static Path getLocalCache(URI cache, Configuration conf,
Path baseDir, FileStatus fileStatus,
boolean isArchive, long confFileStamp,
Path currentWorkDir)
- throws IOException {
+ throws IOException {
return getLocalCache(cache, conf, baseDir, fileStatus, isArchive,
confFileStamp, currentWorkDir, true);
}
+
/**
* Get the locally cached file or archive; it could either be
* previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -178,48 +185,19 @@
* @return the path to directory where the archives are unjarred in case of archives,
* the path to the file where the file is copied locally
* @throws IOException
+ * @deprecated Internal to MapReduce framework. Use
+ * {@link TrackerDistributedCacheManager} instead.
*/
public static Path getLocalCache(URI cache, Configuration conf,
Path baseDir, FileStatus fileStatus,
boolean isArchive, long confFileStamp,
- Path currentWorkDir, boolean honorSymLinkConf)
- throws IOException {
- String cacheId = makeRelative(cache, conf);
- CacheStatus lcacheStatus;
- Path localizedPath;
- synchronized (cachedArchives) {
- lcacheStatus = cachedArchives.get(cacheId);
- if (lcacheStatus == null) {
- // was never localized
- lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
- cachedArchives.put(cacheId, lcacheStatus);
- }
+ Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
- synchronized (lcacheStatus) {
- localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
- fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
- lcacheStatus.refcount++;
- }
- }
-
- // try deleting stuff if you can
- long size = 0;
- synchronized (baseDirSize) {
- Long get = baseDirSize.get(baseDir);
- if ( get != null ) {
- size = get.longValue();
- }
- }
- // setting the cache size to a default of 10GB
- long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
- if (allowedSize < size) {
- // try some cache deletions
- deleteCache(conf);
- }
- return localizedPath;
+ return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
+ baseDir, fileStatus, isArchive, confFileStamp, currentWorkDir,
+ honorSymLinkConf);
}
-
/**
* Get the locally cached file or archive; it could either be
* previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -241,17 +219,18 @@
* @return the path to directory where the archives are unjarred in case of archives,
* the path to the file where the file is copied locally
* @throws IOException
-
+ * @deprecated Internal to MapReduce framework. Use
+ * {@link TrackerDistributedCacheManager} instead.
*/
public static Path getLocalCache(URI cache, Configuration conf,
Path baseDir, boolean isArchive,
long confFileStamp, Path currentWorkDir)
- throws IOException {
+ throws IOException {
return getLocalCache(cache, conf,
baseDir, null, isArchive,
confFileStamp, currentWorkDir);
}
-
+
/**
* This is the opposite of getlocalcache. When you are done with
* using the cache, you need to release the cache
@@ -259,46 +238,14 @@
* @param conf configuration which contains the filesystem the cache
* is contained in.
* @throws IOException
+ * @deprecated Internal to MapReduce framework. Use
+ * {@link TrackerDistributedCacheManager} instead.
*/
public static void releaseCache(URI cache, Configuration conf)
- throws IOException {
- String cacheId = makeRelative(cache, conf);
- synchronized (cachedArchives) {
- CacheStatus lcacheStatus = cachedArchives.get(cacheId);
- if (lcacheStatus == null)
- return;
- synchronized (lcacheStatus) {
- lcacheStatus.refcount--;
- }
- }
+ throws IOException {
+ new TrackerDistributedCacheManager(conf).releaseCache(cache, conf);
}
- // To delete the caches which have a refcount of zero
-
- private static void deleteCache(Configuration conf) throws IOException {
- // try deleting cache Status with refcount of zero
- synchronized (cachedArchives) {
- for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
- String cacheId = (String) it.next();
- CacheStatus lcacheStatus = cachedArchives.get(cacheId);
- synchronized (lcacheStatus) {
- if (lcacheStatus.refcount == 0) {
- // delete this cache entry
- FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
- if ( dirSize != null ) {
- dirSize -= lcacheStatus.size;
- baseDirSize.put(lcacheStatus.baseDir, dirSize);
- }
- }
- it.remove();
- }
- }
- }
- }
- }
-
/*
* Returns the relative path of the dir this cache will be localized in
* relative path that this cache will be localized in. For
@@ -306,189 +253,17 @@
* hostname/absolute path -- if it is just /absolute_path -- then the
* relative path is hostname of DFS this mapred cluster is running
* on/absolute_path
+ * @deprecated Internal to MapReduce framework. Use
+ * TrackerDistributedCacheManager instead.
*/
public static String makeRelative(URI cache, Configuration conf)
- throws IOException {
- String host = cache.getHost();
- if (host == null) {
- host = cache.getScheme();
- }
- if (host == null) {
- URI defaultUri = FileSystem.get(conf).getUri();
- host = defaultUri.getHost();
- if (host == null) {
- host = defaultUri.getScheme();
- }
- }
- String path = host + cache.getPath();
- path = path.replace(":/","/"); // remove windows device colon
- return path;
- }
-
- private static Path cacheFilePath(Path p) {
- return new Path(p, p.getName());
- }
-
- // the method which actually copies the caches locally and unjars/unzips them
- // and does chmod for the files
- private static Path localizeCache(Configuration conf,
- URI cache, long confFileStamp,
- CacheStatus cacheStatus,
- FileStatus fileStatus,
- boolean isArchive,
- Path currentWorkDir,boolean honorSymLinkConf)
- throws IOException {
- boolean doSymlink = honorSymLinkConf && getSymlink(conf);
- if(cache.getFragment() == null) {
- doSymlink = false;
- }
- FileSystem fs = FileSystem.get(cache, conf);
- String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
- File flink = new File(link);
- if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
- cacheStatus, fileStatus)) {
- if (isArchive) {
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(cacheStatus.localLoadPath.toString(),
- link);
- }
- return cacheStatus.localLoadPath;
- }
- else {
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
- link);
- }
- return cacheFilePath(cacheStatus.localLoadPath);
- }
- } else {
- // remove the old archive
- // if the old archive cannot be removed since it is being used by another
- // job
- // return null
- if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
- throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
- + " is in use and cannot be refreshed");
-
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(cacheStatus.localLoadPath, true);
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.baseDir);
- if ( dirSize != null ) {
- dirSize -= cacheStatus.size;
- baseDirSize.put(cacheStatus.baseDir, dirSize);
- }
- }
- Path parchive = new Path(cacheStatus.localLoadPath,
- new Path(cacheStatus.localLoadPath.getName()));
-
- if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
- throw new IOException("Mkdirs failed to create directory " +
- cacheStatus.localLoadPath.toString());
- }
-
- String cacheId = cache.getPath();
- fs.copyToLocalFile(new Path(cacheId), parchive);
- if (isArchive) {
- String tmpArchive = parchive.toString().toLowerCase();
- File srcFile = new File(parchive.toString());
- File destDir = new File(parchive.getParent().toString());
- if (tmpArchive.endsWith(".jar")) {
- RunJar.unJar(srcFile, destDir);
- } else if (tmpArchive.endsWith(".zip")) {
- FileUtil.unZip(srcFile, destDir);
- } else if (isTarFile(tmpArchive)) {
- FileUtil.unTar(srcFile, destDir);
- }
- // else will not do anyhting
- // and copy the file into the dir as it is
- }
-
- long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
- cacheStatus.size = cacheSize;
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.baseDir);
- if( dirSize == null ) {
- dirSize = Long.valueOf(cacheSize);
- } else {
- dirSize += cacheSize;
- }
- baseDirSize.put(cacheStatus.baseDir, dirSize);
- }
-
- // do chmod here
- try {
- //Setting recursive permission to grant everyone read and execute
- FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
- } catch(InterruptedException e) {
- LOG.warn("Exception in chmod" + e.toString());
- }
-
- // update cacheStatus to reflect the newly cached file
- cacheStatus.currentStatus = true;
- cacheStatus.mtime = getTimestamp(conf, cache);
- }
-
- if (isArchive){
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(cacheStatus.localLoadPath.toString(),
- link);
- }
- return cacheStatus.localLoadPath;
- }
- else {
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
- link);
- }
- return cacheFilePath(cacheStatus.localLoadPath);
- }
- }
-
- private static boolean isTarFile(String filename) {
- return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
- filename.endsWith(".tar"));
- }
-
- // Checks if the cache has already been localized and is fresh
- private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
- URI cache, long confFileStamp,
- CacheStatus lcacheStatus,
- FileStatus fileStatus)
- throws IOException {
- // check for existence of the cache
- if (lcacheStatus.currentStatus == false) {
- return false;
- } else {
- long dfsFileStamp;
- if (fileStatus != null) {
- dfsFileStamp = fileStatus.getModificationTime();
- } else {
- dfsFileStamp = getTimestamp(conf, cache);
- }
-
- // ensure that the file on hdfs hasn't been modified since the job started
- if (dfsFileStamp != confFileStamp) {
- LOG.fatal("File: " + cache + " has changed on HDFS since job started");
- throw new IOException("File: " + cache +
- " has changed on HDFS since job started");
- }
-
- if (dfsFileStamp != lcacheStatus.mtime) {
- // needs refreshing
- return false;
- }
- }
-
- return true;
+ throws IOException {
+ return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
}
/**
* Returns mtime of a given cache file on hdfs.
+ *
* @param conf configuration
* @param cache cache file
* @return mtime of a given cache file on hdfs
@@ -501,32 +276,24 @@
return fileSystem.getFileStatus(filePath).getModificationTime();
}
-
+
/**
* This method create symlinks for all files in a given dir in another directory
* @param conf the configuration
* @param jobCacheDir the target directory for creating symlinks
* @param workDir the directory in which the symlinks are created
* @throws IOException
+ * @deprecated Internal to MapReduce framework. Use
+ * {@link TrackerDistributedCacheManager} instead.
*/
public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
- throws IOException{
- if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
- workDir == null || (!workDir.isDirectory())) {
- return;
- }
- boolean createSymlink = getSymlink(conf);
- if (createSymlink){
- File[] list = jobCacheDir.listFiles();
- for (int i=0; i < list.length; i++){
- FileUtil.symLink(list[i].getAbsolutePath(),
- new File(workDir, list[i].getName()).toString());
- }
- }
+ throws IOException{
+ TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir, workDir);
}
/**
- * Set the configuration with the given set of archives
+ * Set the configuration with the given set of archives. Intended
+ * to be used by user code.
* @param archives The list of archives that need to be localized
* @param conf Configuration which will be changed
*/
@@ -536,7 +303,8 @@
}
/**
- * Set the configuration with the given set of files
+ * Set the configuration with the given set of files. Intended to be
+ * used by user code.
* @param files The list of files that need to be localized
* @param conf Configuration which will be changed
*/
@@ -546,7 +314,8 @@
}
/**
- * Get cache archives set in the Configuration
+ * Get cache archives set in the Configuration. Used by
+ * internal DistributedCache and MapReduce code.
* @param conf The configuration which contains the archives
* @return A URI array of the caches set in the Configuration
* @throws IOException
@@ -556,18 +325,19 @@
}
/**
- * Get cache files set in the Configuration
+ * Get cache files set in the Configuration. Used by internal
+ * DistributedCache and MapReduce code.
* @param conf The configuration which contains the files
* @return A URI array of the files set in the Configuration
* @throws IOException
*/
-
public static URI[] getCacheFiles(Configuration conf) throws IOException {
return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
}
/**
- * Return the path array of the localized caches
+ * Return the path array of the localized caches. Intended to be used
+ * by user code.
* @param conf Configuration that contains the localized archives
* @return A path array of localized caches
* @throws IOException
@@ -579,7 +349,8 @@
}
/**
- * Return the path array of the localized files
+ * Return the path array of the localized files. Intended to be used
+ * by user code.
* @param conf Configuration that contains the localized files
* @return A path array of localized files
* @throws IOException
@@ -590,7 +361,8 @@
}
/**
- * Get the timestamps of the archives
+ * Get the timestamps of the archives. Used by internal
+ * DistributedCache and MapReduce code.
* @param conf The configuration which stored the timestamps
* @return a string array of timestamps
* @throws IOException
@@ -601,7 +373,8 @@
/**
- * Get the timestamps of the files
+ * Get the timestamps of the files. Used by internal
+ * DistributedCache and MapReduce code.
* @param conf The configuration which stored the timestamps
* @return a string array of timestamps
* @throws IOException
@@ -611,7 +384,8 @@
}
/**
- * This is to check the timestamp of the archives to be localized
+ * This is to check the timestamp of the archives to be localized.
+ * Used by internal MapReduce code.
* @param conf Configuration which stores the timestamp's
* @param timestamps comma separated list of timestamps of archives.
* The order should be the same as the order in which the archives are added.
@@ -621,7 +395,8 @@
}
/**
- * This is to check the timestamp of the files to be localized
+ * This is to check the timestamp of the files to be localized.
+ * Used by internal MapReduce code.
* @param conf Configuration which stores the timestamp's
* @param timestamps comma separated list of timestamps of files.
* The order should be the same as the order in which the files are added.
@@ -631,7 +406,8 @@
}
/**
- * Set the conf to contain the location for localized archives
+ * Set the conf to contain the location for localized archives. Used
+ * by internal DistributedCache code.
* @param conf The conf to modify to contain the localized caches
* @param str a comma separated list of local archives
*/
@@ -640,7 +416,8 @@
}
/**
- * Set the conf to contain the location for localized files
+ * Set the conf to contain the location for localized files. Used
+ * by internal DistributedCache code.
* @param conf The conf to modify to contain the localized caches
* @param str a comma separated list of local files
*/
@@ -649,7 +426,8 @@
}
/**
- * Add a archives to be localized to the conf
+ * Add an archive to be localized to the conf. Intended to
+ * be used by user code.
* @param uri The uri of the cache to be localized
* @param conf Configuration to add the cache to
*/
@@ -660,7 +438,8 @@
}
/**
- * Add a file to be localized to the conf
+ * Add a file to be localized to the conf. Intended
+ * to be used by user code.
* @param uri The uri of the cache to be localized
* @param conf Configuration to add the cache to
*/
@@ -672,7 +451,7 @@
/**
* Add an file path to the current set of classpath entries It adds the file
- * to cache as well.
+ * to cache as well. Intended to be used by user code.
*
* @param file Path of the file to be added
* @param conf Configuration that contains the classpath setting
@@ -689,7 +468,8 @@
}
/**
- * Get the file entries in classpath as an array of Path
+ * Get the file entries in classpath as an array of Path.
+ * Used by internal DistributedCache code.
*
* @param conf Configuration that contains the classpath setting
*/
@@ -708,7 +488,7 @@
/**
* Add an archive path to the current set of classpath entries. It adds the
- * archive to cache as well.
+ * archive to cache as well. Intended to be used by user code.
*
* @param archive Path of the archive to be added
* @param conf Configuration that contains the classpath setting
@@ -725,7 +505,8 @@
}
/**
- * Get the archive entries in classpath as an array of Path
+ * Get the archive entries in classpath as an array of Path.
+ * Used by internal DistributedCache code.
*
* @param conf Configuration that contains the classpath setting
*/
@@ -744,7 +525,8 @@
/**
* This method allows you to create symlinks in the current working directory
- * of the task to all the cache files/archives
+ * of the task to all the cache files/archives.
+ * Intended to be used by user code.
* @param conf the jobconf
*/
public static void createSymlink(Configuration conf){
@@ -754,6 +536,7 @@
/**
* This method checks to see if symlinks are to be create for the
* localized cache files in the current working directory
+ * Used by internal DistributedCache code.
* @param conf the jobconf
* @return true if symlinks are to be created- else return false
*/
@@ -769,7 +552,7 @@
* This method checks if there is a conflict in the fragment names
* of the uris. Also makes sure that each uri has a fragment. It
* is only to be called if you want to create symlinks for
- * the various archives and files.
+ * the various archives and files. May be used by user code.
* @param uriFiles The uri array of urifiles
* @param uriArchives the uri array of uri archives
*/
@@ -811,52 +594,14 @@
return true;
}
- private static class CacheStatus {
- // false, not loaded yet, true is loaded
- boolean currentStatus;
-
- // the local load path of this cache
- Path localLoadPath;
-
- //the base dir where the cache lies
- Path baseDir;
-
- //the size of this cache
- long size;
-
- // number of instances using this cache
- int refcount;
-
- // the cache-file modification time
- long mtime;
-
- public CacheStatus(Path baseDir, Path localLoadPath) {
- super();
- this.currentStatus = false;
- this.localLoadPath = localLoadPath;
- this.refcount = 0;
- this.mtime = -1;
- this.baseDir = baseDir;
- this.size = 0;
- }
- }
-
/**
* Clear the entire contents of the cache and delete the backing files. This
* should only be used when the server is reinitializing, because the users
* are going to lose their files.
+ * @deprecated Internal to MapReduce framework. Use
+ * {@link TrackerDistributedCacheManager} instead.
*/
public static void purgeCache(Configuration conf) throws IOException {
- synchronized (cachedArchives) {
- FileSystem localFs = FileSystem.getLocal(conf);
- for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
- try {
- localFs.delete(f.getValue().localLoadPath, true);
- } catch (IOException ie) {
- LOG.debug("Error cleaning up cache", ie);
- }
- }
- cachedArchives.clear();
- }
+ new TrackerDistributedCacheManager(conf).purgeCache();
}
}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=807543&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Tue Aug 25 10:27:53 2009
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.filecache;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Helper class of {@link TrackerDistributedCacheManager} that represents
+ * the cached files of a single task. This class is used
+ * by TaskRunner/LocalJobRunner to parse out the job configuration
+ * and setup the local caches.
+ *
+ * <b>This class is internal to Hadoop, and should not be treated as a public
+ * interface.</b>
+ */
+public class TaskDistributedCacheManager {
+ private final TrackerDistributedCacheManager distributedCacheManager;
+ private final Configuration taskConf;
+ private final List<CacheFile> cacheFiles = new ArrayList<CacheFile>();
+ private final List<String> classPaths = new ArrayList<String>();
+
+ private boolean setupCalled = false;
+
+ /**
+ * Struct representing a single cached file.
+ * There are four combinations of (archive, file) and
+ * (don't put in classpath, do put in classpath).
+ */
+ static class CacheFile {
+ /** URI as in the configuration */
+ final URI uri;
+ enum FileType {
+ REGULAR,
+ ARCHIVE
+ }
+ /** Kind of entry: REGULAR file, or ARCHIVE (to be decompressed) */
+ final FileType type;
+ final long timestamp;
+ /** Whether this is to be added to the classpath */
+ final boolean shouldBeAddedToClassPath;
+
+ private CacheFile(URI uri, FileType type, long timestamp,
+ boolean classPath) {
+ this.uri = uri;
+ this.type = type;
+ this.timestamp = timestamp;
+ this.shouldBeAddedToClassPath = classPath;
+ }
+
+ /**
+ * Converts the scheme used by DistributedCache to serialize what files to
+ * cache in the configuration into CacheFile objects that represent those
+ * files.
+ */
+ private static List<CacheFile> makeCacheFiles(URI[] uris,
+ String[] timestamps, Path[] paths, FileType type) {
+ List<CacheFile> ret = new ArrayList<CacheFile>();
+ if (uris != null) {
+ if (uris.length != timestamps.length) {
+ throw new IllegalArgumentException("Mismatched uris and timestamps.");
+ }
+ Map<String, Path> classPaths = new HashMap<String, Path>();
+ if (paths != null) {
+ for (Path p : paths) {
+ classPaths.put(p.toString(), p);
+ }
+ }
+ for (int i = 0; i < uris.length; ++i) {
+ URI u = uris[i];
+ boolean isClassPath = (null != classPaths.get(u.getPath()));
+ long t = Long.parseLong(timestamps[i]);
+ ret.add(new CacheFile(u, type, t, isClassPath));
+ }
+ }
+ return ret;
+ }
+ }
+
+ TaskDistributedCacheManager(
+ TrackerDistributedCacheManager distributedCacheManager,
+ Configuration taskConf) throws IOException {
+ this.distributedCacheManager = distributedCacheManager;
+ this.taskConf = taskConf;
+
+ this.cacheFiles.addAll(
+ CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
+ DistributedCache.getFileTimestamps(taskConf),
+ DistributedCache.getFileClassPaths(taskConf),
+ CacheFile.FileType.REGULAR));
+ this.cacheFiles.addAll(
+ CacheFile.makeCacheFiles(DistributedCache.getCacheArchives(taskConf),
+ DistributedCache.getArchiveTimestamps(taskConf),
+ DistributedCache.getArchiveClassPaths(taskConf),
+ CacheFile.FileType.ARCHIVE));
+ }
+
+ /**
+ * Retrieves files into the local cache and updates the task configuration
+ * (which has been passed in via the constructor).
+ *
+ * It is the caller's responsibility to re-write the task configuration XML
+ * file, if necessary.
+ */
+ public void setup(LocalDirAllocator lDirAlloc, File workDir,
+ String cacheSubdir) throws IOException {
+ setupCalled = true;
+
+ if (cacheFiles.isEmpty()) {
+ return;
+ }
+
+ ArrayList<Path> localArchives = new ArrayList<Path>();
+ ArrayList<Path> localFiles = new ArrayList<Path>();
+ Path workdirPath = new Path(workDir.getAbsolutePath());
+
+ for (CacheFile cacheFile : cacheFiles) {
+ URI uri = cacheFile.uri;
+ FileSystem fileSystem = FileSystem.get(uri, taskConf);
+ FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
+ String cacheId = this.distributedCacheManager.makeRelative(uri, taskConf);
+ String cachePath = cacheSubdir + Path.SEPARATOR + cacheId;
+ Path localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+ fileStatus.getLen(), taskConf);
+ String baseDir = localPath.toString().replace(cacheId, "");
+ Path p = distributedCacheManager.getLocalCache(uri, taskConf,
+ new Path(baseDir), fileStatus,
+ cacheFile.type == CacheFile.FileType.ARCHIVE,
+ cacheFile.timestamp, workdirPath, false);
+
+ if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
+ localArchives.add(p);
+ } else {
+ localFiles.add(p);
+ }
+ if (cacheFile.shouldBeAddedToClassPath) {
+ classPaths.add(p.toString());
+ }
+ }
+
+ // Update the configuration object with localized data.
+ if (!localArchives.isEmpty()) {
+ DistributedCache.setLocalArchives(taskConf,
+ stringifyPathList(localArchives));
+ }
+ if (!localFiles.isEmpty()) {
+ DistributedCache.setLocalFiles(taskConf, stringifyPathList(localFiles));
+ }
+
+ }
+
+ private static String stringifyPathList(List<Path> p){
+ if (p == null || p.isEmpty()) {
+ return null;
+ }
+ StringBuilder str = new StringBuilder(p.get(0).toString());
+ for (int i = 1; i < p.size(); i++){
+ str.append(",");
+ str.append(p.get(i).toString());
+ }
+ return str.toString();
+ }
+
+ /**
+ * Retrieves class paths (as local references) to add.
+ * Must be called after setup(); otherwise an
+ * {@link IllegalStateException} is thrown.
+ */
+ public List<String> getClassPaths() throws IOException {
+ if (!setupCalled) {
+ throw new IllegalStateException(
+ "getClassPaths() should be called after setup()");
+ }
+ return classPaths;
+ }
+
+ /**
+ * Releases the cached files/archives, so that space
+ * can be reclaimed by the {@link TrackerDistributedCacheManager}.
+ */
+ public void release() throws IOException {
+ for (CacheFile c : cacheFiles) {
+ distributedCacheManager.releaseCache(c.uri, taskConf);
+ }
+ }
+
+ /**
+ * Creates a class loader that includes the designated
+ * files and archives.
+ */
+ public ClassLoader makeClassLoader(final ClassLoader parent)
+ throws MalformedURLException {
+ final URL[] urls = new URL[classPaths.size()];
+ for (int i = 0; i < classPaths.size(); ++i) {
+ urls[i] = new File(classPaths.get(i)).toURI().toURL();
+ }
+ return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
+ @Override
+ public ClassLoader run() {
+ return new URLClassLoader(urls, parent);
+ }
+ });
+ }
+}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=807543&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Tue Aug 25 10:27:53 2009
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.filecache;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.RunJar;
+
+/**
+ * Manages a single machine's instance of a cross-job
+ * cache. This class would typically be instantiated
+ * by a TaskTracker (or something that emulates it,
+ * like LocalJobRunner).
+ *
+ * <b>This class is internal to Hadoop, and should not be treated as a public
+ * interface.</b>
+ */
+public class TrackerDistributedCacheManager {
+ // cacheId (as built by makeRelative) to CacheStatus mapping; every access synchronizes on this map
+ private TreeMap<String, CacheStatus> cachedArchives =
+ new TreeMap<String, CacheStatus>();
+
+ private TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>(); // base dir -> summed CacheStatus.size; every access synchronizes on this map
+
+ // default total cache size in bytes (10GB), used when "local.cache.size" is not set
+ private static final long DEFAULT_CACHE_SIZE = 10737418240L;
+
+ private static final Log LOG =
+ LogFactory.getLog(TrackerDistributedCacheManager.class);
+
+ private final LocalFileSystem localFs; // local file system; purgeCache() deletes through it
+
+ public TrackerDistributedCacheManager(Configuration conf) throws IOException {
+ this.localFs = FileSystem.getLocal(conf); // resolved once from the given configuration
+ }
+
+ /**
+ * Get the locally cached file or archive; it could either be
+ * previously cached (and valid) or copy it from the {@link FileSystem} now.
+ *
+ * @param cache the cache to be localized, this should be specified as
+ * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
+ * @param conf The Configuration file which contains the filesystem
+ * @param baseDir The base cache Dir where you wnat to localize the
+ * files/archives
+ * @param fileStatus The file status on the dfs.
+ * @param isArchive if the cache is an archive or a file. In case it is an
+ * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
+ * be unzipped/unjarred/untarred automatically
+ * and the directory where the archive is unzipped/unjarred/untarred is
+ * returned as the Path.
+ * In case of a file, the path to the file is returned
+ * @param confFileStamp this is the hdfs file modification timestamp to verify
+ * that the file to be cached hasn't changed since the job started
+ * @param currentWorkDir this is the directory where you would want to create
+ * symlinks for the locally cached files/archives
+ * @param honorSymLinkConf if this is false, then the symlinks are not
+ * created even if conf says so (this is required for an optimization in task
+ * launches
+ * NOTE: This is effectively always on since r696957, since there is no code
+ * path that does not use this.
+ * @return the path to directory where the archives are unjarred in case of
+ * archives, the path to the file where the file is copied locally
+ * @throws IOException
+ */
+ Path getLocalCache(URI cache, Configuration conf,
+ Path baseDir, FileStatus fileStatus,
+ boolean isArchive, long confFileStamp,
+ Path currentWorkDir, boolean honorSymLinkConf)
+ throws IOException {
+ String cacheId = makeRelative(cache, conf);
+ CacheStatus lcacheStatus;
+ Path localizedPath;
+ synchronized (cachedArchives) {
+ lcacheStatus = cachedArchives.get(cacheId);
+ if (lcacheStatus == null) {
+ // was never localized
+ lcacheStatus = new CacheStatus(baseDir,
+ new Path(baseDir, new Path(cacheId)));
+ cachedArchives.put(cacheId, lcacheStatus);
+ }
+
+ synchronized (lcacheStatus) {
+ localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
+ fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
+ lcacheStatus.refcount++;
+ }
+ }
+
+ // try deleting stuff if you can
+ long size = 0;
+ synchronized (baseDirSize) {
+ Long get = baseDirSize.get(baseDir);
+ if ( get != null ) {
+ size = get.longValue();
+ }
+ }
+ // setting the cache size to a default of 10GB
+ long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
+ if (allowedSize < size) {
+ // try some cache deletions
+ deleteCache(conf);
+ }
+ return localizedPath;
+ }
+
+ /**
+ * This is the opposite of getlocalcache. When you are done with
+ * using the cache, you need to release the cache
+ * @param cache The cache URI to be released
+ * @param conf configuration which contains the filesystem the cache
+ * is contained in.
+ * @throws IOException
+ */
+ void releaseCache(URI cache, Configuration conf)
+ throws IOException {
+ String cacheId = makeRelative(cache, conf);
+ synchronized (cachedArchives) {
+ CacheStatus lcacheStatus = cachedArchives.get(cacheId);
+ if (lcacheStatus == null)
+ return;
+ synchronized (lcacheStatus) {
+ lcacheStatus.refcount--;
+ }
+ }
+ }
+
+ // Deletes every cache entry whose refcount is zero: removes its local
+ // files, subtracts its size from the total of its base dir and forgets
+ // the entry.
+
+ private void deleteCache(Configuration conf) throws IOException {
+   synchronized (cachedArchives) {
+     Iterator<Map.Entry<String, CacheStatus>> entries =
+         cachedArchives.entrySet().iterator();
+     while (entries.hasNext()) {
+       CacheStatus status = entries.next().getValue();
+       synchronized (status) {
+         if (status.refcount != 0) {
+           // still referenced, keep it
+           continue;
+         }
+         FileSystem.getLocal(conf).delete(status.localLoadPath, true);
+         synchronized (baseDirSize) {
+           Long recorded = baseDirSize.get(status.baseDir);
+           if (recorded != null) {
+             baseDirSize.put(status.baseDir,
+                 Long.valueOf(recorded.longValue() - status.size));
+           }
+         }
+         entries.remove();
+       }
+     }
+   }
+ }
+
+ /**
+  * Returns the relative path of the directory this cache will be
+  * localized in. For hdfs://hostname:port/absolute_path the result is
+  * hostname/absolute_path. A URI without a host uses its scheme instead.
+  * A URI with neither (just /absolute_path) uses the host, or failing
+  * that the scheme, of the URI of the default FileSystem of
+  * <code>conf</code>.
+  *
+  * @param cache the cache URI
+  * @param conf configuration used to look up the default FileSystem
+  * @return the prefix followed by the URI path, with every ":/" replaced
+  * by "/"
+  * @throws IOException if the default FileSystem cannot be obtained
+  */
+ String makeRelative(URI cache, Configuration conf)
+     throws IOException {
+   String prefix = hostOrScheme(cache);
+   if (prefix == null) {
+     prefix = hostOrScheme(FileSystem.get(conf).getUri());
+   }
+   String relative = prefix + cache.getPath();
+   // remove windows device colon
+   return relative.replace(":/","/");
+ }
+
+ // The host of the URI, or its scheme when it has no host (may be null).
+ private static String hostOrScheme(URI uri) {
+   String host = uri.getHost();
+   return (host != null) ? host : uri.getScheme();
+ }
+
+ private Path cacheFilePath(Path p) {
+ return new Path(p, p.getName());
+ }
+
+ // Localizes one cache: reuses the local copy when ifExistsAndFresh() accepts it,
+ // otherwise copies it locally, unpacks archives, updates sizes and chmods baseDir.
+ private Path localizeCache(Configuration conf,
+ URI cache, long confFileStamp,
+ CacheStatus cacheStatus,
+ FileStatus fileStatus,
+ boolean isArchive,
+ Path currentWorkDir,
+ boolean honorSymLinkConf)
+ throws IOException {
+ boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
+ if(cache.getFragment() == null) {
+ doSymlink = false; // the URI fragment is the link name; without one no link is made
+ }
+ FileSystem fs = FileSystem.get(cache, conf);
+ String link =
+ currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment(); // <currentWorkDir>/<fragment>
+ File flink = new File(link);
+ if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
+ cacheStatus, fileStatus)) {
+ LOG.info(String.format("Using existing cache of %s->%s",
+ cache.toString(), cacheStatus.localLoadPath));
+ if (isArchive) {
+ if (doSymlink){
+ if (!flink.exists())
+ FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+ link);
+ }
+
+ return cacheStatus.localLoadPath;
+ }
+ else {
+ if (doSymlink){
+ if (!flink.exists())
+ FileUtil.symLink(
+ cacheFilePath(cacheStatus.localLoadPath).toString(), link);
+ }
+ return cacheFilePath(cacheStatus.localLoadPath);
+ }
+ } else {
+
+ // The local copy is missing or stale and must be (re)fetched.
+ // A loaded cache whose refcount is greater than one is still in use,
+ // so it is not replaced; an IOException is thrown instead.
+ // (refcount does not yet include this caller; see getLocalCache.)
+ if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
+ throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
+ + " is in use and cannot be refreshed");
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+ localFs.delete(cacheStatus.localLoadPath, true);
+ synchronized (baseDirSize) {
+ Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+ if ( dirSize != null ) {
+ dirSize -= cacheStatus.size;
+ baseDirSize.put(cacheStatus.baseDir, dirSize);
+ }
+ }
+ Path parchive = new Path(cacheStatus.localLoadPath,
+ new Path(cacheStatus.localLoadPath.getName())); // <localLoadPath>/<name of localLoadPath>
+
+ if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
+ throw new IOException("Mkdirs failed to create directory " +
+ cacheStatus.localLoadPath.toString());
+ }
+
+ String cacheId = cache.getPath();
+ fs.copyToLocalFile(new Path(cacheId), parchive);
+ if (isArchive) {
+ String tmpArchive = parchive.toString().toLowerCase();
+ File srcFile = new File(parchive.toString());
+ File destDir = new File(parchive.getParent().toString());
+ LOG.info(String.format("Extracting %s to %s",
+ srcFile.toString(), destDir.toString()));
+ if (tmpArchive.endsWith(".jar")) {
+ RunJar.unJar(srcFile, destDir);
+ } else if (tmpArchive.endsWith(".zip")) {
+ FileUtil.unZip(srcFile, destDir);
+ } else if (isTarFile(tmpArchive)) {
+ FileUtil.unTar(srcFile, destDir);
+ } else {
+ LOG.warn(String.format(
+ "Cache file %s specified as archive, but not valid extension.",
+ srcFile.toString()));
+ // unknown extension: nothing is unpacked,
+ // the copied file is left in the dir as it is
+ }
+ }
+
+ long cacheSize =
+ FileUtil.getDU(new File(parchive.getParent().toString()));
+ cacheStatus.size = cacheSize;
+ synchronized (baseDirSize) {
+ Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+ if( dirSize == null ) {
+ dirSize = Long.valueOf(cacheSize);
+ } else {
+ dirSize += cacheSize;
+ }
+ baseDirSize.put(cacheStatus.baseDir, dirSize);
+ }
+
+ // do chmod here (on the whole base dir, not only on this cache)
+ try {
+ //Setting recursive permission to grant everyone read and execute
+ FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+ } catch(InterruptedException e) {
+ LOG.warn("Exception in chmod" + e.toString()); // NOTE(review): only logged; the interrupt status is not restored
+ }
+
+ // update cacheStatus to reflect the newly cached file
+ cacheStatus.currentStatus = true;
+ cacheStatus.mtime = DistributedCache.getTimestamp(conf, cache);
+
+ LOG.info(String.format("Cached %s as %s",
+ cache.toString(), cacheStatus.localLoadPath));
+ }
+
+ if (isArchive){ // reached only after a (re)fetch; the fresh-cache branch returned above
+ if (doSymlink){
+ if (!flink.exists())
+ FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+ link);
+ }
+ return cacheStatus.localLoadPath;
+ }
+ else {
+ if (doSymlink){
+ if (!flink.exists())
+ FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
+ link);
+ }
+ return cacheFilePath(cacheStatus.localLoadPath);
+ }
+ }
+
+ private static boolean isTarFile(String filename) {
+ return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
+ filename.endsWith(".tar"));
+ }
+
+ // Checks if the cache has already been localized and is fresh
+ private boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
+ URI cache, long confFileStamp,
+ CacheStatus lcacheStatus,
+ FileStatus fileStatus)
+ throws IOException {
+ // check for existence of the cache
+ if (lcacheStatus.currentStatus == false) {
+ return false;
+ } else {
+ long dfsFileStamp;
+ if (fileStatus != null) {
+ dfsFileStamp = fileStatus.getModificationTime();
+ } else {
+ dfsFileStamp = DistributedCache.getTimestamp(conf, cache);
+ }
+
+ // ensure that the file on hdfs hasn't been modified since the job started
+ if (dfsFileStamp != confFileStamp) {
+ LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+ throw new IOException("File: " + cache +
+ " has changed on HDFS since job started");
+ }
+
+ if (dfsFileStamp != lcacheStatus.mtime) {
+ // needs refreshing
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+  * This method creates symlinks for all files in a given dir in another
+  * directory. Nothing is done when either argument is null or not a
+  * directory, or when symlink creation is not enabled in the
+  * configuration. A symlink that cannot be created is logged as a
+  * warning and does not stop the remaining ones.
+  *
+  * Should not be used outside of DistributedCache code.
+  *
+  * @param conf the configuration
+  * @param jobCacheDir the target directory for creating symlinks
+  * @param workDir the directory in which the symlinks are created
+  * @throws IOException if creating a symlink fails with an I/O error
+  */
+ public static void createAllSymlink(Configuration conf, File jobCacheDir,
+     File workDir)
+     throws IOException{
+   if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
+       workDir == null || (!workDir.isDirectory())) {
+     return;
+   }
+   if (!DistributedCache.getSymlink(conf)) {
+     return;
+   }
+   File[] list = jobCacheDir.listFiles();
+   if (list == null) {
+     // File.listFiles() returns null when the directory cannot be read
+     // (I/O error, or it disappeared after the isDirectory() check above);
+     // treat that like an empty directory instead of failing with an NPE.
+     LOG.warn(String.format("Could not list %s; no symlinks created",
+         jobCacheDir));
+     return;
+   }
+   for (File entry : list) {
+     String target = entry.getAbsolutePath();
+     String link = new File(workDir, entry.getName()).toString();
+     LOG.info(String.format("Creating symlink: %s <- %s", target, link));
+     int ret = FileUtil.symLink(target, link);
+     if (ret != 0) {
+       LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
+           link));
+     }
+   }
+ }
+
+ // Book-keeping record for one cache entry of this manager.
+ private static class CacheStatus {
+   // the base dir where the cache lies
+   Path baseDir;
+
+   // the local load path of this cache
+   Path localLoadPath;
+
+   // true once the cache has been loaded, false before that
+   boolean currentStatus = false;
+
+   // the size of this cache
+   long size = 0;
+
+   // number of instances using this cache
+   int refcount = 0;
+
+   // the cache-file modification time; -1 until the cache is loaded
+   long mtime = -1;
+
+   public CacheStatus(Path baseDir, Path localLoadPath) {
+     this.baseDir = baseDir;
+     this.localLoadPath = localLoadPath;
+   }
+ }
+
+ /**
+  * Clear the entire contents of the cache and delete the backing files. This
+  * should only be used when the server is reinitializing, because the users
+  * are going to lose their files.
+  *
+  * A failure to delete one entry is logged at debug level and does not stop
+  * the purge. The per-base-dir size totals are reset together with the
+  * entries, so later size checks do not count files that were purged.
+  */
+ public void purgeCache() {
+   synchronized (cachedArchives) {
+     for (CacheStatus status : cachedArchives.values()) {
+       try {
+         localFs.delete(status.localLoadPath, true);
+       } catch (IOException ie) {
+         LOG.debug("Error cleaning up cache", ie);
+       }
+     }
+     cachedArchives.clear();
+     // No entry is tracked any more, so the recorded sizes are stale;
+     // keeping them would make getLocalCache believe the cache is still
+     // full. Same lock order as deleteCache: cachedArchives, then
+     // baseDirSize.
+     synchronized (baseDirSize) {
+       baseDirSize.clear();
+     }
+   }
+ }
+
+ public TaskDistributedCacheManager newTaskDistributedCacheManager(
+ Configuration taskConf) throws IOException {
+ return new TaskDistributedCacheManager(this, taskConf);
+ }
+
+ /**
+  * Determines timestamps of files to be cached, and stores those
+  * in the configuration. This is intended to be used internally by JobClient
+  * after all cache files have been added.
+  *
+  * Archives and files are handled separately; a kind for which no URIs are
+  * configured (null or empty array) leaves its timestamp setting untouched.
+  *
+  * This is an internal method!
+  *
+  * @param job Configuration of a job.
+  * @throws IOException if a timestamp cannot be determined
+  */
+ public static void determineTimestamps(Configuration job) throws IOException {
+   URI[] tarchives = DistributedCache.getCacheArchives(job);
+   // the length check avoids indexing element 0 of an empty array
+   if (tarchives != null && tarchives.length > 0) {
+     DistributedCache.setArchiveTimestamps(job,
+         joinTimestamps(job, tarchives));
+   }
+
+   URI[] tfiles = DistributedCache.getCacheFiles(job);
+   if (tfiles != null && tfiles.length > 0) {
+     DistributedCache.setFileTimestamps(job, joinTimestamps(job, tfiles));
+   }
+ }
+
+ // Comma-separated timestamps of the given URIs, in array order.
+ private static String joinTimestamps(Configuration job, URI[] uris)
+     throws IOException {
+   StringBuilder joined = new StringBuilder();
+   for (int i = 0; i < uris.length; i++) {
+     if (i > 0) {
+       joined.append(",");
+     }
+     joined.append(String.valueOf(
+         DistributedCache.getTimestamp(job, uris[i])));
+   }
+   return joined.toString();
+ }
+}
Modified: hadoop/mapreduce/trunk/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/commit-tests?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/commit-tests (original)
+++ hadoop/mapreduce/trunk/src/test/commit-tests Tue Aug 25 10:27:53 2009
@@ -36,4 +36,6 @@
**/TestTextOutputFormat.java
**/TestTrackerBlacklistAcrossJobs.java
**/TestTaskTrackerBlacklisting.java
+**/TestTaskTrackerLocalization.java
+**/TestTrackerDistributedCacheManager.java