You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:56:31 UTC
svn commit: r1079211 [9/11] - in /hadoop/mapreduce/branches/yahoo-merge: ./
src/c++/task-controller/ src/c++/task-controller/impl/
src/c++/task-controller/test/ src/c++/task-controller/tests/
src/contrib/fairscheduler/designdoc/ src/contrib/streaming/s...
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue Mar 8 05:56:27 2011
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapred.TaskTrac
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.StringUtils;
/**
@@ -46,14 +45,13 @@ class TaskMemoryManagerThread extends Th
private static Log LOG = LogFactory.getLog(TaskMemoryManagerThread.class);
private TaskTracker taskTracker;
- private long monitoringInterval;
-
- private long maxMemoryAllowedForAllTasks;
private long maxRssMemoryAllowedForAllTasks;
- private Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
- private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
- private List<TaskAttemptID> tasksToBeRemoved;
+ private final long monitoringInterval;
+ private final long maxMemoryAllowedForAllTasks;
+ private final Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
+ private final Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
+ private final List<TaskAttemptID> tasksToBeRemoved;
private static final String MEMORY_USAGE_STRING =
"Memory usage of ProcessTree %s for task-id %s : Virutal %d bytes, " +
@@ -91,7 +89,8 @@ class TaskMemoryManagerThread extends Th
this.monitoringInterval = monitoringInterval;
}
- public void addTask(TaskAttemptID tid, long memLimit, long memLimitPhysical) {
+ public void addTask(TaskAttemptID tid, long memLimit,
+ long memLimitPhysical) {
synchronized (tasksToBeAdded) {
LOG.debug("Tracking ProcessTree " + tid + " for the first time");
ProcessTreeInfo ptInfo =
@@ -204,19 +203,10 @@ class TaskMemoryManagerThread extends Th
if (pId != null) {
// pId will be null, either if the JVM is not spawned yet or if
// the JVM is removed from jvmIdToPid
- long sleeptimeBeforeSigkill =
- taskTracker
- .getJobConf()
- .getLong(
- TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
- ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
- // create process tree object
- ProcfsBasedProcessTree pt =
- new ProcfsBasedProcessTree(pId,
- ProcessTree.isSetsidAvailable, sleeptimeBeforeSigkill);
LOG.debug("Tracking ProcessTree " + pId + " for the first time");
+ ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId,
+ TaskController.isSetsidAvailable);
ptInfo.setPid(pId);
ptInfo.setProcessTree(pt);
}
@@ -284,9 +274,13 @@ class TaskMemoryManagerThread extends Th
// Virtual or physical memory over limit. Fail the task and remove
// the corresponding process tree
LOG.warn(msg);
+ // warn if not a leader
+ if (!pTree.checkPidPgrpidForMatch()) {
+ LOG.error("Killed task process with PID " + pId +
+ " but it is not a process group leader.");
+ }
+ // kill the task
taskTracker.cleanUpOverMemoryTask(tid, true, msg);
- // Now destroy the ProcessTree, remove it from monitoring map.
- pTree.destroy(true/*in the background*/);
it.remove();
LOG.info("Removed ProcessTree with root " + pId);
} else {
@@ -298,8 +292,7 @@ class TaskMemoryManagerThread extends Th
} catch (Exception e) {
// Log the exception and proceed to the next task.
LOG.warn("Uncaught exception in TaskMemoryManager "
- + "while managing memory of " + tid + " : "
- + StringUtils.stringifyException(e));
+ + "while managing memory of " + tid, e);
}
}
@@ -528,8 +521,6 @@ class TaskMemoryManagerThread extends Th
taskTracker.cleanUpOverMemoryTask(tid, false, msg);
// Now destroy the ProcessTree, remove it from monitoring map.
ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
- ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
- pTree.destroy(true/*in the background*/);
processTreeInfoMap.remove(tid);
LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Mar 8 05:56:27 2011
@@ -20,16 +20,16 @@ package org.apache.hadoop.mapred;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Vector;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -67,21 +67,30 @@ abstract class TaskRunner extends Thread
private boolean exitCodeSet = false;
private static String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator");
+ static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
private TaskTracker tracker;
private TaskDistributedCacheManager taskDistributedCacheManager;
+ private String[] localdirs;
+ final private static Random rand;
+ static {
+ rand = new Random();
+ }
protected JobConf conf;
JvmManager jvmManager;
public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker,
- JobConf conf) {
+ JobConf conf, TaskTracker.RunningJob rjob
+ ) throws IOException {
this.tip = tip;
this.t = tip.getTask();
this.tracker = tracker;
this.conf = conf;
this.jvmManager = tracker.getJvmManagerInstance();
+ this.localdirs = conf.getLocalDirs();
+ taskDistributedCacheManager = rjob.distCacheMgr;
}
public Task getTask() { return t; }
@@ -155,26 +164,13 @@ abstract class TaskRunner extends Thread
//all the archives
TaskAttemptID taskid = t.getTaskID();
final LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
- final File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
-
- // We don't create any symlinks yet, so presence/absence of workDir
- // actually on the file system doesn't matter.
- tip.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws IOException {
- taskDistributedCacheManager =
- tracker.getTrackerDistributedCacheManager()
- .newTaskDistributedCacheManager(conf);
- taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker
- .getPrivateDistributedCacheDir(conf.getUser()),
- TaskTracker.getPublicDistributedCacheDir());
- return null;
- }
- });
-
- // Set up the child task's configuration. After this call, no localization
- // of files should happen in the TaskTracker's process space. Any changes to
- // the conf object after this will NOT be reflected to the child.
- setupChildTaskConfiguration(lDirAlloc);
+ //simply get the location of the workDir and pass it to the child. The
+ //child will do the actual dir creation
+ final File workDir =
+ new File(new Path(localdirs[rand.nextInt(localdirs.length)],
+ TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),
+ taskid.toString(),
+ t.isTaskCleanupTask())).toString());
// Build classpath
List<String> classPaths =
@@ -189,7 +185,7 @@ abstract class TaskRunner extends Thread
tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
// set memory limit using ulimit if feasible and necessary ...
- List<String> setup = getVMSetupCmd();
+ String setup = getVMSetupCmd();
// Set up the redirection of the task's stdout and stderr streams
File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
@@ -202,7 +198,20 @@ abstract class TaskRunner extends Thread
errorInfo = getVMEnvironment(errorInfo, workDir, conf, env,
taskid, logSize);
- launchJvmAndWait(setup, vargs, stdout, stderr, logSize, workDir, env);
+ // flatten the env as a set of export commands
+ List <String> setupCmds = new ArrayList<String>();
+ for(Entry<String, String> entry : env.entrySet()) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("export ");
+ sb.append(entry.getKey());
+ sb.append("=\"");
+ sb.append(entry.getValue());
+ sb.append("\"");
+ setupCmds.add(sb.toString());
+ }
+ setupCmds.add(setup);
+
+ launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
if (exitCodeSet) {
if (!killed && exitCode != 0) {
@@ -231,14 +240,6 @@ abstract class TaskRunner extends Thread
LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
}
} finally {
- try{
- if (taskDistributedCacheManager != null) {
- taskDistributedCacheManager.release();
- }
- }catch(IOException ie){
- LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
- }
-
// It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
// *false* since the task has either
// a) SUCCEEDED - which means commit has been done
@@ -247,11 +248,11 @@ abstract class TaskRunner extends Thread
}
}
- void launchJvmAndWait(List<String> setup, Vector<String> vargs, File stdout,
- File stderr, long logSize, File workDir, Map<String, String> env)
- throws InterruptedException {
+ void launchJvmAndWait(List <String> setup, Vector<String> vargs, File stdout,
+ File stderr, long logSize, File workDir)
+ throws InterruptedException, IOException {
jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout,
- stderr, logSize, workDir, env, conf));
+ stderr, logSize, workDir, conf));
synchronized (lock) {
while (!done) {
lock.wait();
@@ -302,7 +303,7 @@ abstract class TaskRunner extends Thread
.isTaskCleanupTask()), conf);
// write the child's task configuration file to the local disk
- writeLocalTaskFile(localTaskFile.toString(), conf);
+ JobLocalizer.writeLocalJobFile(localTaskFile, conf);
// Set the final job file in the task. The child needs to know the correct
// path to job.xml. So set this path accordingly.
@@ -312,16 +313,21 @@ abstract class TaskRunner extends Thread
/**
* @return
*/
- private List<String> getVMSetupCmd() {
- String[] ulimitCmd = Shell.getUlimitMemoryCommand(getChildUlimit(conf));
- List<String> setup = null;
- if (ulimitCmd != null) {
- setup = new ArrayList<String>();
- for (String arg : ulimitCmd) {
- setup.add(arg);
- }
+ private String getVMSetupCmd() {
+ final int ulimit = getChildUlimit(conf);
+ if (ulimit <= 0) {
+ return "";
+ }
+ String setup[] = Shell.getUlimitMemoryCommand(ulimit);
+ StringBuilder command = new StringBuilder();
+ for (String str : setup) {
+ command.append('\'');
+ command.append(str);
+ command.append('\'');
+ command.append(" ");
}
- return setup;
+ command.append("\n");
+ return command.toString();
}
/**
@@ -404,7 +410,7 @@ abstract class TaskRunner extends Thread
vargs.add(javaOptsSplit[i]);
}
- Path childTmpDir = createChildTmpDir(workDir, conf);
+ Path childTmpDir = createChildTmpDir(workDir, conf, false);
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
// Add classpath.
@@ -453,7 +459,7 @@ abstract class TaskRunner extends Thread
* @throws IOException
*/
static Path createChildTmpDir(File workDir,
- JobConf conf)
+ JobConf conf, boolean createDir)
throws IOException {
// add java.io.tmpdir given by mapreduce.task.tmp.dir
@@ -463,10 +469,13 @@ abstract class TaskRunner extends Thread
// if temp directory path is not absolute, prepend it with workDir.
if (!tmpDir.isAbsolute()) {
tmpDir = new Path(workDir.toString(), tmp);
-
- FileSystem localFs = FileSystem.getLocal(conf);
- if (!localFs.mkdirs(tmpDir) && localFs.getFileStatus(tmpDir).isFile()) {
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+ if (createDir) {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ if (!localFs.mkdirs(tmpDir) &&
+ !localFs.getFileStatus(tmpDir).isDir()) {
+ throw new IOException("Mkdirs failed to create " +
+ tmpDir.toString());
+ }
}
}
return tmpDir;
@@ -513,6 +522,7 @@ abstract class TaskRunner extends Thread
ldLibraryPath.append(oldLdLibraryPath);
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+ env.put(HADOOP_WORK_DIR, workDir.toString());
// put jobTokenFile name into env
String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);
@@ -573,25 +583,6 @@ abstract class TaskRunner extends Thread
}
/**
- * Write the task specific job-configuration file.
- *
- * @param localFs
- * @throws IOException
- */
- private static void writeLocalTaskFile(String jobFile, JobConf conf)
- throws IOException {
- Path localTaskFile = new Path(jobFile);
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(localTaskFile, true);
- OutputStream out = localFs.create(localTaskFile);
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
- }
-
- /**
* Prepare the Configs.LOCAL_DIR for the child. The child is sand-boxed now.
* Whenever it uses LocalDirAllocator from now on inside the child, it will
* only see files inside the attempt-directory. This is done in the Child's
@@ -615,15 +606,11 @@ abstract class TaskRunner extends Thread
}
/** Creates the working directory pathname for a task attempt. */
- static File formWorkDir(LocalDirAllocator lDirAlloc,
- TaskAttemptID task, boolean isCleanup, JobConf conf)
+ static Path formWorkDir(LocalDirAllocator lDirAlloc, JobConf conf)
throws IOException {
Path workDir =
- lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
- conf.getUser(), task.getJobID().toString(), task.toString(),
- isCleanup), conf);
-
- return new File(workDir.toString());
+ lDirAlloc.getLocalPathToRead(MRConstants.WORKDIR, conf);
+ return workDir;
}
private static void appendSystemClasspaths(List<String> classPaths) {
@@ -715,7 +702,7 @@ abstract class TaskRunner extends Thread
}
}
- createChildTmpDir(workDir, conf);
+ createChildTmpDir(workDir, conf, true);
}
/**
@@ -739,8 +726,10 @@ abstract class TaskRunner extends Thread
/**
* Kill the child process
+ * @throws InterruptedException
+ * @throws IOException
*/
- public void kill() {
+ public void kill() throws IOException, InterruptedException {
killed = true;
jvmManager.taskKilled(this);
signalDone();
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar 8 05:56:27 2011
@@ -65,31 +65,27 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.mapred.TaskController.DebugScriptContext;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerTaskPathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerJobPathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.DeletionContext;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapred.pipes.Submitter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
-import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
@@ -98,24 +94,26 @@ import org.apache.hadoop.metrics2.lib.Me
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import static org.apache.hadoop.metrics2.impl.MsInfo.*;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
-import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.mapreduce.util.ConfigUtil;
-import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
-import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
-import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
/*******************************************************
* TaskTracker is a process that starts and tracks MR Tasks
@@ -164,14 +162,15 @@ public class TaskTracker
public static final Log ClientTraceLog =
LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
- // Job ACLs file is created by TaskTracker under userlogs/$jobid directory for
- // each job at job localization time. This will be used by TaskLogServlet for
- // authorizing viewing of task logs of that job
+ //Job ACLs file is created by TaskController under userlogs/$jobid directory
+ //for each job at job localization time. This will be used by TaskLogServlet
+ //for authorizing viewing of task logs of that job
static String jobACLsFile = "job-acls.xml";
volatile boolean running = true;
private LocalDirAllocator localDirAllocator;
+ private String[] localdirs;
String taskTrackerName;
String localHostname;
InetSocketAddress jobTrackAddr;
@@ -241,11 +240,12 @@ public class TaskTracker
static final String DISTCACHEDIR = "distcache";
static final String JOBCACHE = "jobcache";
static final String OUTPUT = "output";
- private static final String JARSDIR = "jars";
+ static final String JARSDIR = "jars";
static final String LOCAL_SPLIT_FILE = "split.dta";
static final String LOCAL_SPLIT_META_FILE = "split.info";
static final String JOBFILE = "job.xml";
static final String JOB_TOKEN_FILE="jobToken"; //localized file
+ static final String TT_PRIVATE_DIR = "ttprivate";
static final String JOB_LOCAL_DIR = MRJobConfig.JOB_LOCAL_DIR;
@@ -423,7 +423,6 @@ public class TaskTracker
RunningJob rJob = null;
if (!runningJobs.containsKey(jobId)) {
rJob = new RunningJob(jobId);
- rJob.localized = false;
rJob.tasks = new HashSet<TaskInProgress>();
runningJobs.put(jobId, rJob);
} else {
@@ -432,7 +431,6 @@ public class TaskTracker
synchronized (rJob) {
rJob.tasks.add(tip);
}
- runningJobs.notify(); //notify the fetcher thread
return rJob;
}
}
@@ -490,22 +488,32 @@ public class TaskTracker
return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
}
+ static String getPrivateDirJobConfFile(String user, String jobid) {
+ return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobConfFile(user, jobid);
+ }
+
static String getLocalJobTokenFile(String user, String jobid) {
- return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR +
+ TaskTracker.JOB_TOKEN_FILE;
}
-
static String getTaskConfFile(String user, String jobid, String taskid,
boolean isCleanupAttempt) {
return getLocalTaskDir(user, jobid, taskid, isCleanupAttempt)
+ Path.SEPARATOR + TaskTracker.JOBFILE;
}
+ static String getPrivateDirTaskScriptLocation(String user, String jobid,
+ String taskid) {
+ return TT_PRIVATE_DIR + Path.SEPARATOR +
+ getLocalTaskDir(user, jobid, taskid);
+ }
+
static String getJobJarsDir(String user, String jobid) {
return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
}
- static String getJobJarFile(String user, String jobid) {
+ public static String getJobJarFile(String user, String jobid) {
return getJobJarsDir(user, jobid) + Path.SEPARATOR + "job.jar";
}
@@ -529,7 +537,8 @@ public class TaskTracker
+ TaskTracker.OUTPUT;
}
- static String getLocalTaskDir(String user, String jobid, String taskid) {
+ public static String getLocalTaskDir(String user, String jobid,
+ String taskid) {
return getLocalTaskDir(user, jobid, taskid, false);
}
@@ -547,6 +556,19 @@ public class TaskTracker
String dir = getLocalTaskDir(user, jobid, taskid, isCleanupAttempt);
return dir + Path.SEPARATOR + MRConstants.WORKDIR;
}
+
+ static String getPrivateDirJobTokenFile(String user, String jobid) {
+ return TT_PRIVATE_DIR + Path.SEPARATOR +
+ getLocalJobTokenFile(user, jobid);
+ }
+
+ static String getPrivateDirForJob(String user, String jobid) {
+ return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobDir(user, jobid) ;
+ }
+
+ String[] getLocalDirs() {
+ return localdirs;
+ }
String getPid(TaskAttemptID tid) {
TaskInProgress tip = tasks.get(tid);
@@ -565,7 +587,48 @@ public class TaskTracker
protocol);
}
}
-
+
+ /**
+ * Delete all of the user directories.
+ * @param conf the TT configuration
+ * @throws IOException
+ */
+ private void deleteUserDirectories(Configuration conf) throws IOException {
+ for(String root: localdirs) {
+ for(FileStatus status: localFs.listStatus(new Path(root, SUBDIR))) {
+ String owner = status.getOwner();
+ String path = status.getPath().getName();
+ if (path.equals(owner)) {
+ taskController.deleteAsUser(owner, "");
+ }
+ }
+ }
+ }
+
+ public void cleanupAllVolumes() throws IOException {
+ for (int v = 0; v < localdirs.length; v++) {
+ // List all files inside the volumes
+ FileStatus[] files = localFs.listStatus(new Path(localdirs[v]));
+ for (int f = 0; f < files.length; f++) {
+ if (files[f].getPath().getName().equals(SUBDIR)) {
+ FileStatus[] userDirs =
+ localFs.listStatus(new Path(localdirs[v] +Path.SEPARATOR+ SUBDIR));
+ for (int k = 0; k < userDirs.length; k++) {
+ // Get the relative file name to the root of the volume
+ String absoluteFilename = files[f].getPath().toUri().getPath();
+ getAsyncDiskService().moveAndDeleteRelativePath(localdirs[v],
+ absoluteFilename);
+ }
+ // // Do not delete the current TOBEDELETED
+ // if (!TOBEDELETED.equals(relative)) {
+ // moveAndDeleteRelativePath(volumes[v], relative);
+ // }
+ }
+ }
+ getAsyncDiskService().moveAndDeleteFromEachVolume(TT_PRIVATE_DIR);
+ }
+ }
+
/**
* Do the real constructor work here. It's in a separate method
* so we can call it again and "recycle" the object after calling
@@ -592,9 +655,18 @@ public class TaskTracker
// Check local disk, start async disk service, and clean up all
// local directories.
- checkLocalDirs(this.fConf.getLocalDirs());
- setAsyncDiskService(new MRAsyncDiskService(fConf));
- getAsyncDiskService().cleanupAllVolumes();
+ checkLocalDirs((localdirs = this.fConf.getLocalDirs()));
+ setAsyncDiskService(new MRAsyncDiskService(localFs,
+ taskController, fConf.getLocalDirs()));
+ cleanupAllVolumes();
+ final FsPermission ttdir = FsPermission.createImmutable((short) 0755);
+ for (String s : localdirs) {
+ localFs.mkdirs(new Path(s, SUBDIR), ttdir);
+ }
+ final FsPermission priv = FsPermission.createImmutable((short) 0700);
+ for (String s : localdirs) {
+ localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
+ }
// Clear out state tables
this.tasks.clear();
@@ -652,15 +724,6 @@ public class TaskTracker
this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
LOG.info("Starting tracker " + taskTrackerName);
- Class<? extends TaskController> taskControllerClass = fConf.getClass(
- TT_TASK_CONTROLLER, DefaultTaskController.class, TaskController.class);
- taskController = (TaskController) ReflectionUtils.newInstance(
- taskControllerClass, fConf);
-
-
- // setup and create jobcache directory with appropriate permissions
- taskController.setup();
-
// Initialize DistributedCache
this.distributedCacheManager =
new TrackerDistributedCacheManager(this.fConf, taskController,
@@ -705,7 +768,7 @@ public class TaskTracker
reduceLauncher.start();
// create a localizer instance
- setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));
+ setLocalizer(new Localizer(localFs, fConf.getLocalDirs()));
//Start up node health checker service.
if (shouldStartHealthMonitor(this.fConf)) {
@@ -801,6 +864,9 @@ public class TaskTracker
List <FetchStatus> fList = new ArrayList<FetchStatus>();
for (Map.Entry <JobID, RunningJob> item : runningJobs.entrySet()) {
RunningJob rjob = item.getValue();
+ if (!rjob.localized) {
+ continue;
+ }
JobID jobId = item.getKey();
FetchStatus f;
synchronized (rjob) {
@@ -974,33 +1040,29 @@ public class TaskTracker
Task t = tip.getTask();
JobID jobId = t.getJobID();
RunningJob rjob = addTaskToJob(jobId, tip);
-
- // Initialize the user directories if needed.
- getLocalizer().initializeUserDirs(t.getUser());
+ InetSocketAddress ttAddr = getTaskTrackerReportAddress();
synchronized (rjob) {
if (!rjob.localized) {
-
- JobConf localJobConf = localizeJobFiles(t, rjob);
- // initialize job log directory
- initializeJobLogDir(jobId, localJobConf);
-
- // Now initialize the job via task-controller so as to set
- // ownership/permissions of jars, job-work-dir. Note that initializeJob
- // should be the last call after every other directory/file to be
- // directly under the job directory is created.
- JobInitializationContext context = new JobInitializationContext();
- context.jobid = jobId;
- context.user = t.getUser();
- context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
- taskController.initializeJob(context);
-
+ Path localJobConfPath = initializeJob(t, rjob, ttAddr);
+ JobConf localJobConf = new JobConf(localJobConfPath);
+ // to be doubly sure, overwrite the user in the config with the one the
+ // TT thinks it is
+ localJobConf.setUser(t.getUser());
+ //also reset the #tasks per jvm
+ resetNumTasksPerJvm(localJobConf);
+ //set the base jobconf path in rjob; all tasks will use
+ //this as the base path when they run
+ rjob.localizedJobConf = localJobConfPath;
rjob.jobConf = localJobConf;
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
rjob.localized = true;
}
}
+ synchronized (runningJobs) {
+ runningJobs.notify(); //notify the fetcher thread
+ }
return rjob;
}
@@ -1019,31 +1081,32 @@ public class TaskTracker
* Localize the job on this tasktracker. Specifically
* <ul>
* <li>Cleanup and create job directories on all disks</li>
+ * <li>Download the credentials file</li>
* <li>Download the job config file job.xml from the FS</li>
- * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
- * in the configuration.
- * <li>Download the job jar file job.jar from the FS, unjar it and set jar
- * file in the configuration.</li>
+ * <li>Invokes the {@link TaskController} to do the rest of the job
+ * initialization</li>
* </ul>
- *
+ *
* @param t task whose job has to be localized on this TT
- * @return the modified job configuration to be used for all the tasks of this
- * job as a starting point.
+ * @param rjob the {@link RunningJob}
+ * @param ttAddr the tasktracker's RPC address
+ * @return the path to the job configuration to be used for all the tasks
+ * of this job as a starting point.
* @throws IOException
*/
- JobConf localizeJobFiles(Task t, RunningJob rjob)
- throws IOException, InterruptedException {
- JobID jobId = t.getJobID();
- String userName = t.getUser();
-
- // Initialize the job directories
- FileSystem localFs = FileSystem.getLocal(fConf);
- getLocalizer().initializeJobDirs(userName, jobId);
+ Path initializeJob(final Task t, final RunningJob rjob,
+ final InetSocketAddress ttAddr)
+ throws IOException, InterruptedException {
+ final JobID jobId = t.getJobID();
+
+ final Path jobFile = new Path(t.getJobFile());
+ final String userName = t.getUser();
+ final Configuration conf = getJobConf();
// save local copy of JobToken file
- String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
+ final String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
- Credentials ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+ Credentials ts = TokenCache.loadTokens(localJobTokenFile, conf);
Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
if (jt != null) { //could be null in the case of some unit tests
getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
@@ -1051,38 +1114,87 @@ public class TaskTracker
for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
rjob.ugi.addToken(token);
}
+ FileSystem userFs = getFS(jobFile, jobId, conf);
// Download the job.xml for this job from the system FS
- Path localJobFile =
- localizeJobConfFile(new Path(t.getJobFile()), userName, jobId);
+ final Path localJobFile =
+ localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
- JobConf localJobConf = new JobConf(localJobFile);
- //WE WILL TRUST THE USERNAME THAT WE GOT FROM THE JOBTRACKER
- //AS PART OF THE TASK OBJECT
- localJobConf.setUser(userName);
-
- // set the location of the token file into jobConf to transfer
- // the name to TaskRunner
- localJobConf.set(TokenCache.JOB_TOKENS_FILENAME,
- localJobTokenFile.toString());
-
-
- // create the 'job-work' directory: job-specific shared directory for use as
- // scratch space by all tasks of the same job running on this TaskTracker.
- Path workDir =
- lDirAlloc.getLocalPathForWrite(getJobWorkDir(userName, jobId
- .toString()), fConf);
- if (!localFs.mkdirs(workDir)) {
- throw new IOException("Mkdirs failed to create "
- + workDir.toString());
- }
- System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
- localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
- // Download the job.jar for this job from the system FS
- localizeJobJarFile(userName, jobId, localFs, localJobConf);
-
- return localJobConf;
+ /**
+ * Now initialize the job via task-controller to do the rest of the
+ * job-init. Do this within a doAs since the public distributed cache
+ * is also set up here.
+ * To support potential authenticated HDFS accesses, we need the tokens
+ */
+ rjob.ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws IOException, InterruptedException {
+ try {
+ final JobConf localJobConf = new JobConf(localJobFile);
+ // Setup the public distributed cache
+ TaskDistributedCacheManager taskDistributedCacheManager =
+ getTrackerDistributedCacheManager()
+ .newTaskDistributedCacheManager(jobId, localJobConf);
+ rjob.distCacheMgr = taskDistributedCacheManager;
+ taskDistributedCacheManager.setupCache(localJobConf,
+ TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
+
+ // Set some config values
+ localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+ getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+ if (conf.get("slave.host.name") != null) {
+ localJobConf.set("slave.host.name", conf.get("slave.host.name"));
+ }
+ resetNumTasksPerJvm(localJobConf);
+ localJobConf.setUser(t.getUser());
+
+ // write back the config (this config will have the updates that the
+ // distributed cache manager makes as well)
+ JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
+ taskController.initializeJob(t.getUser(), jobId.toString(),
+ new Path(localJobTokenFile), localJobFile, TaskTracker.this,
+ ttAddr);
+ } catch (IOException e) {
+ LOG.warn("Exception while localization " +
+ StringUtils.stringifyException(e));
+ throw e;
+ } catch (InterruptedException ie) {
+ LOG.warn("Exception while localization " +
+ StringUtils.stringifyException(ie));
+ throw ie;
+ }
+ return null;
+ }
+ });
+ //search for the conf that the initializeJob created
+ //need to look up certain configs from this conf, like
+ //the distributed cache, profiling, etc. ones
+ Path initializedConf = lDirAlloc.getLocalPathToRead(getLocalJobConfFile(
+ userName, jobId.toString()), getJobConf());
+ return initializedConf;
+ }
+
+ /** If certain configs are enabled, then jvm-reuse should be disabled
+ * @param localJobConf
+ */
+ static void resetNumTasksPerJvm(JobConf localJobConf) {
+ boolean debugEnabled = false;
+ if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ return;
+ }
+ if (localJobConf.getMapDebugScript() != null ||
+ localJobConf.getReduceDebugScript() != null) {
+ debugEnabled = true;
+ }
+ String keepPattern = localJobConf.getKeepTaskFilesPattern();
+
+ if (debugEnabled || localJobConf.getProfileEnabled() ||
+ keepPattern != null || localJobConf.getKeepFailedTaskFiles()) {
+ //disable jvm reuse
+ localJobConf.setNumTasksToExecutePerJvm(1);
+ }
}
+
// Create job userlog dir.
// Create job acls file in job log dir, if needed.
void initializeJobLogDir(JobID jobId, JobConf localJobConf)
@@ -1146,15 +1258,16 @@ public class TaskTracker
/**
* Download the job configuration file from the FS.
*
- * @param t Task whose job file has to be downloaded
- * @param jobId jobid of the task
+ * @param jobFile the original location of the configuration file
+ * @param user the user in question
+ * @param userFs the FileSystem created on behalf of the user
+ * @param jobId jobid in question
* @return the local file system path of the downloaded file.
* @throws IOException
*/
- private Path localizeJobConfFile(Path jobFile, String user, JobID jobId)
- throws IOException, InterruptedException {
+ private Path localizeJobConfFile(Path jobFile, String user,
+ FileSystem userFs, JobID jobId) throws IOException {
final JobConf conf = new JobConf(getJobConf());
- FileSystem userFs = getFS(jobFile, jobId, conf);
// Get sizes of JobFile
// sizes are -1 if they are not present.
FileStatus status = null;
@@ -1166,69 +1279,25 @@ public class TaskTracker
jobFileSize = -1;
}
- Path localJobFile =
- lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(user, jobId.toString()),
- jobFileSize, fConf);
+ Path localJobFile = lDirAlloc.getLocalPathForWrite(
+ getPrivateDirJobConfFile(user, jobId.toString()), jobFileSize, fConf);
// Download job.xml
userFs.copyToLocalFile(jobFile, localJobFile);
return localJobFile;
}
- /**
- * Download the job jar file from FS to the local file system and unjar it.
- * Set the local jar file in the passed configuration.
- *
- * @param jobId
- * @param localFs
- * @param localJobConf
- * @throws IOException
- */
- private void localizeJobJarFile(String user, JobID jobId, FileSystem localFs,
- JobConf localJobConf)
- throws IOException, InterruptedException {
- // copy Jar file to the local FS and unjar it.
- String jarFile = localJobConf.getJar();
- FileStatus status = null;
- long jarFileSize = -1;
- if (jarFile != null) {
- Path jarFilePath = new Path(jarFile);
- FileSystem fs = getFS(jarFilePath, jobId, localJobConf);
- try {
- status = fs.getFileStatus(jarFilePath);
- jarFileSize = status.getLen();
- } catch (FileNotFoundException fe) {
- jarFileSize = -1;
- }
- // Here we check for five times the size of jarFileSize to accommodate for
- // unjarring the jar file in the jars directory
- Path localJarFile =
- lDirAlloc.getLocalPathForWrite(
- getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf);
-
- // Download job.jar
- fs.copyToLocalFile(jarFilePath, localJarFile);
-
- localJobConf.setJar(localJarFile.toString());
-
- // Un-jar the parts of the job.jar that need to be added to the classpath
- RunJar.unJar(
- new File(localJarFile.toString()),
- new File(localJarFile.getParent().toString()),
- localJobConf.getJarUnpackPattern());
- }
- }
-
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
- UserGroupInformation ugi) throws IOException {
+ RunningJob rjob) throws IOException {
synchronized (tip) {
tip.setJobConf(jobConf);
- tip.setUGI(ugi);
- tip.launchTask();
+ tip.setUGI(rjob.ugi);
+ tip.launchTask(rjob);
}
}
- public synchronized void shutdown() throws IOException {
+ public synchronized void shutdown()
+ throws IOException, InterruptedException {
shuttingDown = true;
close();
if (this.server != null) {
@@ -1245,8 +1314,9 @@ public class TaskTracker
* any running tasks or threads, and cleanup disk space. A new TaskTracker
* within the same process space might be restarted, so everything must be
* clean.
+ * @throws InterruptedException
*/
- public synchronized void close() throws IOException {
+ public synchronized void close() throws IOException, InterruptedException {
//
// Kill running tasks. Do this in a 2nd vector, called 'tasksToClose',
// because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -1353,8 +1423,14 @@ public class TaskTracker
server.start();
this.httpPort = server.getPort();
checkJettyPort(httpPort);
+ Class<? extends TaskController> taskControllerClass =
+ conf.getClass("mapred.task.tracker.task-controller",
+ DefaultTaskController.class, TaskController.class);
+ taskController =
+ (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
+ taskController.setup(localDirAllocator);
// create task log cleanup thread
- setTaskLogCleanupThread(new UserLogCleaner(fConf));
+ setTaskLogCleanupThread(new UserLogCleaner(fConf, taskController));
UserGroupInformation.setConfiguration(fConf);
SecurityUtil.login(fConf, TTConfig.TT_KEYTAB_FILE, TTConfig.TT_USER_NAME);
@@ -1374,7 +1450,7 @@ public class TaskTracker
private void startCleanupThreads() throws IOException {
taskCleanupThread.setDaemon(true);
taskCleanupThread.start();
- directoryCleanupThread = new CleanupQueue();
+ directoryCleanupThread = CleanupQueue.getInstance();
// start tasklog cleanup thread
taskLogCleanupThread.setDaemon(true);
taskLogCleanupThread.start();
@@ -1855,7 +1931,6 @@ public class TaskTracker
ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
tip.reportDiagnosticInfo(msg);
myInstrumentation.timedoutTask(tip.getTask().getTaskID());
- dumpTaskStack(tip);
purgeTask(tip, true);
}
}
@@ -1863,86 +1938,6 @@ public class TaskTracker
}
/**
- * Builds list of PathDeletionContext objects for the given paths
- */
- private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
- Path[] paths) {
- int i = 0;
- PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
-
- for (Path p : paths) {
- contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
- }
- return contexts;
- }
-
- /**
- * Builds list of {@link TaskControllerJobPathDeletionContext} objects for a
- * job each pointing to the job's jobLocalDir.
- * @param fs : FileSystem in which the dirs to be deleted
- * @param paths : mapred-local-dirs
- * @param id : {@link JobID} of the job for which the local-dir needs to
- * be cleaned up.
- * @param user : Job owner's username
- * @param taskController : the task-controller to be used for deletion of
- * jobLocalDir
- */
- static PathDeletionContext[] buildTaskControllerJobPathDeletionContexts(
- FileSystem fs, Path[] paths, JobID id, String user,
- TaskController taskController)
- throws IOException {
- int i = 0;
- PathDeletionContext[] contexts =
- new TaskControllerPathDeletionContext[paths.length];
-
- for (Path p : paths) {
- contexts[i++] = new TaskControllerJobPathDeletionContext(fs, p, id, user,
- taskController);
- }
- return contexts;
- }
-
- /**
- * Builds list of TaskControllerTaskPathDeletionContext objects for a task
- * @param fs : FileSystem in which the dirs to be deleted
- * @param paths : mapred-local-dirs
- * @param task : the task whose taskDir or taskWorkDir is going to be deleted
- * @param isWorkDir : the dir to be deleted is workDir or taskDir
- * @param taskController : the task-controller to be used for deletion of
- * taskDir or taskWorkDir
- */
- static PathDeletionContext[] buildTaskControllerTaskPathDeletionContexts(
- FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
- TaskController taskController)
- throws IOException {
- int i = 0;
- PathDeletionContext[] contexts =
- new TaskControllerPathDeletionContext[paths.length];
-
- for (Path p : paths) {
- contexts[i++] = new TaskControllerTaskPathDeletionContext(fs, p, task,
- isWorkDir, taskController);
- }
- return contexts;
- }
-
- /**
- * Send a signal to a stuck task commanding it to dump stack traces
- * to stderr before we kill it with purgeTask().
- *
- * @param tip {@link TaskInProgress} to dump stack traces.
- */
- private void dumpTaskStack(TaskInProgress tip) {
- TaskRunner runner = tip.getTaskRunner();
- if (null == runner) {
- return; // tip is already abandoned.
- }
-
- JvmManager jvmMgr = runner.getJvmManager();
- jvmMgr.dumpStack(runner);
- }
-
- /**
* The task tracker is done with this job, so we need to clean up.
* @param action The action with the job
* @throws IOException
@@ -1958,7 +1953,12 @@ public class TaskTracker
if (rjob == null) {
LOG.warn("Unknown job " + jobId + " being deleted.");
} else {
- synchronized (rjob) {
+ synchronized (rjob) {
+ // decrement the reference counts for the items this job references
+ rjob.distCacheMgr.release();
+ // inform the cache manager that the job is done
+ getTrackerDistributedCacheManager()
+ .deleteTaskDistributedCacheManager(jobId);
// Add this tips of this job to queue of tasks to be purged
for (TaskInProgress tip : rjob.tasks) {
tip.jobHasFinished(false);
@@ -1970,7 +1970,7 @@ public class TaskTracker
// Delete the job directory for this
// task if the job is done/failed
if (!rjob.keepJobFiles) {
- removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
+ removeJobFiles(rjob.ugi.getShortUserName(), rjob.getJobID());
}
// add job to taskLogCleanupThread
long now = System.currentTimeMillis();
@@ -1998,15 +1998,25 @@ public class TaskTracker
/**
* This job's files are no longer needed on this TT, remove them.
*
- * @param rjob
+ * @param user User who ran the job
+ * @param jobId Remove local work dirs for this job
* @throws IOException
*/
- void removeJobFiles(String user, JobID jobId)
- throws IOException {
- PathDeletionContext[] contexts =
- buildTaskControllerJobPathDeletionContexts(localFs,
- getLocalFiles(fConf, ""), jobId, user, taskController);
- directoryCleanupThread.addToQueue(contexts);
+ void removeJobFiles(String user, JobID jobId) throws IOException {
+ String jobDir = getLocalJobDir(user, jobId.toString());
+ PathDeletionContext jobCleanup =
+ new TaskController.DeletionContext(getTaskController(), false, user,
+ jobDir,
+ localdirs);
+ directoryCleanupThread.addToQueue(jobCleanup);
+
+ for (String str : localdirs) {
+ Path ttPrivateJobDir = FileSystem.getLocal(fConf).makeQualified(
+ new Path(str, TaskTracker.getPrivateDirForJob(user, jobId.toString())));
+ PathDeletionContext ttPrivateJobCleanup =
+ new CleanupQueue.PathDeletionContext(ttPrivateJobDir, fConf);
+ directoryCleanupThread.addToQueue(ttPrivateJobCleanup);
+ }
}
/**
@@ -2311,12 +2321,14 @@ public class TaskTracker
* Start a new task.
* All exceptions are handled locally, so that we don't mess up the
* task tracker.
+ * @throws InterruptedException
*/
- void startNewTask(TaskInProgress tip) {
+ void startNewTask(TaskInProgress tip) throws InterruptedException {
try {
RunningJob rjob = localizeJob(tip);
+ tip.getTask().setJobFile(rjob.localizedJobConf.toString());
// Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
- launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob.ugi);
+ launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob);
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
@@ -2326,8 +2338,9 @@ public class TaskTracker
tip.kill(true);
tip.cleanup(true);
} catch (IOException ie2) {
- LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
- StringUtils.stringifyException(ie2));
+ LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
+ } catch (InterruptedException ie2) {
+ LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
}
// Careful!
@@ -2448,7 +2461,7 @@ public class TaskTracker
private TaskRunner runner;
volatile boolean done = false;
volatile boolean wasKilled = false;
- private JobConf defaultJobConf;
+ private JobConf ttConf;
private JobConf localJobConf;
private boolean keepFailedTaskFiles;
private boolean alwaysKeepTaskFiles;
@@ -2480,7 +2493,7 @@ public class TaskTracker
this.task = task;
this.launcher = launcher;
this.lastProgressReport = System.currentTimeMillis();
- this.defaultJobConf = conf;
+ this.ttConf = conf;
localJobConf = null;
if (task.isUberTask()) {
taskStatus = new UberTaskStatus(
@@ -2504,66 +2517,9 @@ public class TaskTracker
}
void localizeTask(Task task) throws IOException{
-
- FileSystem localFs = FileSystem.getLocal(fConf);
-
- // create taskDirs on all the disks.
- getLocalizer().initializeAttemptDirs(task.getUser(),
- task.getJobID().toString(), task.getTaskID().toString(),
- task.isTaskCleanupTask());
-
- // create the working-directory of the task
- Path cwd =
- lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getUser(), task
- .getJobID().toString(), task.getTaskID().toString(), task
- .isTaskCleanupTask()), defaultJobConf);
- if (!localFs.mkdirs(cwd)) {
- throw new IOException("Mkdirs failed to create "
- + cwd.toString());
- }
-
- localJobConf.set(LOCAL_DIR,
- fConf.get(LOCAL_DIR));
-
- if (fConf.get(TT_HOST_NAME) != null) {
- localJobConf.set(TT_HOST_NAME, fConf.get(TT_HOST_NAME));
- }
-
- keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
-
// Do the task-type specific localization
+ //TODO: are these calls really required
task.localizeConfiguration(localJobConf);
-
- List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
- if (staticResolutions != null && staticResolutions.size() > 0) {
- StringBuffer str = new StringBuffer();
-
- for (int i = 0; i < staticResolutions.size(); i++) {
- String[] hostToResolved = staticResolutions.get(i);
- str.append(hostToResolved[0]+"="+hostToResolved[1]);
- if (i != staticResolutions.size() - 1) {
- str.append(',');
- }
- }
- localJobConf.set(TT_STATIC_RESOLUTIONS, str.toString());
- }
- if (task.isMapTask()) {
- debugCommand = localJobConf.getMapDebugScript();
- } else {
- debugCommand = localJobConf.getReduceDebugScript();
- }
- String keepPattern = localJobConf.getKeepTaskFilesPattern();
- if (keepPattern != null) {
- alwaysKeepTaskFiles =
- Pattern.matches(keepPattern, task.getTaskID().toString());
- } else {
- alwaysKeepTaskFiles = false;
- }
- if (debugCommand != null || localJobConf.getProfileEnabled() ||
- alwaysKeepTaskFiles || keepFailedTaskFiles) {
- //disable jvm reuse
- localJobConf.setNumTasksToExecutePerJvm(1);
- }
task.setConf(localJobConf);
}
@@ -2586,6 +2542,18 @@ public class TaskTracker
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
taskTimeout = localJobConf.getLong(MRJobConfig.TASK_TIMEOUT,
10 * 60 * 1000);
+ if (task.isMapTask()) {
+ debugCommand = localJobConf.getMapDebugScript();
+ } else {
+ debugCommand = localJobConf.getReduceDebugScript();
+ }
+ String keepPattern = localJobConf.getKeepTaskFilesPattern();
+ if (keepPattern != null) {
+ alwaysKeepTaskFiles =
+ Pattern.matches(keepPattern, task.getTaskID().toString());
+ } else {
+ alwaysKeepTaskFiles = false;
+ }
}
public synchronized JobConf getJobConf() {
@@ -2606,7 +2574,7 @@ public class TaskTracker
/**
* Kick off the task execution
*/
- public synchronized void launchTask() throws IOException {
+ public synchronized void launchTask(RunningJob rjob) throws IOException {
if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
@@ -2614,7 +2582,7 @@ public class TaskTracker
if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
this.taskStatus.setRunState(TaskStatus.State.RUNNING);
}
- setTaskRunner(task.createRunner(TaskTracker.this, this));
+ setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
this.runner.start();
this.taskStatus.setStartTime(System.currentTimeMillis());
} else {
@@ -2902,13 +2870,14 @@ public class TaskTracker
vargs.add(taskSyslog);
vargs.add(jobConf);
vargs.add(program);
- DebugScriptContext context =
- new TaskController.DebugScriptContext();
- context.args = vargs;
- context.stdout = stdout;
- context.workDir = workDir;
- context.task = task;
- getTaskController().runDebugScript(context);
+ // TODO need to fix debug script
+ //DebugScriptContext context =
+ // new TaskController.DebugScriptContext();
+ //context.args = vargs;
+ //context.stdout = stdout;
+ //context.workDir = workDir;
+ //context.task = task;
+ //getTaskController().runDebugScript(context);
// add the lines of debug out to diagnostics
int num = localJobConf.getInt(MRJobConfig.TASK_DEBUGOUT_LINES, -1);
addDiagnostics(FileUtil.makeShellPath(stdout), num, "DEBUG OUT");
@@ -2989,7 +2958,12 @@ public class TaskTracker
getRunState() == TaskStatus.State.UNASSIGNED ||
getRunState() == TaskStatus.State.COMMIT_PENDING ||
isCleaningup()) {
- kill(wasFailure);
+ try {
+ kill(wasFailure);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while killing " +
+ getTask().getTaskID(), e);
+ }
}
}
@@ -3001,8 +2975,10 @@ public class TaskTracker
* Something went wrong and the task must be killed.
*
* @param wasFailure was it a failure (versus a kill request)?
+ * @throws InterruptedException
*/
- public synchronized void kill(boolean wasFailure) throws IOException {
+ public synchronized void kill(boolean wasFailure
+ ) throws IOException, InterruptedException {
if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
isCleaningup()) {
@@ -3106,7 +3082,7 @@ public class TaskTracker
return;
}
try {
- removeTaskFiles(needCleanup, taskId);
+ removeTaskFiles(needCleanup);
} catch (Throwable ie) {
LOG.info("Error cleaning up task runner: "
+ StringUtils.stringifyException(ie));
@@ -3118,47 +3094,27 @@ public class TaskTracker
* Some or all of the files from this task are no longer required. Remove
* them via CleanupQueue.
*
- * @param needCleanup
+ * @param removeOutputs remove outputs as well as output
* @param taskId
* @throws IOException
*/
- void removeTaskFiles(boolean needCleanup, TaskAttemptID taskId)
- throws IOException {
- if (needCleanup) {
- if (runner != null) {
- // cleans up the output directory of the task (where map outputs
- // and reduce inputs get stored)
- runner.close();
- }
-
- if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- // No jvm reuse, remove everything
- PathDeletionContext[] contexts =
- buildTaskControllerTaskPathDeletionContexts(localFs,
- getLocalFiles(fConf, ""), task, false/* not workDir */,
- taskController);
- directoryCleanupThread.addToQueue(contexts);
+ void removeTaskFiles(boolean removeOutputs) throws IOException {
+ if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ String user = ugi.getShortUserName();
+ int userDirLen = TaskTracker.getUserDir(user).length();
+ String jobId = task.getJobID().toString();
+ String taskId = task.getTaskID().toString();
+ boolean cleanup = task.isTaskCleanupTask();
+ String taskDir;
+ if (!removeOutputs) {
+ taskDir = TaskTracker.getTaskWorkDir(user, jobId, taskId, cleanup);
} else {
- // Jvm reuse. We don't delete the workdir since some other task
- // (running in the same JVM) might be using the dir. The JVM
- // running the tasks would clean the workdir per a task in the
- // task process itself.
- String localTaskDir =
- getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
- .toString(), task.isTaskCleanupTask());
- PathDeletionContext[] contexts = buildPathDeletionContexts(
- localFs, getLocalFiles(defaultJobConf, localTaskDir +
- Path.SEPARATOR + TaskTracker.JOBFILE));
- directoryCleanupThread.addToQueue(contexts);
- }
- } else {
- if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- PathDeletionContext[] contexts =
- buildTaskControllerTaskPathDeletionContexts(localFs,
- getLocalFiles(fConf, ""), task, true /* workDir */,
- taskController);
- directoryCleanupThread.addToQueue(contexts);
+ taskDir = TaskTracker.getLocalTaskDir(user, jobId, taskId, cleanup);
}
+ PathDeletionContext item =
+ new TaskController.DeletionContext(taskController, false, user,
+ taskDir, localdirs);
+ directoryCleanupThread.addToQueue(item);
}
}
@@ -3199,7 +3155,11 @@ public class TaskTracker
if (rjob == null) { //kill the JVM since the job is dead
LOG.info("Killing JVM " + jvmId + " since job " + jvmId.getJobId() +
" is dead");
- jvmManager.killJvm(jvmId);
+ try {
+ jvmManager.killJvm(jvmId);
+ } catch (InterruptedException e) {
+ LOG.warn("Failed to kill " + jvmId, e);
+ }
return new JvmTask(null, true);
}
TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);
@@ -3393,12 +3353,15 @@ public class TaskTracker
static class RunningJob{
private JobID jobid;
private JobConf jobConf;
+ private Path localizedJobConf;
// keep this for later use
volatile Set<TaskInProgress> tasks;
- boolean localized;
+ volatile boolean localized;
boolean keepJobFiles;
UserGroupInformation ugi;
FetchStatus f;
+ TaskDistributedCacheManager distCacheMgr;
+
RunningJob(JobID jobid) {
this.jobid = jobid;
localized = false;
@@ -4055,46 +4018,54 @@ public class TaskTracker
return distributedCacheManager;
}
- /**
- * Download the job-token file from the FS and save on local fs.
- * @param user
- * @param jobId
- * @param jobConf
- * @return the local file system path of the downloaded file.
- * @throws IOException
- */
- private String localizeJobTokenFile(String user, JobID jobId)
- throws IOException {
- // check if the tokenJob file is there..
- Path skPath = new Path(systemDirectory,
- jobId.toString()+"/"+TokenCache.JOB_TOKEN_HDFS_FILE);
-
- FileStatus status = null;
- long jobTokenSize = -1;
- status = systemFS.getFileStatus(skPath); //throws FileNotFoundException
- jobTokenSize = status.getLen();
-
- Path localJobTokenFile =
- lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user,
- jobId.toString()), jobTokenSize, fConf);
- String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
- LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() +
- " to " + localJobTokenFileStr);
-
- // Download job_token
- systemFS.copyToLocalFile(skPath, localJobTokenFile);
- return localJobTokenFileStr;
- }
-
- JobACLsManager getJobACLsManager() {
- return aclsManager.getJobACLsManager();
- }
+ /**
+ * Download the job-token file from the FS and save on local fs.
+ * @param user
+ * @param jobId
+ * @param jobConf
+ * @return the local file system path of the downloaded file.
+ * @throws IOException
+ */
+ private String localizeJobTokenFile(String user, JobID jobId)
+ throws IOException {
+ // check if the tokenJob file is there..
+ Path skPath = new Path(systemDirectory,
+ jobId.toString()+"/"+TokenCache.JOB_TOKEN_HDFS_FILE);
- ACLsManager getACLsManager() {
- return aclsManager;
- }
+ FileStatus status = null;
+ long jobTokenSize = -1;
+ status = systemFS.getFileStatus(skPath); //throws FileNotFoundException
+ jobTokenSize = status.getLen();
+
+ Path localJobTokenFile =
+ lDirAlloc.getLocalPathForWrite(getPrivateDirJobTokenFile(user,
+ jobId.toString()), jobTokenSize, fConf);
+ String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
+ LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() +
+ " to " + localJobTokenFileStr);
+
+ // Download job_token
+ systemFS.copyToLocalFile(skPath, localJobTokenFile);
+ return localJobTokenFileStr;
+ }
+
+ JobACLsManager getJobACLsManager() {
+ return aclsManager.getJobACLsManager();
+ }
+
+ ACLsManager getACLsManager() {
+ return aclsManager;
+ }
+
+ synchronized TaskInProgress getRunningTask(TaskAttemptID tid) {
+ return runningTasks.get(tid);
+ }
+
+ @Override
+ public void updatePrivateDistributedCacheSizes(
+ org.apache.hadoop.mapreduce.JobID jobId, long[] sizes)
+ throws IOException {
+ distributedCacheManager.setArchiveSizes(jobId, sizes);
+ }
- synchronized TaskInProgress getRunningTask(TaskAttemptID tid) {
- return runningTasks.get(tid);
- }
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Mar 8 05:56:27 2011
@@ -161,4 +161,13 @@ public interface TaskUmbilicalProtocol e
TaskAttemptID id)
throws IOException;
+ /**
+ * The job initializer needs to report the sizes of the archive
+ * objects in the private distributed cache.
+ * @param jobId the job to update
+ * @param sizes the array of sizes that were computed
+ * @throws IOException
+ */
+ void updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+ long[] sizes) throws IOException;
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java Tue Mar 8 05:56:27 2011
@@ -80,9 +80,9 @@ class UberTask extends Task {
}
@Override
- public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip)
- throws IOException {
- return new UberTaskRunner(tip, tracker, conf);
+ public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip,
+ TaskTracker.RunningJob rjob) throws IOException {
+ return new UberTaskRunner(tip, tracker, conf, rjob);
}
/* perhaps someday we'll allow an UberTask to run as either a MapTask or a
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java Tue Mar 8 05:56:27 2011
@@ -25,8 +25,9 @@ import org.apache.log4j.Level;
public class UberTaskRunner extends TaskRunner {
- public UberTaskRunner(TaskInProgress tip, TaskTracker tracker, JobConf conf) {
- super(tip, tracker, conf);
+ public UberTaskRunner(TaskInProgress tip, TaskTracker tracker, JobConf conf,
+ TaskTracker.RunningJob rjob) throws IOException {
+ super(tip, tracker, conf, rjob);
}
@Override
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UserLogCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UserLogCleaner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UserLogCleaner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UserLogCleaner.java Tue Mar 8 05:56:27 2011
@@ -32,6 +32,9 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
@@ -53,15 +56,19 @@ class UserLogCleaner extends Thread {
private Map<JobID, Long> completedJobs = Collections
.synchronizedMap(new HashMap<JobID, Long>());
private final long threadSleepTime;
- private MRAsyncDiskService logAsyncDisk;
private Clock clock;
+ private TaskController taskController;
+ private CleanupQueue cleanupQueue;
+ private FileSystem localFs;
- UserLogCleaner(Configuration conf) throws IOException {
+ UserLogCleaner(Configuration conf, TaskController taskController)
+ throws IOException {
threadSleepTime = conf.getLong(TTConfig.TT_USERLOGCLEANUP_SLEEPTIME,
DEFAULT_THREAD_SLEEP_TIME);
- logAsyncDisk = new MRAsyncDiskService(FileSystem.getLocal(conf), TaskLog
- .getUserLogDir().toString());
setClock(new Clock());
+ localFs = FileSystem.getLocal(conf);
+ this.taskController = taskController;
+ cleanupQueue = CleanupQueue.getInstance();
}
void setClock(Clock clock) {
@@ -100,7 +107,7 @@ class UserLogCleaner extends Thread {
// see if the job is old enough
if (entry.getValue().longValue() <= now) {
// add the job logs directory to for delete
- deleteLogPath(TaskLog.getJobDir(entry.getKey()).getAbsolutePath());
+ deleteLogPath(entry.getKey().toString());
completedJobIter.remove();
}
}
@@ -125,16 +132,12 @@ class UserLogCleaner extends Thread {
// add all the log dirs to taskLogsMnonitor.
long now = clock.getTime();
for (String logDir : logDirs) {
- if (logDir.equals(logAsyncDisk.TOBEDELETED)) {
- // skip this
- continue;
- }
JobID jobid = null;
try {
jobid = JobID.forName(logDir);
} catch (IllegalArgumentException ie) {
// if the directory is not a jobid, delete it immediately
- deleteLogPath(new File(userLogDir, logDir).getAbsolutePath());
+ deleteLogPath(logDir);
continue;
}
// add the job log directory with default retain hours, if it is not
@@ -197,6 +200,11 @@ class UserLogCleaner extends Thread {
*/
private void deleteLogPath(String logPath) throws IOException {
LOG.info("Deleting user log path " + logPath);
- logAsyncDisk.moveAndDeleteAbsolutePath(logPath);
+ String logRoot = TaskLog.getUserLogDir().toString();
+ String user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+ PathDeletionContext item =
+ new TaskController.DeletionContext(taskController, true, user, logPath,
+ null);
+ cleanupQueue.addToQueue(item);
}
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobContext.java Tue Mar 8 05:56:27 2011
@@ -265,7 +265,7 @@ public interface JobContext extends MRJo
* @return a string array of timestamps
* @throws IOException
*/
- public String[] getArchiveTimestamps();
+ public long[] getArchiveTimestamps();
/**
* Get the timestamps of the files. Used by internal
@@ -273,7 +273,7 @@ public interface JobContext extends MRJo
* @return a string array of timestamps
* @throws IOException
*/
- public String[] getFileTimestamps();
+ public long[] getFileTimestamps();
/**
* Get the configured number of maximum attempts that will be made to run a
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java Tue Mar 8 05:56:27 2011
@@ -118,4 +118,4 @@ public class JobSubmissionFiles {
return stagingArea;
}
-}
\ No newline at end of file
+}