Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [23/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233:
./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java Sat Nov 28 20:26:01 2009
@@ -17,10 +17,10 @@
*/
package org.apache.hadoop.mapred;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URI;
@@ -36,6 +36,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
@@ -64,12 +65,19 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskController.DebugScriptContext;
import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
-import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapred.pipes.Submitter;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsException;
import org.apache.hadoop.metrics.MetricsRecord;
@@ -82,8 +90,9 @@
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.MemoryCalculatorPlugin;
-import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.Service;
@@ -100,7 +109,7 @@
*
*******************************************************/
public class TaskTracker extends Service
- implements MRConstants, TaskUmbilicalProtocol, Runnable {
+ implements MRConstants, TaskUmbilicalProtocol, Runnable, TTConfig {
/**
* @deprecated
*/
@@ -120,21 +129,20 @@
static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
static{
- Configuration.addDefaultResource("mapred-default.xml");
- Configuration.addDefaultResource("mapred-site.xml");
+ ConfigUtil.loadResources();
}
public static final Log LOG =
LogFactory.getLog(TaskTracker.class);
public static final String MR_CLIENTTRACE_FORMAT =
- "src: %s" + // src IP
- ", dest: %s" + // dst IP
- ", bytes: %s" + // byte count
- ", op: %s" + // operation
- ", cliID: %s" + // task id
- ", reduceID: %s" + // reduce id
- ", duration: %s"; // duration
+ "src: %s" + // src IP
+ ", dest: %s" + // dst IP
+ ", maps: %s" + // number of maps
+ ", op: %s" + // operation
+ ", reduceID: %s" + // reduce id
+ ", duration: %s"; // duration
+
public static final Log ClientTraceLog =
LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
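For reference, the reworked client-trace format drops the per-transfer byte
count and map task ID in favour of a map count, matching the batched shuffle
servlet later in this patch. A sketch of a call with this format (all values
illustrative):

  // Illustrative values only; mirrors the ClientTraceLog.info() call in the
  // shuffle servlet below.
  String trace = String.format(TaskTracker.MR_CLIENTTRACE_FORMAT,
      "10.0.0.1:50060",   // src: this TaskTracker
      "10.0.0.2:41234",   // dest: the fetching reducer
      3,                  // maps served in this one request
      "MAPRED_SHUFFLE",   // op
      "2",                // reduce partition
      1234);              // duration
  // src: 10.0.0.1:50060, dest: 10.0.0.2:41234, maps: 3, op: MAPRED_SHUFFLE,
  // reduceID: 2, duration: 1234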
@@ -152,7 +160,7 @@
Server taskReportServer = null;
InterTrackerProtocol jobClient;
-
+
private TrackerDistributedCacheManager distributedCacheManager;
// last heartbeat response received
@@ -185,7 +193,8 @@
* Map from taskId -> TaskInProgress.
*/
Map<TaskAttemptID, TaskInProgress> runningTasks = null;
- Map<JobID, RunningJob> runningJobs = null;
+ Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
+
volatile int mapTotal = 0;
volatile int reduceTotal = 0;
boolean justStarted = true;
@@ -206,24 +215,36 @@
//for serving map output to the other nodes
static Random r = new Random();
- static final String SUBDIR = "taskTracker";
- private static final String DISTCACHEDIR = "distcache";
+ public static final String SUBDIR = "taskTracker";
+ static final String DISTCACHEDIR = "distcache";
static final String JOBCACHE = "jobcache";
static final String OUTPUT = "output";
private static final String JARSDIR = "jars";
static final String LOCAL_SPLIT_FILE = "split.dta";
static final String JOBFILE = "job.xml";
+ static final String JOB_TOKEN_FILE="jobToken"; //localized file
- static final String JOB_LOCAL_DIR = "job.local.dir";
+ static final String JOB_LOCAL_DIR = JobContext.JOB_LOCAL_DIR;
private JobConf fConf;
- private FileSystem localFs;
+ FileSystem localFs;
+
+ private Localizer localizer;
+
private int maxMapSlots;
private int maxReduceSlots;
private int failures;
+
+ // Performance-related config knob to send an out-of-band heartbeat
+ // on task completion
+ private volatile boolean oobHeartbeatOnTaskCompletion;
+
+ // Track number of completed tasks to send an out-of-band heartbeat
+ private IntWritable finishedCount = new IntWritable(0);
+
private MapEventsFetcherThread mapEventsFetcher;
int workerThreads;
- private CleanupQueue directoryCleanupThread;
+ CleanupQueue directoryCleanupThread;
volatile JvmManager jvmManager;
private TaskMemoryManagerThread taskMemoryManager;
@@ -235,7 +256,7 @@
private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
- "mapred.tasktracker.memory_calculator_plugin";
+ TT_MEMORY_CALCULATOR_PLUGIN;
/**
* the minimum interval between jobtracker polls
@@ -340,7 +361,7 @@
*/
private TaskCleanupThread taskCleanupThread;
- TaskController getTaskController() {
+ public TaskController getTaskController() {
return taskController;
}
@@ -377,68 +398,84 @@
}
}
- static String getDistributedCacheDir() {
- return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
+ Localizer getLocalizer() {
+ return localizer;
+ }
+
+ void setLocalizer(Localizer l) {
+ localizer = l;
+ }
+
+ public static String getUserDir(String user) {
+ return TaskTracker.SUBDIR + Path.SEPARATOR + user;
+ }
+
+ public static String getDistributedCacheDir(String user) {
+ return getUserDir(user) + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
}
- static String getJobCacheSubdir() {
- return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
+ public static String getJobCacheSubdir(String user) {
+ return getUserDir(user) + Path.SEPARATOR + TaskTracker.JOBCACHE;
}
- static String getLocalJobDir(String jobid) {
- return getJobCacheSubdir() + Path.SEPARATOR + jobid;
+ public static String getLocalJobDir(String user, String jobid) {
+ return getJobCacheSubdir(user) + Path.SEPARATOR + jobid;
}
- static String getLocalJobConfFile(String jobid) {
- return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
+ static String getLocalJobConfFile(String user, String jobid) {
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
}
+
+ static String getLocalJobTokenFile(String user, String jobid) {
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+ }
+
- static String getTaskConfFile(String jobid, String taskid,
+ static String getTaskConfFile(String user, String jobid, String taskid,
boolean isCleanupAttempt) {
- return getLocalTaskDir(jobid, taskid, isCleanupAttempt) + Path.SEPARATOR
- + TaskTracker.JOBFILE;
+ return getLocalTaskDir(user, jobid, taskid, isCleanupAttempt)
+ + Path.SEPARATOR + TaskTracker.JOBFILE;
}
- static String getJobJarsDir(String jobid) {
- return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
+ static String getJobJarsDir(String user, String jobid) {
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
}
- static String getJobJarFile(String jobid) {
- return getJobJarsDir(jobid) + Path.SEPARATOR + "job.jar";
+ static String getJobJarFile(String user, String jobid) {
+ return getJobJarsDir(user, jobid) + Path.SEPARATOR + "job.jar";
}
- static String getJobWorkDir(String jobid) {
- return getLocalJobDir(jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
+ static String getJobWorkDir(String user, String jobid) {
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
}
- static String getLocalSplitFile(String jobid, String taskid) {
- return TaskTracker.getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+ static String getLocalSplitFile(String user, String jobid, String taskid) {
+ return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
+ TaskTracker.LOCAL_SPLIT_FILE;
}
- static String getIntermediateOutputDir(String jobid, String taskid) {
- return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+ static String getIntermediateOutputDir(String user, String jobid,
+ String taskid) {
+ return getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
+ TaskTracker.OUTPUT;
}
- static String getLocalTaskDir(String jobid, String taskid) {
- return getLocalTaskDir(jobid, taskid, false);
+ static String getLocalTaskDir(String user, String jobid, String taskid) {
+ return getLocalTaskDir(user, jobid, taskid, false);
}
- static String getLocalTaskDir(String jobid,
- String taskid,
- boolean isCleanupAttempt) {
- String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
- if (isCleanupAttempt) {
+ public static String getLocalTaskDir(String user, String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ String taskDir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid;
+ if (isCleanupAttempt) {
taskDir = taskDir + TASK_CLEANUP_SUFFIX;
- }
- return taskDir;
+ }
+ return taskDir;
}
- static String getTaskWorkDir(String jobid, String taskid,
+ static String getTaskWorkDir(String user, String jobid, String taskid,
boolean isCleanupAttempt) {
- String dir =
- getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+ String dir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid;
if (isCleanupAttempt) {
dir = dir + TASK_CLEANUP_SUFFIX;
}
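Taken together, the helpers above move every localized artifact under a
per-user subtree. A minimal sketch of the layout they compose (user and job
IDs are hypothetical; Path.SEPARATOR is "/" on local disks):

  String user  = "alice";                               // hypothetical
  String jobid = "job_200911282026_0001";               // hypothetical
  String userDir = "taskTracker" + "/" + user;          // getUserDir(user)
  System.out.println(userDir + "/distcache");           // getDistributedCacheDir
  System.out.println(userDir + "/jobcache/" + jobid);   // getLocalJobDir
  System.out.println(userDir + "/jobcache/" + jobid + "/jobToken");
  // prints:
  //   taskTracker/alice/distcache
  //   taskTracker/alice/jobcache/job_200911282026_0001
  //   taskTracker/alice/jobcache/job_200911282026_0001/jobToken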
@@ -482,14 +519,14 @@
localFs = FileSystem.getLocal(fConf);
// use configured nameserver & interface to get local hostname
- if (fConf.get("slave.host.name") != null) {
- this.localHostname = fConf.get("slave.host.name");
+ if (fConf.get(TT_HOST_NAME) != null) {
+ this.localHostname = fConf.get(TT_HOST_NAME);
}
if (localHostname == null) {
this.localHostname =
DNS.getDefaultHost
- (fConf.get("mapred.tasktracker.dns.interface","default"),
- fConf.get("mapred.tasktracker.dns.nameserver","default"));
+ (fConf.get(TT_DNS_INTERFACE,"default"),
+ fConf.get(TT_DNS_NAMESERVER,"default"));
}
//check local disk
@@ -505,10 +542,11 @@
this.acceptNewTasks = true;
this.status = null;
- this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
- this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
+ this.minSpaceStart = this.fConf.getLong(TT_LOCAL_DIR_MINSPACE_START, 0L);
+ this.minSpaceKill = this.fConf.getLong(TT_LOCAL_DIR_MINSPACE_KILL, 0L);
//tweak the probe sample size (make it a function of numCopiers)
- probe_sample_size = this.fConf.getInt("mapred.tasktracker.events.batchsize", 500);
+ probe_sample_size =
+ this.fConf.getInt(TT_MAX_TASK_COMPLETION_EVENTS_TO_POLL, 500);
Class<? extends TaskTrackerInstrumentation> metricsInst = getInstrumentationClass(fConf);
try {
@@ -524,7 +562,7 @@
// bind address
InetSocketAddress socAddr = NetUtils.createSocketAddr(
- fConf.get("mapred.task.tracker.report.address", "127.0.0.1:0"));
+ fConf.get(TT_REPORT_ADDRESS, "127.0.0.1:0"));
String bindAddress = socAddr.getHostName();
int tmpPort = socAddr.getPort();
@@ -552,7 +590,7 @@
// get the assigned address
this.taskReportAddress = taskReportServer.getListenerAddress();
- this.fConf.set("mapred.task.tracker.report.address",
+ this.fConf.set(TT_REPORT_ADDRESS,
taskReportAddress.getHostName() + ":" + taskReportAddress.getPort());
LOG.info("TaskTracker up at: " + this.taskReportAddress);
@@ -596,7 +634,7 @@
mapLauncher.start();
reduceLauncher.start();
Class<? extends TaskController> taskControllerClass
- = fConf.getClass("mapred.task.tracker.task-controller",
+ = fConf.getClass(TT_TASK_CONTROLLER,
DefaultTaskController.class,
TaskController.class);
taskController = (TaskController)ReflectionUtils.newInstance(
@@ -604,22 +642,28 @@
//setup and create jobcache directory with appropriate permissions
taskController.setup();
-
+
+ // create a localizer instance
+ setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));
+
//Start up node health checker service.
if (shouldStartHealthMonitor(this.fConf)) {
startHealthMonitor(this.fConf);
}
+
+ oobHeartbeatOnTaskCompletion =
+ fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
}
public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
Configuration conf) {
- return conf.getClass("mapred.tasktracker.instrumentation",
+ return conf.getClass(TT_INSTRUMENTATION,
TaskTrackerMetricsInst.class, TaskTrackerInstrumentation.class);
}
public static void setInstrumentationClass(
Configuration conf, Class<? extends TaskTrackerInstrumentation> t) {
- conf.setClass("mapred.tasktracker.instrumentation",
+ conf.setClass(TT_INSTRUMENTATION,
t, TaskTrackerInstrumentation.class);
}
@@ -807,7 +851,7 @@
}
private static LocalDirAllocator lDirAlloc =
- new LocalDirAllocator("mapred.local.dir");
+ new LocalDirAllocator(MRConfig.LOCAL_DIR);
// intialize the job directory
private void localizeJob(TaskInProgress tip) throws IOException {
@@ -815,6 +859,9 @@
JobID jobId = t.getJobID();
RunningJob rjob = addTaskToJob(jobId, tip);
+ // Initialize the user directories if needed.
+ getLocalizer().initializeUserDirs(t.getUser());
+
synchronized (rjob) {
if (!rjob.localized) {
@@ -833,6 +880,12 @@
rjob.jobConf = localJobConf;
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
+ FSDataInputStream in = localFs.open(new Path(
+ rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
+ JobTokens jt = new JobTokens();
+ jt.readFields(in);
+ rjob.jobTokens = jt; // store JobToken object per job
+
rjob.localized = true;
}
}
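The token load above is the standard Writable pattern; expanded as a
standalone sketch using only the JobTokens calls that appear in this patch
(the stream close is added here for completeness):

  FSDataInputStream in =
      localFs.open(new Path(conf.get(JobContext.JOB_TOKEN_FILE)));
  JobTokens jt = new JobTokens();
  jt.readFields(in);     // Writable deserialization of the per-job tokens
  in.close();            // not shown in the hunk above
  // later, the shuffle servlet verifies fetchers with this token:
  SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());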
@@ -858,21 +911,23 @@
JobConf localizeJobFiles(Task t)
throws IOException {
JobID jobId = t.getJobID();
+ String userName = t.getUser();
- // Initialize the job directories first
+ // Initialize the job directories
FileSystem localFs = FileSystem.getLocal(fConf);
- initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir"));
+ getLocalizer().initializeJobDirs(userName, jobId);
// Download the job.xml for this job from the system FS
- Path localJobFile = localizeJobConfFile(new Path(t.getJobFile()), jobId);
+ Path localJobFile =
+ localizeJobConfFile(new Path(t.getJobFile()), userName, jobId);
JobConf localJobConf = new JobConf(localJobFile);
// create the 'job-work' directory: job-specific shared directory for use as
// scratch space by all tasks of the same job running on this TaskTracker.
Path workDir =
- lDirAlloc.getLocalPathForWrite(getJobWorkDir(jobId.toString()),
- fConf);
+ lDirAlloc.getLocalPathForWrite(getJobWorkDir(userName, jobId
+ .toString()), fConf);
if (!localFs.mkdirs(workDir)) {
throw new IOException("Mkdirs failed to create "
+ workDir.toString());
@@ -881,153 +936,12 @@
localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
// Download the job.jar for this job from the system FS
- localizeJobJarFile(jobId, localFs, localJobConf);
+ localizeJobJarFile(userName, jobId, localFs, localJobConf);
+ // save local copy of JobToken file
+ localizeJobTokenFile(userName, jobId, localJobConf);
return localJobConf;
}
- static class PermissionsHandler {
- /**
- * Permission information useful for setting permissions for a given path.
- * Using this, one can set all possible combinations of permissions for the
- * owner of the file. But permissions for the group and all others can only
- * be set together, i.e. permissions for group cannot be set different from
- * those for others and vice versa.
- */
- static class PermissionsInfo {
- public boolean readPermissions;
- public boolean writePermissions;
- public boolean executablePermissions;
- public boolean readPermsOwnerOnly;
- public boolean writePermsOwnerOnly;
- public boolean executePermsOwnerOnly;
-
- /**
- * Create a permissions-info object with the given attributes
- *
- * @param readPerms
- * @param writePerms
- * @param executePerms
- * @param readOwnerOnly
- * @param writeOwnerOnly
- * @param executeOwnerOnly
- */
- public PermissionsInfo(boolean readPerms, boolean writePerms,
- boolean executePerms, boolean readOwnerOnly, boolean writeOwnerOnly,
- boolean executeOwnerOnly) {
- readPermissions = readPerms;
- writePermissions = writePerms;
- executablePermissions = executePerms;
- readPermsOwnerOnly = readOwnerOnly;
- writePermsOwnerOnly = writeOwnerOnly;
- executePermsOwnerOnly = executeOwnerOnly;
- }
- }
-
- /**
- * Set permission on the given file path using the specified permissions
- * information. We use java api to set permission instead of spawning chmod
- * processes. This saves a lot of time. Using this, one can set all possible
- * combinations of permissions for the owner of the file. But permissions
- * for the group and all others can only be set together, i.e. permissions
- * for group cannot be set different from those for others and vice versa.
- *
- * This method should satisfy the needs of most of the applications. For
- * those it doesn't, {@link FileUtil#chmod} can be used.
- *
- * @param f file path
- * @param pInfo permissions information
- * @return true if success, false otherwise
- */
- static boolean setPermissions(File f, PermissionsInfo pInfo) {
- if (pInfo == null) {
- LOG.debug(" PermissionsInfo is null, returning.");
- return true;
- }
-
- LOG.debug("Setting permission for " + f.getAbsolutePath());
-
- boolean ret = true;
-
- // Clear all the flags
- ret = f.setReadable(false, false) && ret;
- ret = f.setWritable(false, false) && ret;
- ret = f.setExecutable(false, false) && ret;
-
- ret = f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly);
- LOG.debug("Readable status for " + f + " set to " + ret);
- ret =
- f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly)
- && ret;
- LOG.debug("Writable status for " + f + " set to " + ret);
- ret =
- f.setExecutable(pInfo.executablePermissions,
- pInfo.executePermsOwnerOnly)
- && ret;
-
- LOG.debug("Executable status for " + f + " set to " + ret);
- return ret;
- }
-
- /**
- * Permissions rwxr_xr_x
- */
- static PermissionsInfo sevenFiveFive =
- new PermissionsInfo(true, true, true, false, true, false);
- /**
- * Completely private permissions
- */
- static PermissionsInfo sevenZeroZero =
- new PermissionsInfo(true, true, true, true, true, true);
- }
-
- /**
- * Prepare the job directories for a given job. To be called by the job
- * localization code, only if the job is not already localized.
- *
- * <br>
- * Here, we set 700 permissions on the job directories created on all disks.
- * This we do so as to avoid any misuse by other users till the time
- * {@link TaskController#initializeJob(JobInitializationContext)} is run at a
- * later time to set proper private permissions on the job directories. <br>
- *
- * @param jobId
- * @param fs
- * @param localDirs
- * @throws IOException
- */
- private static void initializeJobDirs(JobID jobId, FileSystem fs,
- String[] localDirs)
- throws IOException {
- boolean initJobDirStatus = false;
- String jobDirPath = getLocalJobDir(jobId.toString());
- for (String localDir : localDirs) {
- Path jobDir = new Path(localDir, jobDirPath);
- if (fs.exists(jobDir)) {
- // this will happen on a partial execution of localizeJob. Sometimes
- // copying job.xml to the local disk succeeds but copying job.jar might
- // throw out an exception. We should clean up and then try again.
- fs.delete(jobDir, true);
- }
-
- boolean jobDirStatus = fs.mkdirs(jobDir);
- if (!jobDirStatus) {
- LOG.warn("Not able to create job directory " + jobDir.toString());
- }
-
- initJobDirStatus = initJobDirStatus || jobDirStatus;
-
- // job-dir has to be private to the TT
- PermissionsHandler.setPermissions(new File(jobDir.toUri().getPath()),
- PermissionsHandler.sevenZeroZero);
- }
-
- if (!initJobDirStatus) {
- throw new IOException("Not able to initialize job directories "
- + "in any of the configured local directories for job "
- + jobId.toString());
- }
- }
-
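The deleted PermissionsHandler set permissions through the java.io.File API
rather than forking chmod; its "completely private" (700) case reduces to
this sketch, the behavior the new Localizer now takes over:

  // Clear every permission bit for everyone, then re-grant to the owner only.
  static boolean makePrivate(File f) {
    boolean ok = true;
    ok &= f.setReadable(false, false);
    ok &= f.setWritable(false, false);
    ok &= f.setExecutable(false, false);
    ok &= f.setReadable(true, true);     // second arg true = owner only
    ok &= f.setWritable(true, true);
    ok &= f.setExecutable(true, true);
    return ok;
  }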
/**
* Download the job configuration file from the FS.
*
@@ -1036,7 +950,7 @@
* @return the local file system path of the downloaded file.
* @throws IOException
*/
- private Path localizeJobConfFile(Path jobFile, JobID jobId)
+ private Path localizeJobConfFile(Path jobFile, String user, JobID jobId)
throws IOException {
// Get sizes of JobFile and JarFile
// sizes are -1 if they are not present.
@@ -1050,7 +964,7 @@
}
Path localJobFile =
- lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(jobId.toString()),
+ lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(user, jobId.toString()),
jobFileSize, fConf);
// Download job.xml
@@ -1067,7 +981,7 @@
* @param localJobConf
* @throws IOException
*/
- private void localizeJobJarFile(JobID jobId, FileSystem localFs,
+ private void localizeJobJarFile(String user, JobID jobId, FileSystem localFs,
JobConf localJobConf)
throws IOException {
// copy Jar file to the local FS and unjar it.
@@ -1082,11 +996,11 @@
} catch (FileNotFoundException fe) {
jarFileSize = -1;
}
- // Here we check for and we check five times the size of jarFileSize
- // to accommodate for unjarring the jar file in userfiles directory
+ // Here we check for five times the size of jarFileSize to accommodate for
+ // unjarring the jar file in the jars directory
Path localJarFile =
- lDirAlloc.getLocalPathForWrite(getJobJarFile(jobId.toString()),
- 5 * jarFileSize, fConf);
+ lDirAlloc.getLocalPathForWrite(
+ getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf);
// Download job.jar
systemFS.copyToLocalFile(jarFilePath, localJarFile);
@@ -1100,45 +1014,6 @@
}
}
- /**
- * Create taskDirs on all the disks. Otherwise, in some cases, like when
- * LinuxTaskController is in use, child might wish to balance load across
- * disks but cannot itself create attempt directory because of the fact that
- * job directory is writable only by the TT.
- *
- * @param jobId
- * @param attemptId
- * @param isCleanupAttempt
- * @param fs
- * @param localDirs
- * @throws IOException
- */
- private static void initializeAttemptDirs(String jobId, String attemptId,
- boolean isCleanupAttempt, FileSystem fs, String[] localDirs)
- throws IOException {
-
- boolean initStatus = false;
- String attemptDirPath =
- getLocalTaskDir(jobId, attemptId, isCleanupAttempt);
-
- for (String localDir : localDirs) {
- Path localAttemptDir = new Path(localDir, attemptDirPath);
-
- boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
- if (!attemptDirStatus) {
- LOG.warn("localAttemptDir " + localAttemptDir.toString()
- + " couldn't be created.");
- }
- initStatus = initStatus || attemptDirStatus;
- }
-
- if (!initStatus) {
- throw new IOException("Not able to initialize attempt directories "
- + "in any of the configured local directories for the attempt "
- + attemptId.toString());
- }
- }
-
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
synchronized (tip) {
tip.setJobConf(jobConf);
@@ -1205,7 +1080,7 @@
}
this.running = false;
-
+
// Clear local storage
cleanupStorage();
@@ -1288,7 +1163,7 @@
protected TaskTracker(JobConf conf, boolean start) throws IOException {
super(conf);
fConf = conf;
- //for backwards compatibility, the task tracker starts up unless told not
+ //for backwards compatibility, the task tracker starts up unless told not
//to. Subclasses should be very cautious about having their superclass
//do that as subclassed methods can be invoked before the class is fully
//configured
@@ -1308,22 +1183,22 @@
throws IOException, InterruptedException {
JobConf conf = fConf;
fConf = conf;
- maxMapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum", 2);
- maxReduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum", 2);
+ maxMapSlots = conf.getInt(TT_MAP_SLOTS, 2);
+ maxReduceSlots = conf.getInt(TT_REDUCE_SLOTS, 2);
this.jobTrackAddr = JobTracker.getAddress(conf);
InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(
- conf.get("mapred.task.tracker.http.address", "0.0.0.0:50060"));
+ conf.get(TT_HTTP_ADDRESS, "0.0.0.0:50060"));
String httpBindAddress = infoSocAddr.getHostName();
int httpPort = infoSocAddr.getPort();
this.server = new HttpServer("task", httpBindAddress, httpPort,
httpPort == 0, conf);
- workerThreads = conf.getInt("tasktracker.http.threads", 40);
+ workerThreads = conf.getInt(TT_HTTP_THREADS, 40);
this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
server.setThreads(1, workerThreads);
// let the jsp pages get to the task tracker, config, and other relevant
// objects
FileSystem local = FileSystem.getLocal(conf);
- this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
+ this.localDirAllocator = new LocalDirAllocator(MRConfig.LOCAL_DIR);
server.setAttribute("task.tracker", this);
server.setAttribute("local.file.system", local);
server.setAttribute("conf", conf);
@@ -1398,7 +1273,7 @@
List <TaskCompletionEvent> recentMapEvents =
new ArrayList<TaskCompletionEvent>();
for (int i = 0; i < t.length; i++) {
- if (t[i].isMap) {
+ if (t[i].isMapTask()) {
recentMapEvents.add(t[i]);
}
}
@@ -1419,8 +1294,14 @@
long waitTime = heartbeatInterval - (now - lastHeartbeat);
if (waitTime > 0) {
- // sleeps for the wait time
- Thread.sleep(waitTime);
+          // sleeps for the wait time, or until a task completes and
+          // frees a slot, triggering an early (out-of-band) heartbeat
+ synchronized (finishedCount) {
+ if (finishedCount.get() == 0) {
+ finishedCount.wait(waitTime);
+ }
+ finishedCount.set(0);
+ }
}
// If the TaskTracker is just starting up:
@@ -1461,39 +1342,6 @@
// next heartbeat
lastHeartbeat = System.currentTimeMillis();
-
- // Check if the map-event list needs purging
- Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
- if (jobs.size() > 0) {
- synchronized (this) {
- // purge the local map events list
- for (JobID job : jobs) {
- RunningJob rjob;
- synchronized (runningJobs) {
- rjob = runningJobs.get(job);
- if (rjob != null) {
- synchronized (rjob) {
- FetchStatus f = rjob.getFetchStatus();
- if (f != null) {
- f.reset();
- }
- }
- }
- }
- }
-
- // Mark the reducers in shuffle for rollback
- synchronized (shouldReset) {
- for (Map.Entry<TaskAttemptID, TaskInProgress> entry
- : runningTasks.entrySet()) {
- if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
- this.shouldReset.add(entry.getKey());
- }
- }
- }
- }
- }
-
TaskTrackerAction[] actions = heartbeatResponse.getActions();
if(LOG.isDebugEnabled()) {
LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
@@ -1798,9 +1646,8 @@
}
// Delete the job directory for this
// task if the job is done/failed
- if (!rjob.keepJobFiles){
- directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf,
- getLocalJobDir(rjob.getJobID().toString())));
+ if (!rjob.keepJobFiles) {
+ removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID().toString());
}
// Remove this job
rjob.tasks.clear();
@@ -1810,10 +1657,20 @@
synchronized(runningJobs) {
runningJobs.remove(jobId);
}
-
- }
-
-
+ }
+
+ /**
+ * This job's files are no longer needed on this TT, remove them.
+ *
+   * @param user
+   * @param jobId
+ * @throws IOException
+ */
+ void removeJobFiles(String user, String jobId)
+ throws IOException {
+ directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf,
+ getLocalJobDir(user, jobId)));
+ }
+
/**
* Remove the tip and update all relevant state.
*
@@ -2148,6 +2005,19 @@
}
}
+ /**
+ * Notify the tasktracker to send an out-of-band heartbeat.
+ */
+ private void notifyTTAboutTaskCompletion() {
+ if (oobHeartbeatOnTaskCompletion) {
+ synchronized (finishedCount) {
+ int value = finishedCount.get();
+ finishedCount.set(value+1);
+ finishedCount.notify();
+ }
+ }
+ }
+
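The two halves of the out-of-band heartbeat, the timed wait in the heartbeat
loop earlier in this file and the notify here, form a standard guarded wait.
A self-contained sketch of the protocol:

  class HeartbeatGate {
    private int finishedCount = 0;
    private final Object lock = new Object();

    // heartbeat loop: park for the interval, or until a completion arrives
    void awaitNextHeartbeat(long waitTime) throws InterruptedException {
      synchronized (lock) {
        if (finishedCount == 0) {
          lock.wait(waitTime);
        }
        finishedCount = 0;      // consume pending completions
      }
    }

    // task side: bump the counter and wake the (single) heartbeat thread
    void taskCompleted() {
      synchronized (lock) {
        finishedCount++;
        lock.notify();
      }
    }
  }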
/**
* The server retry loop.
* This while-loop attempts to connect to the JobTracker. It only
@@ -2254,26 +2124,25 @@
FileSystem localFs = FileSystem.getLocal(fConf);
// create taskDirs on all the disks.
- initializeAttemptDirs(task.getJobID().toString(), task.getTaskID()
- .toString(), task.isTaskCleanupTask(), localFs, fConf
- .getStrings("mapred.local.dir"));
+ getLocalizer().initializeAttemptDirs(task.getUser(),
+ task.getJobID().toString(), task.getTaskID().toString(),
+ task.isTaskCleanupTask());
// create the working-directory of the task
Path cwd =
- lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getJobID()
- .toString(), task.getTaskID().toString(), task
+ lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getUser(), task
+ .getJobID().toString(), task.getTaskID().toString(), task
.isTaskCleanupTask()), defaultJobConf);
if (!localFs.mkdirs(cwd)) {
throw new IOException("Mkdirs failed to create "
+ cwd.toString());
}
- localJobConf.set("mapred.local.dir",
- fConf.get("mapred.local.dir"));
+ localJobConf.set(LOCAL_DIR,
+ fConf.get(LOCAL_DIR));
- if (fConf.get("slave.host.name") != null) {
- localJobConf.set("slave.host.name",
- fConf.get("slave.host.name"));
+ if (fConf.get(TT_HOST_NAME) != null) {
+ localJobConf.set(TT_HOST_NAME, fConf.get(TT_HOST_NAME));
}
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
@@ -2292,7 +2161,7 @@
str.append(',');
}
}
- localJobConf.set("hadoop.net.static.resolutions", str.toString());
+ localJobConf.set(TT_STATIC_RESOLUTIONS, str.toString());
}
if (task.isMapTask()) {
debugCommand = localJobConf.getMapDebugScript();
@@ -2320,14 +2189,18 @@
return task;
}
- public TaskRunner getTaskRunner() {
+ TaskRunner getTaskRunner() {
return runner;
}
+ void setTaskRunner(TaskRunner rnr) {
+ this.runner = rnr;
+ }
+
public synchronized void setJobConf(JobConf lconf){
this.localJobConf = lconf;
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
- taskTimeout = localJobConf.getLong("mapred.task.timeout",
+ taskTimeout = localJobConf.getLong(JobContext.TASK_TIMEOUT,
10 * 60 * 1000);
}
@@ -2357,7 +2230,7 @@
if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
this.taskStatus.setRunState(TaskStatus.State.RUNNING);
}
- this.runner = task.createRunner(TaskTracker.this, this);
+ setTaskRunner(task.createRunner(TaskTracker.this, this));
this.runner.start();
this.taskStatus.setStartTime(System.currentTimeMillis());
} else {
@@ -2465,9 +2338,21 @@
return wasKilled;
}
- void reportTaskFinished() {
- taskFinished();
- releaseSlot();
+ /**
+ * A task is reporting in as 'done'.
+ *
+ * We need to notify the tasktracker to send an out-of-band heartbeat.
+     * If the commit isn't pending, we also need to finalize the task
+     * and release the slot it occupies.
+ *
+ * @param commitPending is the task-commit pending?
+ */
+ void reportTaskFinished(boolean commitPending) {
+ if (!commitPending) {
+ taskFinished();
+ releaseSlot();
+ }
+ notifyTTAboutTaskCompletion();
}
/* State changes:
@@ -2535,84 +2420,7 @@
setTaskFailState(true);
// call the script here for the failed tasks.
if (debugCommand != null) {
- String taskStdout ="";
- String taskStderr ="";
- String taskSyslog ="";
- String jobConf = task.getJobFile();
- try {
- // get task's stdout file
- taskStdout = FileUtil.makeShellPath(
- TaskLog.getRealTaskLogFileLocation
- (task.getTaskID(), TaskLog.LogName.STDOUT));
- // get task's stderr file
- taskStderr = FileUtil.makeShellPath(
- TaskLog.getRealTaskLogFileLocation
- (task.getTaskID(), TaskLog.LogName.STDERR));
- // get task's syslog file
- taskSyslog = FileUtil.makeShellPath(
- TaskLog.getRealTaskLogFileLocation
- (task.getTaskID(), TaskLog.LogName.SYSLOG));
- } catch(IOException e){
- LOG.warn("Exception finding task's stdout/err/syslog files");
- }
- File workDir = null;
- try {
- workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(
- task.getJobID().toString(),
- task.getTaskID().toString(),
- task.isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR,
- localJobConf). toString());
- } catch (IOException e) {
- LOG.warn("Working Directory of the task " + task.getTaskID() +
- " doesnt exist. Caught exception " +
- StringUtils.stringifyException(e));
- }
- // Build the command
- File stdout = TaskLog.getRealTaskLogFileLocation(
- task.getTaskID(), TaskLog.LogName.DEBUGOUT);
- // add pipes program as argument if it exists.
- String program ="";
- String executable = Submitter.getExecutable(localJobConf);
- if ( executable != null) {
- try {
- program = new URI(executable).getFragment();
- } catch (URISyntaxException ur) {
- LOG.warn("Problem in the URI fragment for pipes executable");
- }
- }
- String [] debug = debugCommand.split(" ");
- Vector<String> vargs = new Vector<String>();
- for (String component : debug) {
- vargs.add(component);
- }
- vargs.add(taskStdout);
- vargs.add(taskStderr);
- vargs.add(taskSyslog);
- vargs.add(jobConf);
- vargs.add(program);
- try {
- List<String> wrappedCommand = TaskLog.captureDebugOut
- (vargs, stdout);
- // run the script.
- try {
- runScript(wrappedCommand, workDir);
- } catch (IOException ioe) {
- LOG.warn("runScript failed with: " + StringUtils.
- stringifyException(ioe));
- }
- } catch(IOException e) {
- LOG.warn("Error in preparing wrapped debug command");
- }
-
- // add all lines of debug out to diagnostics
- try {
- int num = localJobConf.getInt("mapred.debug.out.lines", -1);
- addDiagnostics(FileUtil.makeShellPath(stdout),num,"DEBUG OUT");
- } catch(IOException ioe) {
- LOG.warn("Exception in add diagnostics!");
- }
+ runDebugScript();
}
}
taskStatus.setProgress(0.0f);
@@ -2640,21 +2448,84 @@
}
-
- /**
- * Runs the script given in args
- * @param args script name followed by its argumnets
- * @param dir current working directory.
- * @throws IOException
- */
- public void runScript(List<String> args, File dir) throws IOException {
- ShellCommandExecutor shexec =
- new ShellCommandExecutor(args.toArray(new String[0]), dir);
- shexec.execute();
- int exitCode = shexec.getExitCode();
- if (exitCode != 0) {
- throw new IOException("Task debug script exit with nonzero status of "
- + exitCode + ".");
+ private void runDebugScript() {
+ String taskStdout ="";
+ String taskStderr ="";
+ String taskSyslog ="";
+ String jobConf = task.getJobFile();
+ try {
+ // get task's stdout file
+ taskStdout = FileUtil.makeShellPath(
+ TaskLog.getRealTaskLogFileLocation
+ (task.getTaskID(), TaskLog.LogName.STDOUT));
+ // get task's stderr file
+ taskStderr = FileUtil.makeShellPath(
+ TaskLog.getRealTaskLogFileLocation
+ (task.getTaskID(), TaskLog.LogName.STDERR));
+ // get task's syslog file
+ taskSyslog = FileUtil.makeShellPath(
+ TaskLog.getRealTaskLogFileLocation
+ (task.getTaskID(), TaskLog.LogName.SYSLOG));
+ } catch(IOException e){
+ LOG.warn("Exception finding task's stdout/err/syslog files");
+ }
+ File workDir = null;
+ try {
+ workDir =
+ new File(lDirAlloc.getLocalPathToRead(
+ TaskTracker.getLocalTaskDir(task.getUser(), task
+ .getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask())
+ + Path.SEPARATOR + MRConstants.WORKDIR,
+ localJobConf).toString());
+ } catch (IOException e) {
+ LOG.warn("Working Directory of the task " + task.getTaskID() +
+ " doesnt exist. Caught exception " +
+ StringUtils.stringifyException(e));
+ }
+ // Build the command
+ File stdout = TaskLog.getRealTaskLogFileLocation(
+ task.getTaskID(), TaskLog.LogName.DEBUGOUT);
+ // add pipes program as argument if it exists.
+ String program ="";
+ String executable = Submitter.getExecutable(localJobConf);
+ if ( executable != null) {
+ try {
+ program = new URI(executable).getFragment();
+ } catch (URISyntaxException ur) {
+ LOG.warn("Problem in the URI fragment for pipes executable");
+ }
+ }
+ String [] debug = debugCommand.split(" ");
+ List<String> vargs = new ArrayList<String>();
+ for (String component : debug) {
+ vargs.add(component);
+ }
+ vargs.add(taskStdout);
+ vargs.add(taskStderr);
+ vargs.add(taskSyslog);
+ vargs.add(jobConf);
+ vargs.add(program);
+ DebugScriptContext context =
+ new TaskController.DebugScriptContext();
+ context.args = vargs;
+ context.stdout = stdout;
+ context.workDir = workDir;
+ context.task = task;
+ try {
+ getTaskController().runDebugScript(context);
+ // add all lines of debug out to diagnostics
+ try {
+ int num = localJobConf.getInt(JobContext.TASK_DEBUGOUT_LINES,
+ -1);
+ addDiagnostics(FileUtil.makeShellPath(stdout),num,
+ "DEBUG OUT");
+ } catch(IOException ioe) {
+ LOG.warn("Exception in add diagnostics!");
+ }
+ } catch (IOException ie) {
+ LOG.warn("runDebugScript failed with: " + StringUtils.
+ stringifyException(ie));
}
}
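The formerly inlined script handling now funnels through the TaskController.
A minimal sketch of the new hook, using only the context fields visible in
this hunk (all paths hypothetical):

  TaskController.DebugScriptContext context =
      new TaskController.DebugScriptContext();
  context.args = java.util.Arrays.asList(
      "./debug.sh", "stdout.txt", "stderr.txt", "syslog", "job.xml");
  context.stdout = new File("/tmp/debugout");  // where script output lands
  context.workDir = new File("/tmp/work");     // task working directory
  context.task = task;                         // the failed attempt
  getTaskController().runDebugScript(context); // throws IOException on failure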
@@ -2767,8 +2638,10 @@
taskStatus.setRunState(TaskStatus.State.KILLED);
}
}
+ taskStatus.setFinishTime(System.currentTimeMillis());
removeFromMemoryManager(task.getTaskID());
releaseSlot();
+ notifyTTAboutTaskCompletion();
}
private synchronized void releaseSlot() {
@@ -2835,50 +2708,60 @@
}
}
synchronized (this) {
+ // localJobConf could be null if localization has not happened
+ // then no cleanup will be required.
+ if (localJobConf == null) {
+ return;
+ }
try {
- // localJobConf could be null if localization has not happened
- // then no cleanup will be required.
- if (localJobConf == null) {
- return;
- }
- String localTaskDir =
- getLocalTaskDir(task.getJobID().toString(), taskId.toString(),
- task.isTaskCleanupTask());
- String taskWorkDir =
- getTaskWorkDir(task.getJobID().toString(), taskId.toString(),
- task.isTaskCleanupTask());
- if (needCleanup) {
- if (runner != null) {
- //cleans up the output directory of the task (where map outputs
- //and reduce inputs get stored)
- runner.close();
- }
-
- if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- // No jvm reuse, remove everything
- directoryCleanupThread.addToQueue(localFs,
- getLocalFiles(defaultJobConf,
- localTaskDir));
- }
- else {
- // Jvm reuse. We don't delete the workdir since some other task
- // (running in the same JVM) might be using the dir. The JVM
- // running the tasks would clean the workdir per a task in the
- // task process itself.
- directoryCleanupThread.addToQueue(localFs, getLocalFiles(
- defaultJobConf, localTaskDir + Path.SEPARATOR
- + TaskTracker.JOBFILE));
- }
- } else {
- if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- directoryCleanupThread.addToQueue(localFs,
- getLocalFiles(defaultJobConf,
- taskWorkDir));
- }
- }
+ removeTaskFiles(needCleanup, taskId);
} catch (Throwable ie) {
- LOG.info("Error cleaning up task runner: " +
- StringUtils.stringifyException(ie));
+ LOG.info("Error cleaning up task runner: "
+ + StringUtils.stringifyException(ie));
+ }
+ }
+ }
+
+ /**
+ * Some or all of the files from this task are no longer required. Remove
+ * them via CleanupQueue.
+ *
+ * @param needCleanup
+ * @param taskId
+ * @throws IOException
+ */
+ void removeTaskFiles(boolean needCleanup, TaskAttemptID taskId)
+ throws IOException {
+ if (needCleanup) {
+ if (runner != null) {
+ // cleans up the output directory of the task (where map outputs
+ // and reduce inputs get stored)
+ runner.close();
+ }
+
+ String localTaskDir =
+ getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
+ .toString(), task.isTaskCleanupTask());
+ if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ // No jvm reuse, remove everything
+ directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+ defaultJobConf, localTaskDir));
+ } else {
+ // Jvm reuse. We don't delete the workdir since some other task
+ // (running in the same JVM) might be using the dir. The JVM
+ // running the tasks would clean the workdir per a task in the
+ // task process itself.
+ directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+ defaultJobConf, localTaskDir + Path.SEPARATOR
+ + TaskTracker.JOBFILE));
+ }
+ } else {
+ if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ String taskWorkDir =
+ getTaskWorkDir(task.getUser(), task.getJobID().toString(),
+ taskId.toString(), task.isTaskCleanupTask());
+ directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+ defaultJobConf, taskWorkDir));
}
}
}
@@ -3040,6 +2923,17 @@
purgeTask(tip, true);
}
+ /**
+ * A child task had a fatal error. Kill the task.
+ */
+ public synchronized void fatalError(TaskAttemptID taskId, String msg)
+ throws IOException {
+ LOG.fatal("Task: " + taskId + " - exited : " + msg);
+ TaskInProgress tip = runningTasks.get(taskId);
+ tip.reportDiagnosticInfo("Error: " + msg);
+ purgeTask(tip, true);
+ }
+
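fatalError joins the umbilical protocol (TaskUmbilicalProtocol is made public
at the end of this mail). A hedged sketch of the child-side call; the
surrounding child loop is an assumption, not part of this patch:

  try {
    task.run(job, umbilical);                  // user code
  } catch (Throwable t) {
    try {
      umbilical.fatalError(task.getTaskID(),
          StringUtils.stringifyException(t));
    } catch (IOException ioe) {
      // best effort: the TT will time the task out anyway
    }
    System.exit(-1);
  }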
public synchronized MapTaskCompletionEventsUpdate getMapCompletionEvents(
JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id)
throws IOException {
@@ -3076,9 +2970,7 @@
tip = tasks.get(taskid);
}
if (tip != null) {
- if (!commitPending) {
- tip.reportTaskFinished();
- }
+ tip.reportTaskFinished(commitPending);
} else {
LOG.warn("Unknown child task finished: "+taskid+". Ignored.");
}
@@ -3109,6 +3001,7 @@
boolean localized;
boolean keepJobFiles;
FetchStatus f;
+ JobTokens jobTokens;
RunningJob(JobID jobid) {
this.jobid = jobid;
localized = false;
@@ -3262,7 +3155,7 @@
JobConf conf=new JobConf();
// enable the server to track time spent waiting on locks
ReflectionUtils.setContentionTracing
- (conf.getBoolean("tasktracker.contention.tracking", false));
+ (conf.getBoolean(TT_CONTENTION_TRACKING, false));
TaskTracker tracker = new TaskTracker(conf, false);
Service.startService(tracker);
tracker.run();
@@ -3284,147 +3177,248 @@
public void doGet(HttpServletRequest request,
HttpServletResponse response
) throws ServletException, IOException {
- TaskAttemptID reduceAttId = null;
- String mapId = request.getParameter("map");
+ long start = System.currentTimeMillis();
+ String mapIds = request.getParameter("map");
String reduceId = request.getParameter("reduce");
String jobId = request.getParameter("job");
+ LOG.debug("Shuffle started for maps (mapIds=" + mapIds + ") to reduce " +
+ reduceId);
+
if (jobId == null) {
throw new IOException("job parameter is required");
}
- if (mapId == null || reduceId == null) {
+ if (mapIds == null || reduceId == null) {
throw new IOException("map and reduce parameters are required");
}
- try {
- reduceAttId = TaskAttemptID.forName(reduceId);
- } catch (IllegalArgumentException e) {
- throw new IOException("reduce attempt ID is malformed");
- }
+
ServletContext context = getServletContext();
- int reduce = reduceAttId.getTaskID().getId();
- byte[] buffer = new byte[MAX_BYTES_TO_READ];
- // true iff IOException was caused by attempt to access input
- boolean isInputException = true;
- OutputStream outStream = null;
- FSDataInputStream mapOutputIn = null;
+ int reduce = Integer.parseInt(reduceId);
+ DataOutputStream outStream = null;
- long totalRead = 0;
ShuffleServerMetrics shuffleMetrics =
(ShuffleServerMetrics) context.getAttribute("shuffleServerMetrics");
TaskTracker tracker =
(TaskTracker) context.getAttribute("task.tracker");
- long startTime = 0;
+ verifyRequest(request, response, tracker, jobId);
+
+ int numMaps = 0;
try {
shuffleMetrics.serverHandlerBusy();
- if(ClientTraceLog.isInfoEnabled())
- startTime = System.nanoTime();
- outStream = response.getOutputStream();
+ outStream = new DataOutputStream(response.getOutputStream());
+ //use the same buffersize as used for reading the data from disk
+ response.setBufferSize(MAX_BYTES_TO_READ);
JobConf conf = (JobConf) context.getAttribute("conf");
LocalDirAllocator lDirAlloc =
(LocalDirAllocator)context.getAttribute("localDirAllocator");
FileSystem rfs = ((LocalFileSystem)
context.getAttribute("local.file.system")).getRaw();
- // Index file
- Path indexFileName = lDirAlloc.getLocalPathToRead(
- TaskTracker.getIntermediateOutputDir(jobId, mapId)
- + "/file.out.index", conf);
-
- // Map-output file
- Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
- TaskTracker.getIntermediateOutputDir(jobId, mapId)
- + "/file.out", conf);
+ // Split the map ids, send output for one map at a time
+ StringTokenizer itr = new StringTokenizer(mapIds, ",");
+ while(itr.hasMoreTokens()) {
+ String mapId = itr.nextToken();
+ ++numMaps;
+ sendMapFile(jobId, mapId, reduce, conf, outStream,
+ tracker, lDirAlloc, shuffleMetrics, rfs);
+ }
+ } catch (IOException ie) {
+ Log log = (Log) context.getAttribute("log");
+ String errorMsg = ("getMapOutputs(" + mapIds + "," + reduceId +
+ ") failed");
+ log.warn(errorMsg, ie);
+ response.sendError(HttpServletResponse.SC_GONE, errorMsg);
+ shuffleMetrics.failedOutput();
+ throw ie;
+ } finally {
+ shuffleMetrics.serverHandlerFree();
+ }
+ outStream.close();
+ shuffleMetrics.successOutput();
+ long timeElapsed = (System.currentTimeMillis()-start);
+ LOG.info("Shuffled " + numMaps
+ + "maps (mapIds=" + mapIds + ") to reduce "
+ + reduceId + " in " + timeElapsed + "s");
+
+ if (ClientTraceLog.isInfoEnabled()) {
+ ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
+ request.getLocalAddr() + ":" + request.getLocalPort(),
+ request.getRemoteAddr() + ":" + request.getRemotePort(),
+ numMaps, "MAPRED_SHUFFLE", reduceId,
+ timeElapsed));
+ }
+ }
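Note the request shape change: "map" now carries a comma-separated list of
map attempt IDs and "reduce" is the bare partition number, no longer a reduce
attempt ID. A sketch of the parsing (IDs hypothetical):

  String mapIds = "attempt_200911282026_0001_m_000000_0,"
                + "attempt_200911282026_0001_m_000001_0";
  int reduce = Integer.parseInt("2");          // partition, not attempt ID
  StringTokenizer itr = new StringTokenizer(mapIds, ",");
  while (itr.hasMoreTokens()) {
    String mapId = itr.nextToken();
    // one sendMapFile() call, i.e. one header + payload, per map
  }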
+ private void sendMapFile(String jobId, String mapId,
+ int reduce,
+ Configuration conf,
+ DataOutputStream outStream,
+ TaskTracker tracker,
+ LocalDirAllocator lDirAlloc,
+ ShuffleServerMetrics shuffleMetrics,
+ FileSystem localfs
+ ) throws IOException {
+
+ LOG.debug("sendMapFile called for " + mapId + " to reduce " + reduce);
+
+ // true iff IOException was caused by attempt to access input
+ boolean isInputException = false;
+ FSDataInputStream mapOutputIn = null;
+ byte[] buffer = new byte[MAX_BYTES_TO_READ];
+ long totalRead = 0;
+
+ String userName = null;
+ synchronized (tracker.runningJobs) {
+ RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
+ if (rjob == null) {
+ throw new IOException("Unknown job " + jobId + "!!");
+ }
+ userName = rjob.jobConf.getUser();
+ }
+ // Index file
+ Path indexFileName =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+ userName, jobId, mapId)
+ + "/file.out.index", conf);
+
+ // Map-output file
+ Path mapOutputFileName =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+ userName, jobId, mapId)
+ + "/file.out", conf);
+
+ /**
+ * Read the index file to get the information about where the map-output
+ * for the given reducer is available.
+ */
+ IndexRecord info =
+ tracker.indexCache.getIndexInformation(mapId, reduce, indexFileName);
+
+ try {
/**
- * Read the index file to get the information about where
- * the map-output for the given reducer is available.
- */
- IndexRecord info =
- tracker.indexCache.getIndexInformation(mapId, reduce,indexFileName);
-
- //set the custom "from-map-task" http header to the map task from which
- //the map output data is being transferred
- response.setHeader(FROM_MAP_TASK, mapId);
-
- //set the custom "Raw-Map-Output-Length" http header to
- //the raw (decompressed) length
- response.setHeader(RAW_MAP_OUTPUT_LENGTH,
- Long.toString(info.rawLength));
-
- //set the custom "Map-Output-Length" http header to
- //the actual number of bytes being transferred
- response.setHeader(MAP_OUTPUT_LENGTH,
- Long.toString(info.partLength));
-
- //set the custom "for-reduce-task" http header to the reduce task number
- //for which this map output is being transferred
- response.setHeader(FOR_REDUCE_TASK, Integer.toString(reduce));
-
- //use the same buffersize as used for reading the data from disk
- response.setBufferSize(MAX_BYTES_TO_READ);
-
- /**
- * Read the data from the sigle map-output file and
+ * Read the data from the single map-output file and
* send it to the reducer.
*/
//open the map-output file
- mapOutputIn = rfs.open(mapOutputFileName);
-
+ mapOutputIn = localfs.open(mapOutputFileName);
//seek to the correct offset for the reduce
mapOutputIn.seek(info.startOffset);
+
+ // write header for each map output
+ ShuffleHeader header = new ShuffleHeader(mapId, info.partLength,
+ info.rawLength, reduce);
+ header.write(outStream);
+
+ // read the map-output and stream it out
+ isInputException = true;
long rem = info.partLength;
+ if (rem == 0) {
+ throw new IOException("Illegal partLength of 0 for mapId " + mapId +
+ " to reduce " + reduce);
+ }
int len =
mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
- while (rem > 0 && len >= 0) {
+ long now = 0;
+ while (len >= 0) {
rem -= len;
try {
shuffleMetrics.outputBytes(len);
- outStream.write(buffer, 0, len);
- outStream.flush();
+
+ if (len > 0) {
+ outStream.write(buffer, 0, len);
+ } else {
+ LOG.info("Skipped zero-length read of map " + mapId +
+ " to reduce " + reduce);
+ }
+
} catch (IOException ie) {
isInputException = false;
throw ie;
}
totalRead += len;
+ if (rem == 0) {
+ break;
+ }
len =
mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
}
-
- LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce +
- " from map: " + mapId + " given " + info.partLength + "/" +
- info.rawLength);
+
+ mapOutputIn.close();
} catch (IOException ie) {
- Log log = (Log) context.getAttribute("log");
- String errorMsg = ("getMapOutput(" + mapId + "," + reduceId +
- ") failed :\n"+
- StringUtils.stringifyException(ie));
- log.warn(errorMsg);
+ String errorMsg = "error on sending map " + mapId + " to reduce " +
+ reduce;
if (isInputException) {
- tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg);
+ tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg +
+ StringUtils.stringifyException(ie));
}
- response.sendError(HttpServletResponse.SC_GONE, errorMsg);
- shuffleMetrics.failedOutput();
- throw ie;
- } finally {
- if (null != mapOutputIn) {
- mapOutputIn.close();
+ if (mapOutputIn != null) {
+ try {
+ mapOutputIn.close();
+ } catch (IOException ioe) {
+ LOG.info("problem closing map output file", ioe);
+ }
}
- final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
- shuffleMetrics.serverHandlerFree();
- if (ClientTraceLog.isInfoEnabled()) {
- ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
- request.getLocalAddr() + ":" + request.getLocalPort(),
- request.getRemoteAddr() + ":" + request.getRemotePort(),
- totalRead, "MAPRED_SHUFFLE", mapId, reduceId,
- endTime-startTime));
+ throw new IOException(errorMsg, ie);
+ }
+
+ LOG.info("Sent out " + totalRead + " bytes to reduce " + reduce +
+ " from map: " + mapId + " given " + info.partLength + "/" +
+ info.rawLength);
+ }
+
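Since each payload is now preceded by a ShuffleHeader, a fetcher consumes the
concatenated stream roughly as below. This is a sketch assuming ShuffleHeader
is a Writable whose readFields mirrors the write() used in sendMapFile:

  DataInputStream in = new DataInputStream(connection.getInputStream());
  for (int i = 0; i < numMapsRequested; i++) {
    ShuffleHeader header = new ShuffleHeader();
    header.readFields(in);   // mapId, part length, raw length, reduce
    // ...then read exactly the announced part length of map-output bytes
    // before looping for the next map's header.
  }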
+ /**
+     * Verify that the request carries the correct hash for the URL,
+     * and add a reply header containing a hash of that hash.
+     * @param request
+     * @param response
+     * @param tracker used to look up the job's shuffle token
+     * @param jobId the job whose token verifies this request
+ * @throws IOException
+ */
+ private void verifyRequest(HttpServletRequest request,
+ HttpServletResponse response, TaskTracker tracker, String jobId)
+ throws IOException {
+ JobTokens jt = null;
+ synchronized (tracker.runningJobs) {
+ RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
+ if (rjob == null) {
+ throw new IOException("Unknown job " + jobId + "!!");
}
+ jt = rjob.jobTokens;
}
- outStream.close();
- shuffleMetrics.successOutput();
+      // message to verify, built from the request URL
+ String enc_str = SecureShuffleUtils.buildMsgFrom(request);
+
+ // hash from the fetcher
+ String urlHashStr = request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+ if(urlHashStr == null) {
+ response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+ throw new IOException("fetcher cannot be authenticated");
+ }
+ int len = urlHashStr.length();
+ LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+
+ urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug
+
+ SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());
+ // verify - throws exception
+ try {
+ ssutil.verifyReply(urlHashStr, enc_str);
+ } catch (IOException ioe) {
+ response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+ throw ioe;
+ }
+
+ // verification passed - encode the reply
+ String reply = ssutil.generateHash(urlHashStr.getBytes());
+ response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+
+ len = reply.length();
+ LOG.debug("Fetcher request verfied. enc_str="+enc_str+";reply="
+ +reply.substring(len-len/2, len-1));
}
}
+
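The fetcher-side half of this handshake, reconstructed only from the names
used above; the method shapes are assumptions, not the committed client code:

  // Hash the request URL with the job's shuffle token, send it, then check
  // the tracker's reply hash.
  URL url = new URL("http://tt-host:50060/mapOutput?job=job_200911282026_0001"
      + "&map=attempt_200911282026_0001_m_000000_0&reduce=2");
  HttpURLConnection conn = (HttpURLConnection) url.openConnection();
  SecureShuffleUtils ssutil =
      new SecureShuffleUtils(jobTokens.getShuffleJobToken());
  String urlHash = ssutil.generateHash(url.toString().getBytes());
  conn.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, urlHash);
  conn.connect();
  String replyHash =
      conn.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
  ssutil.verifyReply(replyHash, urlHash);   // throws IOException on mismatch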
// get the full paths of the given subdirectory on all the local disks.
private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
@@ -3522,7 +3516,7 @@
}
Class<? extends MemoryCalculatorPlugin> clazz =
- fConf.getClass(MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+ fConf.getClass(TT_MEMORY_CALCULATOR_PLUGIN,
null, MemoryCalculatorPlugin.class);
MemoryCalculatorPlugin memoryCalculatorPlugin =
MemoryCalculatorPlugin
@@ -3546,11 +3540,11 @@
mapSlotMemorySizeOnTT =
fConf.getLong(
- JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+ MAPMEMORY_MB,
JobConf.DISABLED_MEMORY_LIMIT);
reduceSlotSizeMemoryOnTT =
fConf.getLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+ REDUCEMEMORY_MB,
JobConf.DISABLED_MEMORY_LIMIT);
totalMemoryAllottedForTasks =
maxMapSlots * mapSlotMemorySizeOnTT + maxReduceSlots
@@ -3656,66 +3650,99 @@
return distributedCacheManager;
}
- /**
- * Thread that handles cleanup
- */
- private class TaskCleanupThread extends Daemon {
-
/**
- * flag to halt work
+ * Download the job-token file from the FS and save on local fs.
+ * @param user
+ * @param jobId
+ * @param jobConf
+ * @throws IOException
*/
- private volatile boolean live = true;
+ private void localizeJobTokenFile(String user, JobID jobId, JobConf jobConf)
+ throws IOException {
+ // check if the tokenJob file is there..
+ Path skPath = new Path(systemDirectory,
+ jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME);
+ FileStatus status = null;
+ long jobTokenSize = -1;
+ status = systemFS.getFileStatus(skPath); //throws FileNotFoundException
+ jobTokenSize = status.getLen();
- /**
- * Construct a daemon thread.
- */
- private TaskCleanupThread() {
- setName("Task Tracker Task Cleanup Thread");
- }
+ Path localJobTokenFile =
+ lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user,
+ jobId.toString()), jobTokenSize, fConf);
- /**
- * End the daemon. This is done by setting the live flag to false and
- * interrupting ourselves.
- */
- public void terminate() {
- live = false;
- interrupt();
+ LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() +
+ " to " + localJobTokenFile.toUri().getPath());
+
+ // Download job_token
+ systemFS.copyToLocalFile(skPath, localJobTokenFile);
+ // set it into jobConf to transfer the name to TaskRunner
+ jobConf.set(JobContext.JOB_TOKEN_FILE,localJobTokenFile.toString());
}
+
/**
- * process task kill actions until told to stop being live.
+ * Thread that handles cleanup
*/
- public void run() {
- LOG.debug("Task cleanup thread started");
- while (live) {
- try {
- TaskTrackerAction action = tasksToCleanup.take();
- if (action instanceof KillJobAction) {
- purgeJob((KillJobAction) action);
- } else if (action instanceof KillTaskAction) {
- TaskInProgress tip;
- KillTaskAction killAction = (KillTaskAction) action;
- synchronized (TaskTracker.this) {
- tip = tasks.get(killAction.getTaskID());
+ private class TaskCleanupThread extends Daemon {
+
+ /**
+ * flag to halt work
+ */
+ private volatile boolean live = true;
+
+
+ /**
+ * Construct a daemon thread.
+ */
+ private TaskCleanupThread() {
+ setName("Task Tracker Task Cleanup Thread");
+ }
+
+ /**
+ * End the daemon. This is done by setting the live flag to false and
+ * interrupting ourselves.
+ */
+ public void terminate() {
+ live = false;
+ interrupt();
+ }
+
+ /**
+ * process task kill actions until told to stop being live.
+ */
+ public void run() {
+ LOG.debug("Task cleanup thread started");
+ while (live) {
+ try {
+ TaskTrackerAction action = tasksToCleanup.take();
+ if (action instanceof KillJobAction) {
+ purgeJob((KillJobAction) action);
+ } else if (action instanceof KillTaskAction) {
+ TaskInProgress tip;
+ KillTaskAction killAction = (KillTaskAction) action;
+ synchronized (TaskTracker.this) {
+ tip = tasks.get(killAction.getTaskID());
+ }
+ LOG.info("Received KillTaskAction for task: " +
+ killAction.getTaskID());
+ purgeTask(tip, false);
+ } else {
+ LOG.error("Non-delete action given to cleanup thread: "
+ + action);
+ }
+ } catch (InterruptedException except) {
+ // interrupted; terminate() may have cleared the live flag
+ } catch (Throwable except) {
+ LOG.warn("Exception in Cleanup thread: " + except,
+ except);
+ }
}
- LOG.info("Received KillTaskAction for task: " +
- killAction.getTaskID());
- purgeTask(tip, false);
- } else {
- LOG.error("Non-delete action given to cleanup thread: "
- + action);
- }
- } catch (InterruptedException except) {
- //interrupted. this may have reset the live flag
- } catch (Throwable except) {
- LOG.warn("Exception in Cleanup thread: " + except,
- except);
+ LOG.debug("Task cleanup thread ending");
}
- }
- LOG.debug("Task cleanup thread ending");
- }
- }
+ }
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Sat Nov 28 20:26:01 2009
@@ -27,7 +27,7 @@
* parent is a daemon which polls the central master for a new map or
* reduce task and runs it as a child process. All communication between child
* and parent is via this protocol. */
-interface TaskUmbilicalProtocol extends VersionedProtocol {
+public interface TaskUmbilicalProtocol extends VersionedProtocol {
/**
* Changed the version to 2, since we have a new method getMapOutputs
@@ -56,9 +56,10 @@
* Version 16 Change in signature of getTask() for HADOOP-5488
* Version 17 Modified TaskID to be aware of the new TaskTypes
* Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
+ * Version 19 Added fatalError for child to communicate fatal errors to TT
* */
- public static final long versionID = 18L;
+ public static final long versionID = 19L;
/**
* Called when a child task process starts, to get its task.
@@ -132,13 +133,15 @@
/** Report that the task encountered a local filesystem error.*/
void fsError(TaskAttemptID taskId, String message) throws IOException;
+ /** Report that the task encountered a fatal error.*/
+ void fatalError(TaskAttemptID taskId, String message) throws IOException;
+
/** Called by a reduce task to get the map output locations for finished maps.
* Returns an update centered around the map-task-completion-events.
* The update also piggybacks information about whether the copy of events
* at the task-tracker has changed, which triggers appropriate action at the
* child process.
*
- * @param taskId the reduce task id
* @param fromIndex the index starting from which the locations should be
* fetched
* @param maxLocs the max number of locations to fetch
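
The versionID bump to 19 corresponds to the new fatalError() method. A
minimal sketch of how a child task might report through it, assuming
umbilical is the task's TaskUmbilicalProtocol proxy and taskId its attempt
id (both hypothetical names; StringUtils is org.apache.hadoop.util.StringUtils):

    // Hedged sketch: a child process surfacing an unrecoverable failure to
    // its TaskTracker through the new umbilical call.
    try {
      // ... task work ...
    } catch (Throwable t) {
      umbilical.fatalError(taskId, StringUtils.stringifyException(t));
    }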
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextInputFormat.java Sat Nov 28 20:26:01 2009
@@ -42,7 +42,11 @@
}
protected boolean isSplitable(FileSystem fs, Path file) {
- return compressionCodecs.getCodec(file) == null;
+ final CompressionCodec codec = compressionCodecs.getCodec(file);
+ if (null == codec) {
+ return true;
+ }
+ return codec instanceof SplittableCompressionCodec;
}
public RecordReader<LongWritable, Text> getRecordReader(
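
With the isSplitable() change above, splittability depends on the codec type
rather than on the mere absence of a codec. A sketch of the resulting
behavior, shown from inside a hypothetical subclass because isSplitable() is
protected:

    // Hedged sketch: plain text files (no matching codec) stay splittable;
    // compressed files are splittable only if their codec implements
    // SplittableCompressionCodec (e.g. GzipCodec does not).
    class ProbingTextInputFormat extends TextInputFormat {
      boolean probe(FileSystem fs, Path file) {
        return isSplitable(fs, file);
      }
    }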
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -113,7 +113,7 @@
Progressable progress)
throws IOException {
boolean isCompressed = getCompressOutput(job);
- String keyValueSeparator = job.get("mapred.textoutputformat.separator",
+ String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator",
"\t");
if (!isCompressed) {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
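
A one-line sketch of the renamed separator key in use, assuming a JobConf
named job (the default separator remains a tab):

    // Hedged sketch: emit key,value instead of key<TAB>value.
    job.set("mapreduce.output.textoutputformat.separator", ",");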
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,5 @@
-/** * Licensed to the Apache Software Foundation (ASF) under one
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java Sat Nov 28 20:26:01 2009
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapred.lib;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Stringifier;
+import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -30,19 +32,45 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
* The Chain class provides all the common functionality for the
* {@link ChainMapper} and the {@link ChainReducer} classes.
- * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.chain.Chain} instead
*/
-@Deprecated
-class Chain extends org.apache.hadoop.mapreduce.lib.chain.Chain {
+class Chain {
+ private static final String CHAIN_MAPPER = "chain.mapper";
+ private static final String CHAIN_REDUCER = "chain.reducer";
+
+ private static final String CHAIN_MAPPER_SIZE = ".size";
+ private static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
+ private static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
+ private static final String CHAIN_REDUCER_CLASS = ".reducer.class";
+ private static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
private static final String MAPPER_BY_VALUE = "chain.mapper.byValue";
private static final String REDUCER_BY_VALUE = "chain.reducer.byValue";
+ private static final String MAPPER_INPUT_KEY_CLASS =
+ "chain.mapper.input.key.class";
+ private static final String MAPPER_INPUT_VALUE_CLASS =
+ "chain.mapper.input.value.class";
+ private static final String MAPPER_OUTPUT_KEY_CLASS =
+ "chain.mapper.output.key.class";
+ private static final String MAPPER_OUTPUT_VALUE_CLASS =
+ "chain.mapper.output.value.class";
+ private static final String REDUCER_INPUT_KEY_CLASS =
+ "chain.reducer.input.key.class";
+ private static final String REDUCER_INPUT_VALUE_CLASS =
+ "chain.reducer.input.value.class";
+ private static final String REDUCER_OUTPUT_KEY_CLASS =
+ "chain.reducer.output.key.class";
+ private static final String REDUCER_OUTPUT_VALUE_CLASS =
+ "chain.reducer.output.value.class";
+
+ private boolean isMap;
+
private JobConf chainJobConf;
private List<Mapper> mappers = new ArrayList<Mapper>();
@@ -64,7 +92,51 @@
* Reducer.
*/
Chain(boolean isMap) {
- super(isMap);
+ this.isMap = isMap;
+ }
+
+ /**
+ * Returns the prefix to use for the configuration of the chain, depending
+ * on whether it is for a Mapper or a Reducer.
+ *
+ * @param isMap TRUE for Mapper, FALSE for Reducer.
+ * @return the prefix to use.
+ */
+ private static String getPrefix(boolean isMap) {
+ return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER;
+ }
+
+ /**
+ * Creates a {@link JobConf} for one of the Mappers or the Reducer in the
+ * chain.
+ * <p/>
+ * It creates a new JobConf using the chain job's JobConf as its base and
+ * adds to it the configuration properties for the chain element. The keys
+ * of the chain element's jobConf take precedence over those of the given
+ * JobConf.
+ *
+ * @param jobConf the chain job's JobConf.
+ * @param confKey the key for chain element configuration serialized in the
+ * chain job's JobConf.
+ * @return a new JobConf aggregating the chain job's JobConf with the chain
+ * element configuration properties.
+ */
+ private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
+ JobConf conf;
+ try {
+ Stringifier<JobConf> stringifier =
+ new DefaultStringifier<JobConf>(jobConf, JobConf.class);
+ conf = stringifier.fromString(jobConf.get(confKey, null));
+ } catch (IOException ioex) {
+ throw new RuntimeException(ioex);
+ }
+ // we have to do this because the Writable deserialization clears all
+ // values set in the conf, making it impossible to do a new
+ // JobConf(jobConf) when creating the conf above
+ jobConf = new JobConf(jobConf);
+
+ for(Map.Entry<String, String> entry : conf) {
+ jobConf.set(entry.getKey(), entry.getValue());
+ }
+ return jobConf;
}
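
A sketch of the overlay getChainElementConf() performs, separated from the
deserialization plumbing (elementConf stands for the deserialized chain
element conf and is a hypothetical name):

    // Hedged sketch of the merge semantics: every key of the element conf
    // overwrites the corresponding key in a fresh copy of the base conf.
    JobConf merged = new JobConf(jobConf);
    for (Map.Entry<String, String> entry : elementConf) {
      merged.set(entry.getKey(), entry.getValue());
    }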
/**
@@ -97,27 +169,82 @@
String prefix = getPrefix(isMap);
// if a reducer chain check the Reducer has been already set
- checkReducerAlreadySet(isMap, jobConf, prefix, true);
-
- // set the mapper class
- int index = getIndex(jobConf, prefix);
+ if (!isMap) {
+ if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS,
+ null) == null) {
+ throw new IllegalStateException(
+ "A Mapper can be added to the chain only after the Reducer has " +
+ "been set");
+ }
+ }
+ int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
-
- validateKeyValueTypes(isMap, jobConf, inputKeyClass, inputValueClass,
- outputKeyClass, outputValueClass, index, prefix);
-
+
+ // if it is a reducer chain and the first Mapper is being added, check
+ // that the mapper's input key and value classes match those of the
+ // reducer's output.
+ if (!isMap && index == 0) {
+ JobConf reducerConf =
+ getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+ if (! inputKeyClass.isAssignableFrom(
+ reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) {
+ throw new IllegalArgumentException("The Reducer output key class does" +
+ " not match the Mapper input key class");
+ }
+ if (! inputValueClass.isAssignableFrom(
+ reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) {
+ throw new IllegalArgumentException("The Reducer output value class" +
+ " does not match the Mapper input value class");
+ }
+ } else if (index > 0) {
+ // check that the new Mapper's input key and value classes match those
+ // of the previous Mapper's output.
+ JobConf previousMapperConf =
+ getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG +
+ (index - 1));
+ if (! inputKeyClass.isAssignableFrom(
+ previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) {
+ throw new IllegalArgumentException("The Mapper output key class does" +
+ " not match the previous Mapper input key class");
+ }
+ if (! inputValueClass.isAssignableFrom(
+ previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) {
+ throw new IllegalArgumentException("The Mapper output value class" +
+ " does not match the previous Mapper input value class");
+ }
+ }
+
// if the Mapper does not have a private JobConf create an empty one
if (mapperConf == null) {
- // using a JobConf without defaults to make it lightweight.
- // still the chain JobConf may have all defaults and this conf is
- // overlapped to the chain JobConf one.
+ // using a JobConf without defaults to make it lightweight.
+ // still the chain JobConf may have all defaults and this conf is
+ // overlapped to the chain JobConf one.
mapperConf = new JobConf(true);
}
- // store in the private mapper conf if it works by value or by reference
+
+ // store in the private mapper conf the input/output classes of the mapper
+ // and whether it works by value or by reference
mapperConf.setBoolean(MAPPER_BY_VALUE, byValue);
-
- setMapperConf(isMap, jobConf, inputKeyClass, inputValueClass,
- outputKeyClass, outputValueClass, mapperConf, index, prefix);
+ mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+ mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass,
+ Object.class);
+ mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
+ mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
+ Object.class);
+
+ // serialize the private mapper jobconf in the chain jobconf.
+ Stringifier<JobConf> stringifier =
+ new DefaultStringifier<JobConf>(jobConf, JobConf.class);
+ try {
+ jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
+ stringifier.toString(new JobConf(mapperConf)));
+ } catch (IOException ioEx) {
+ throw new RuntimeException(ioEx);
+ }
+
+ // increment the chain counter
+ jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
}
/**
@@ -146,10 +273,13 @@
Class<? extends V2> outputValueClass,
boolean byValue, JobConf reducerConf) {
String prefix = getPrefix(false);
- checkReducerAlreadySet(false, jobConf, prefix, false);
+
+ if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
+ throw new IllegalStateException("Reducer has been already set");
+ }
jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
-
+
// if the Reducer does not have a private JobConf create an empty one
if (reducerConf == null) {
// using a JobConf without defaults to make it lightweight.
@@ -161,9 +291,24 @@
// store in the private reducer conf the input/output classes of the reducer
// and whether it works by value or by reference
reducerConf.setBoolean(REDUCER_BY_VALUE, byValue);
-
- setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass,
- outputValueClass, reducerConf, prefix);
+ reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+ reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
+ Object.class);
+ reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
+ Object.class);
+ reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
+ Object.class);
+
+ // serialize the private reducer jobconf in the chain jobconf.
+ Stringifier<JobConf> stringifier =
+ new DefaultStringifier<JobConf>(jobConf, JobConf.class);
+ try {
+ jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
+ stringifier.toString(new JobConf(reducerConf)));
+ } catch (IOException ioEx) {
+ throw new RuntimeException(ioEx);
+ }
}
/**
@@ -180,8 +325,8 @@
for (int i = 0; i < index; i++) {
Class<? extends Mapper> klass =
jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
- JobConf mConf = new JobConf(
- getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i));
+ JobConf mConf =
+ getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i);
Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
mappers.add(mapper);
@@ -198,8 +343,8 @@
Class<? extends Reducer> klass =
jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
if (klass != null) {
- JobConf rConf = new JobConf(
- getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG));
+ JobConf rConf =
+ getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
reducer = ReflectionUtils.newInstance(klass, rConf);
if (rConf.getBoolean(REDUCER_BY_VALUE, true)) {
reducerKeySerialization = serializationFactory
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java Sat Nov 28 20:26:01 2009
@@ -86,10 +86,7 @@
* RunningJob job = jc.submitJob(conf);
* ...
* </pre>
- * @deprecated
- * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainMapper} instead
*/
-@Deprecated
public class ChainMapper implements Mapper {
/**