Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2011/10/06 00:37:53 UTC
svn commit: r1179465 [2/3] - in
/hadoop/common/branches/branch-0.20-security-205/src:
core/org/apache/hadoop/util/ mapred/ mapred/org/apache/hadoop/mapred/
mapred/org/apache/hadoop/mapreduce/ test/org/apache/hadoop/mapred/
test/org/apache/hadoop/util/
Added: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java.orig
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java.orig?rev=1179465&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java.orig (added)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java.orig Wed Oct 5 22:37:52 2011
@@ -0,0 +1,4227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.regex.Pattern;
+
+import javax.crypto.SecretKey;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.server.tasktracker.*;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapred.pipes.Submitter;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.security.Credentials;
+
+/*******************************************************
+ * TaskTracker is a process that starts and tracks MR Tasks
+ * in a networked environment. It contacts the JobTracker
+ * for Task assignments and reports results back to it.
+ *
+ *******************************************************/
+public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
+ Runnable, TaskTrackerMXBean {
+
+ /**
+ * @deprecated
+ */
+ @Deprecated
+ static final String MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY =
+ "mapred.tasktracker.vmem.reserved";
+ /**
+ * @deprecated
+ */
+ @Deprecated
+ static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
+ "mapred.tasktracker.pmem.reserved";
+
+ static final String CONF_VERSION_KEY = "mapreduce.tasktracker.conf.version";
+ static final String CONF_VERSION_DEFAULT = "default";
+
+ static final long WAIT_FOR_DONE = 3 * 1000;
+ private int httpPort;
+
+ static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
+
+ static{
+ Configuration.addDefaultResource("mapred-default.xml");
+ Configuration.addDefaultResource("mapred-site.xml");
+ }
+
+ public static final Log LOG =
+ LogFactory.getLog(TaskTracker.class);
+
+ public static final String MR_CLIENTTRACE_FORMAT =
+ "src: %s" + // src IP
+ ", dest: %s" + // dst IP
+ ", bytes: %s" + // byte count
+ ", op: %s" + // operation
+ ", cliID: %s" + // task id
+ ", duration: %s"; // duration
+ public static final Log ClientTraceLog =
+ LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
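+
+ // Illustrative only (an assumption, not part of the original source):
+ // the shuffle server would fill this format in roughly as
+ //   ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
+ //       srcIp, dstIp, bytesRead, "MAPRED_SHUFFLE", taskId, duration));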
+
+ //Job ACLs file is created by TaskController under userlogs/$jobid directory
+ //for each job at job localization time. This will be used by TaskLogServlet
+ //for authorizing viewing of task logs of that job
+ static String jobACLsFile = "job-acls.xml";
+
+ volatile boolean running = true;
+
+ /**
+ * Manages TT local storage directories.
+ */
+ static class LocalStorage {
+ private List<String> localDirs;
+ private int numFailures;
+
+ public LocalStorage(String[] dirs) {
+ localDirs = new ArrayList<String>();
+ localDirs.addAll(Arrays.asList(dirs));
+ }
+
+ /**
+ * @return the current valid directories
+ */
+ synchronized String[] getDirs() {
+ return localDirs.toArray(new String[localDirs.size()]);
+ }
+
+ /**
+ * @return the current valid dirs as comma separated string
+ */
+ synchronized String getDirsString() {
+ return StringUtils.join(",", localDirs);
+ }
+
+ /**
+ * @return the number of valid local directories
+ */
+ synchronized int numDirs() {
+ return localDirs.size();
+ }
+
+ /**
+ * @return the number of directory failures
+ */
+ synchronized int numFailures() {
+ return numFailures;
+ }
+
+ /**
+ * Check the current set of local directories, updating the list
+ * of valid directories if necessary.
+ * @throws DiskErrorException if no directories are writable
+ */
+ synchronized void checkDirs() throws DiskErrorException {
+ // Iterate via an explicit Iterator so that a failed dir can be
+ // removed in-place without a ConcurrentModificationException.
+ Iterator<String> it = localDirs.iterator();
+ while (it.hasNext()) {
+ String dir = it.next();
+ try {
+ DiskChecker.checkDir(new File(dir));
+ } catch (DiskErrorException de) {
+ LOG.warn("TaskTracker local dir " + dir + " error " +
+ de.getMessage() + ", removing from local dirs");
+ it.remove();
+ numFailures++;
+ }
+ }
+ if (localDirs.isEmpty()) {
+ throw new DiskErrorException(
+ "No mapred local directories are writable");
+ }
+ }
+ }
+
+ private LocalStorage localStorage;
+ private long lastCheckDirsTime;
+ private int lastNumFailures;
+ private LocalDirAllocator localDirAllocator;
+ String taskTrackerName;
+ String localHostname;
+ InetSocketAddress jobTrackAddr;
+
+ InetSocketAddress taskReportAddress;
+
+ Server taskReportServer = null;
+ InterTrackerProtocol jobClient;
+
+ private TrackerDistributedCacheManager distributedCacheManager;
+ static int FILE_CACHE_SIZE = 2000;
+
+ // last heartbeat response received
+ short heartbeatResponseId = -1;
+
+ static final String TASK_CLEANUP_SUFFIX = ".cleanup";
+
+ /*
+ * This is the last 'status' report sent by this tracker to the JobTracker.
+ *
+ * If the rpc call succeeds, this 'status' is cleared-out by this tracker;
+ * indicating that a 'fresh' status report should be generated; in the event
+ * the rpc call fails for whatever reason, the previous status report is sent
+ * again.
+ */
+ TaskTrackerStatus status = null;
+
+ // The system-directory on HDFS where job files are stored
+ Path systemDirectory = null;
+
+ // The filesystem where job files are stored
+ FileSystem systemFS = null;
+ private FileSystem localFs = null;
+ private final HttpServer server;
+
+ volatile boolean shuttingDown = false;
+
+ Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
+ /**
+ * Map from taskId -> TaskInProgress.
+ */
+ Map<TaskAttemptID, TaskInProgress> runningTasks = null;
+ Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
+ private final JobTokenSecretManager jobTokenSecretManager
+ = new JobTokenSecretManager();
+
+ JobTokenSecretManager getJobTokenSecretManager() {
+ return jobTokenSecretManager;
+ }
+
+ RunningJob getRunningJob(JobID jobId) {
+ return runningJobs.get(jobId);
+ }
+
+ volatile int mapTotal = 0;
+ volatile int reduceTotal = 0;
+ boolean justStarted = true;
+ boolean justInited = true;
+ // Mark reduce tasks that are shuffling to rollback their events index
+ Set<TaskAttemptID> shouldReset = new HashSet<TaskAttemptID>();
+
+ //dir -> DF
+ Map<String, DF> localDirsDf = new HashMap<String, DF>();
+ long minSpaceStart = 0;
+ //must have this much space free to start new tasks
+ boolean acceptNewTasks = true;
+ long minSpaceKill = 0;
+ //if we run under this limit, kill one task
+ //and make sure we never receive any new jobs
+ //until all the old tasks have been cleaned up.
+ //this is if a machine is so full it's only good
+ //for serving map output to the other nodes
+
+ static Random r = new Random();
+ public static final String SUBDIR = "taskTracker";
+ static final String DISTCACHEDIR = "distcache";
+ static final String JOBCACHE = "jobcache";
+ static final String OUTPUT = "output";
+ static final String JARSDIR = "jars";
+ static final String LOCAL_SPLIT_FILE = "split.info";
+ static final String JOBFILE = "job.xml";
+ static final String TT_PRIVATE_DIR = "ttprivate";
+ public static final String TT_LOG_TMP_DIR = "tt_log_tmp";
+ static final String JVM_EXTRA_ENV_FILE = "jvm.extra.env";
+
+ static final String JOB_LOCAL_DIR = "job.local.dir";
+ static final String JOB_TOKEN_FILE="jobToken"; //localized file
+
+ private JobConf fConf;
+ private JobConf originalConf;
+ private Localizer localizer;
+ private int maxMapSlots;
+ private int maxReduceSlots;
+ private int failures;
+ final long mapRetainSize;
+ final long reduceRetainSize;
+
+ private ACLsManager aclsManager;
+
+ // Performance-related config knob to send an out-of-band heartbeat
+ // on task completion
+ static final String TT_OUTOFBAND_HEARBEAT =
+ "mapreduce.tasktracker.outofband.heartbeat";
+ private volatile boolean oobHeartbeatOnTaskCompletion;
+
+ // Track number of completed tasks to send an out-of-band heartbeat
+ private IntWritable finishedCount = new IntWritable(0);
+
+ private MapEventsFetcherThread mapEventsFetcher;
+ final int workerThreads;
+ CleanupQueue directoryCleanupThread;
+ private volatile JvmManager jvmManager;
+
+ private TaskMemoryManagerThread taskMemoryManager;
+ private boolean taskMemoryManagerEnabled = true;
+ private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+ private long totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+ private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+ private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+ private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+
+ private UserLogManager userLogManager;
+
+ static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
+ "mapred.tasktracker.memory_calculator_plugin";
+
+ /**
+ * the minimum interval between jobtracker polls
+ */
+ private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
+ /**
+ * Number of map-task completion event locations to poll for at one time
+ */
+ private int probe_sample_size = 500;
+
+ private IndexCache indexCache;
+
+ /**
+ * Handle to the specific instance of the {@link TaskController} class
+ */
+ private TaskController taskController;
+
+ /**
+ * Handle to the specific instance of the {@link NodeHealthCheckerService}
+ */
+ private NodeHealthCheckerService healthChecker;
+
+ /**
+ * Configuration property for disk health check interval in milliseconds.
+ * Currently, configuring this to a value smaller than the heartbeat interval
+ * is equivalent to setting this to heartbeat interval value.
+ */
+ static final String DISK_HEALTH_CHECK_INTERVAL_PROPERTY =
+ "mapred.disk.healthChecker.interval";
+ /**
+ * How often TaskTracker needs to check the health of its disks.
+ * Default value is {@link MRConstants#DEFAULT_DISK_HEALTH_CHECK_INTERVAL}
+ */
+ private long diskHealthCheckInterval;
+
+ /*
+ * A list of commitTaskActions for which a commit response has been received
+ */
+ private List<TaskAttemptID> commitResponses =
+ Collections.synchronizedList(new ArrayList<TaskAttemptID>());
+
+ private ShuffleServerInstrumentation shuffleServerMetrics;
+
+ private TaskTrackerInstrumentation myInstrumentation = null;
+
+ public TaskTrackerInstrumentation getTaskTrackerInstrumentation() {
+ return myInstrumentation;
+ }
+
+ /**
+ * A list of tips that should be cleaned up.
+ */
+ private BlockingQueue<TaskTrackerAction> tasksToCleanup =
+ new LinkedBlockingQueue<TaskTrackerAction>();
+
+ /**
+ * A daemon-thread that pulls tips off the list of things to cleanup.
+ */
+ private Thread taskCleanupThread =
+ new Thread(new Runnable() {
+ public void run() {
+ while (true) {
+ try {
+ TaskTrackerAction action = tasksToCleanup.take();
+ checkJobStatusAndWait(action);
+ if (action instanceof KillJobAction) {
+ purgeJob((KillJobAction) action);
+ } else if (action instanceof KillTaskAction) {
+ processKillTaskAction((KillTaskAction) action);
+ } else {
+ LOG.error("Non-delete action given to cleanup thread: "
+ + action);
+ }
+ } catch (Throwable except) {
+ LOG.warn(StringUtils.stringifyException(except));
+ }
+ }
+ }
+ }, "taskCleanup");
+
+ void processKillTaskAction(KillTaskAction killAction) throws IOException {
+ TaskInProgress tip;
+ synchronized (TaskTracker.this) {
+ tip = tasks.get(killAction.getTaskID());
+ }
+ LOG.info("Received KillTaskAction for task: " + killAction.getTaskID());
+ purgeTask(tip, false);
+ }
+
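+ /**
+ * If the kill action refers to a job that is still being localized,
+ * wait until localization completes so that cleanup does not race
+ * with job localization.
+ */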
+ private void checkJobStatusAndWait(TaskTrackerAction action)
+ throws InterruptedException {
+ JobID jobId = null;
+ if (action instanceof KillJobAction) {
+ jobId = ((KillJobAction)action).getJobID();
+ } else if (action instanceof KillTaskAction) {
+ jobId = ((KillTaskAction)action).getTaskID().getJobID();
+ } else {
+ return;
+ }
+ RunningJob rjob = null;
+ synchronized (runningJobs) {
+ rjob = runningJobs.get(jobId);
+ }
+ if (rjob != null) {
+ synchronized (rjob) {
+ while (rjob.localizing) {
+ rjob.wait();
+ }
+ }
+ }
+ }
+
+ public TaskController getTaskController() {
+ return taskController;
+ }
+
+ // Currently this is used only by tests
+ void setTaskController(TaskController t) {
+ taskController = t;
+ }
+
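+ /**
+ * Register a task with its job's RunningJob entry, creating the entry
+ * on first use.
+ */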
+ private RunningJob addTaskToJob(JobID jobId,
+ TaskInProgress tip) {
+ synchronized (runningJobs) {
+ RunningJob rJob = null;
+ if (!runningJobs.containsKey(jobId)) {
+ rJob = new RunningJob(jobId);
+ rJob.tasks = new HashSet<TaskInProgress>();
+ runningJobs.put(jobId, rJob);
+ } else {
+ rJob = runningJobs.get(jobId);
+ }
+ synchronized (rJob) {
+ rJob.tasks.add(tip);
+ }
+ return rJob;
+ }
+ }
+
+ private void removeTaskFromJob(JobID jobId, TaskInProgress tip) {
+ synchronized (runningJobs) {
+ RunningJob rjob = runningJobs.get(jobId);
+ if (rjob == null) {
+ LOG.warn("Unknown job " + jobId + " being deleted.");
+ } else {
+ synchronized (rjob) {
+ rjob.tasks.remove(tip);
+ }
+ }
+ }
+ }
+
+ UserLogManager getUserLogManager() {
+ return this.userLogManager;
+ }
+
+ void setUserLogManager(UserLogManager u) {
+ this.userLogManager = u;
+ }
+
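+ // The static helpers below build the TaskTracker's on-disk layout under
+ // each mapred.local.dir; e.g. (illustrative)
+ //   taskTracker/<user>/jobcache/<jobid>/<taskid>/work
+ // for a task's working directory.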
+ public static String getUserDir(String user) {
+ return TaskTracker.SUBDIR + Path.SEPARATOR + user;
+ }
+
+ Localizer getLocalizer() {
+ return localizer;
+ }
+
+ void setLocalizer(Localizer l) {
+ localizer = l;
+ }
+
+ public static String getPrivateDistributedCacheDir(String user) {
+ return getUserDir(user) + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
+ }
+
+ public static String getPublicDistributedCacheDir() {
+ return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
+ }
+
+ public static String getJobCacheSubdir(String user) {
+ return getUserDir(user) + Path.SEPARATOR + TaskTracker.JOBCACHE;
+ }
+
+ public static String getLocalJobDir(String user, String jobid) {
+ return getJobCacheSubdir(user) + Path.SEPARATOR + jobid;
+ }
+
+ static String getLocalJobConfFile(String user, String jobid) {
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
+ }
+
+ static String getPrivateDirJobConfFile(String user, String jobid) {
+ return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobConfFile(user, jobid);
+ }
+
+ static String getTaskConfFile(String user, String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ return getLocalTaskDir(user, jobid, taskid, isCleanupAttempt)
+ + Path.SEPARATOR + TaskTracker.JOBFILE;
+ }
+
+ static String getPrivateDirTaskScriptLocation(String user, String jobid,
+ String taskid) {
+ return TT_PRIVATE_DIR + Path.SEPARATOR +
+ getLocalTaskDir(user, jobid, taskid);
+ }
+
+ static String getJobJarsDir(String user, String jobid) {
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
+ }
+
+ public static String getJobJarFile(String user, String jobid) {
+ return getJobJarsDir(user, jobid) + Path.SEPARATOR + "job.jar";
+ }
+
+ static String getJobWorkDir(String user, String jobid) {
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
+ }
+
+ static String getLocalSplitFile(String user, String jobid, String taskid) {
+ return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
+ + TaskTracker.LOCAL_SPLIT_FILE;
+ }
+
+ static String getIntermediateOutputDir(String user, String jobid,
+ String taskid) {
+ return getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
+ + TaskTracker.OUTPUT;
+ }
+
+ public static String getLocalTaskDir(String user, String jobid,
+ String taskid) {
+ return getLocalTaskDir(user, jobid, taskid, false);
+ }
+
+ public static String getLocalTaskDir(String user, String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ String taskDir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid;
+ if (isCleanupAttempt) {
+ taskDir = taskDir + TASK_CLEANUP_SUFFIX;
+ }
+ return taskDir;
+ }
+
+ static String getTaskWorkDir(String user, String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ String dir = getLocalTaskDir(user, jobid, taskid, isCleanupAttempt);
+ return dir + Path.SEPARATOR + MRConstants.WORKDIR;
+ }
+
+ static String getLocalJobTokenFile(String user, String jobid) {
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+ }
+
+ static String getPrivateDirJobTokenFile(String user, String jobid) {
+ return TT_PRIVATE_DIR + Path.SEPARATOR +
+ getLocalJobTokenFile(user, jobid);
+ }
+
+ static String getPrivateDirForJob(String user, String jobid) {
+ return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobDir(user, jobid);
+ }
+
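+ /**
+ * Get a FileSystem handle for the given path, obtained as the job's
+ * user via a doAs with the job's UGI.
+ */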
+ private FileSystem getFS(final Path filePath, JobID jobId,
+ final Configuration conf) throws IOException, InterruptedException {
+ RunningJob rJob = runningJobs.get(jobId);
+ FileSystem userFs =
+ rJob.ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ public FileSystem run() throws IOException {
+ return filePath.getFileSystem(conf);
+ }});
+ return userFs;
+ }
+
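+ /** @return the pid of the task's JVM, or null if the task is not known */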
+ String getPid(TaskAttemptID tid) {
+ TaskInProgress tip = tasks.get(tid);
+ if (tip != null) {
+ return jvmManager.getPid(tip.getTaskRunner());
+ }
+ return null;
+ }
+
+ public long getProtocolVersion(String protocol,
+ long clientVersion) throws IOException {
+ if (protocol.equals(TaskUmbilicalProtocol.class.getName())) {
+ return TaskUmbilicalProtocol.versionID;
+ } else {
+ throw new IOException("Unknown protocol for task tracker: " +
+ protocol);
+ }
+ }
+
+ /**
+ * Delete all of the user directories.
+ * @param conf the TT configuration
+ * @throws IOException
+ */
+ private void deleteUserDirectories(Configuration conf) throws IOException {
+ for(String root: localStorage.getDirs()) {
+ for(FileStatus status: localFs.listStatus(new Path(root, SUBDIR))) {
+ String owner = status.getOwner();
+ String path = status.getPath().getName();
+ if (path.equals(owner)) {
+ taskController.deleteAsUser(owner, "");
+ }
+ }
+ }
+ }
+
+ public static final String TT_USER_NAME = "mapreduce.tasktracker.kerberos.principal";
+ public static final String TT_KEYTAB_FILE =
+ "mapreduce.tasktracker.keytab.file";
+ /**
+ * Do the real constructor work here. It's in a separate method
+ * so we can call it again and "recycle" the object after calling
+ * close().
+ */
+ synchronized void initialize() throws IOException, InterruptedException {
+ this.fConf = new JobConf(originalConf);
+
+ LOG.info("Starting tasktracker with owner as "
+ + getMROwner().getShortUserName());
+
+ localFs = FileSystem.getLocal(fConf);
+ if (fConf.get("slave.host.name") != null) {
+ this.localHostname = fConf.get("slave.host.name");
+ }
+ if (localHostname == null) {
+ this.localHostname =
+ DNS.getDefaultHost
+ (fConf.get("mapred.tasktracker.dns.interface","default"),
+ fConf.get("mapred.tasktracker.dns.nameserver","default"));
+ }
+
+ final String dirs = localStorage.getDirsString();
+ fConf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, dirs);
+ LOG.info("Good mapred local directories are: " + dirs);
+ taskController.setConf(fConf);
+ // Setup task controller so that deletion of user dirs happens properly
+ taskController.setup(localDirAllocator, localStorage);
+ server.setAttribute("conf", fConf);
+
+ deleteUserDirectories(fConf);
+
+ // NB: deleteLocalFiles uses the configured local dirs, but does not
+ // fail if a local directory has failed.
+ fConf.deleteLocalFiles(SUBDIR);
+ final FsPermission ttdir = FsPermission.createImmutable((short) 0755);
+ for (String s : localStorage.getDirs()) {
+ localFs.mkdirs(new Path(s, SUBDIR), ttdir);
+ }
+ fConf.deleteLocalFiles(TT_PRIVATE_DIR);
+ final FsPermission priv = FsPermission.createImmutable((short) 0700);
+ for (String s : localStorage.getDirs()) {
+ localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
+ }
+ fConf.deleteLocalFiles(TT_LOG_TMP_DIR);
+ final FsPermission pub = FsPermission.createImmutable((short) 0755);
+ for (String s : localStorage.getDirs()) {
+ localFs.mkdirs(new Path(s, TT_LOG_TMP_DIR), pub);
+ }
+ // Create userlogs directory under all good mapred-local-dirs
+ for (String s : localStorage.getDirs()) {
+ Path userLogsDir = new Path(s, TaskLog.USERLOGS_DIR_NAME);
+ if (!localFs.exists(userLogsDir)) {
+ localFs.mkdirs(userLogsDir, pub);
+ }
+ }
+ // Clear out state tables
+ this.tasks.clear();
+ this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+ this.runningJobs = new TreeMap<JobID, RunningJob>();
+ this.mapTotal = 0;
+ this.reduceTotal = 0;
+ this.acceptNewTasks = true;
+ this.status = null;
+
+ this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
+ this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
+ //tweak the probe sample size (make it a function of numCopiers)
+ probe_sample_size = this.fConf.getInt("mapred.tasktracker.events.batchsize", 500);
+
+ createInstrumentation();
+
+ // bind address
+ String address =
+ NetUtils.getServerAddress(fConf,
+ "mapred.task.tracker.report.bindAddress",
+ "mapred.task.tracker.report.port",
+ "mapred.task.tracker.report.address");
+ InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
+ String bindAddress = socAddr.getHostName();
+ int tmpPort = socAddr.getPort();
+
+ this.jvmManager = new JvmManager(this);
+
+ // Set service-level authorization security policy
+ if (this.fConf.getBoolean(
+ ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
+ PolicyProvider policyProvider =
+ (PolicyProvider)(ReflectionUtils.newInstance(
+ this.fConf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
+ MapReducePolicyProvider.class, PolicyProvider.class),
+ this.fConf));
+ ServiceAuthorizationManager.refresh(fConf, policyProvider);
+ }
+
+ // RPC initialization
+ int max = maxMapSlots > maxReduceSlots ?
+ maxMapSlots : maxReduceSlots;
+ //set the num handlers to max*2 since canCommit may wait for the duration
+ //of a heartbeat RPC
+ this.taskReportServer = RPC.getServer(this, bindAddress,
+ tmpPort, 2 * max, false, this.fConf, this.jobTokenSecretManager);
+ this.taskReportServer.start();
+
+ // get the assigned address
+ this.taskReportAddress = taskReportServer.getListenerAddress();
+ this.fConf.set("mapred.task.tracker.report.address",
+ taskReportAddress.getHostName() + ":" + taskReportAddress.getPort());
+ LOG.info("TaskTracker up at: " + this.taskReportAddress);
+
+ this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
+ LOG.info("Starting tracker " + taskTrackerName);
+
+ // Initialize DistributedCache
+ this.distributedCacheManager = new TrackerDistributedCacheManager(
+ this.fConf, taskController);
+ this.distributedCacheManager.startCleanupThread();
+
+ this.jobClient = (InterTrackerProtocol)
+ UserGroupInformation.getLoginUser().doAs(
+ new PrivilegedExceptionAction<Object>() {
+ public Object run() throws IOException {
+ return RPC.waitForProxy(InterTrackerProtocol.class,
+ InterTrackerProtocol.versionID,
+ jobTrackAddr, fConf);
+ }
+ });
+ this.justInited = true;
+ this.running = true;
+ // start the thread that will fetch map task completion events
+ this.mapEventsFetcher = new MapEventsFetcherThread();
+ mapEventsFetcher.setDaemon(true);
+ mapEventsFetcher.setName(
+ "Map-events fetcher for all reduce tasks " + "on " +
+ taskTrackerName);
+ mapEventsFetcher.start();
+
+ initializeMemoryManagement();
+
+ getUserLogManager().clearOldUserLogs(fConf);
+
+ setIndexCache(new IndexCache(this.fConf));
+
+ mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
+ reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
+ mapLauncher.start();
+ reduceLauncher.start();
+
+ // create a localizer instance
+ setLocalizer(new Localizer(localFs, localStorage.getDirs()));
+
+ //Start up node health checker service.
+ if (shouldStartHealthMonitor(this.fConf)) {
+ startHealthMonitor(this.fConf);
+ }
+
+ oobHeartbeatOnTaskCompletion =
+ fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
+ }
+
+ private void createInstrumentation() {
+ Class<? extends TaskTrackerInstrumentation> metricsInst =
+ getInstrumentationClass(fConf);
+ LOG.debug("instrumentation class="+ metricsInst);
+ if (metricsInst == null) {
+ myInstrumentation = TaskTrackerInstrumentation.create(this);
+ return;
+ }
+ try {
+ java.lang.reflect.Constructor<? extends TaskTrackerInstrumentation> c =
+ metricsInst.getConstructor(new Class<?>[] {TaskTracker.class} );
+ this.myInstrumentation = c.newInstance(this);
+ } catch(Exception e) {
+ //Reflection can throw lots of exceptions -- handle them all by
+ //falling back on the default.
+ LOG.error("failed to initialize taskTracker metrics", e);
+ this.myInstrumentation = TaskTrackerInstrumentation.create(this);
+ }
+
+ }
+
+ UserGroupInformation getMROwner() {
+ return aclsManager.getMROwner();
+ }
+
+ /**
+ * Are ACLs for authorization checks enabled on the TT?
+ */
+ boolean areACLsEnabled() {
+ return fConf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
+ }
+
+ static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
+ Configuration conf) {
+ return conf.getClass("mapred.tasktracker.instrumentation", null,
+ TaskTrackerInstrumentation.class);
+ }
+
+ static void setInstrumentationClass(
+ Configuration conf, Class<? extends TaskTrackerInstrumentation> t) {
+ conf.setClass("mapred.tasktracker.instrumentation",
+ t, TaskTrackerInstrumentation.class);
+ }
+
+ /**
+ * Removes all contents of temporary storage. Called upon
+ * startup, to remove any leftovers from previous run.
+ */
+ public void cleanupStorage() throws IOException {
+ this.fConf.deleteLocalFiles(SUBDIR);
+ this.fConf.deleteLocalFiles(TT_PRIVATE_DIR);
+ this.fConf.deleteLocalFiles(TT_LOG_TMP_DIR);
+ }
+
+ // Object on which the MapEventsFetcherThread waits.
+ private Object waitingOn = new Object();
+
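+ /**
+ * Daemon thread that polls the JobTracker for map-task completion
+ * events on behalf of all reduce tasks currently in the shuffle phase.
+ */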
+ private class MapEventsFetcherThread extends Thread {
+
+ private List <FetchStatus> reducesInShuffle() {
+ List <FetchStatus> fList = new ArrayList<FetchStatus>();
+ for (Map.Entry <JobID, RunningJob> item : runningJobs.entrySet()) {
+ RunningJob rjob = item.getValue();
+ if (!rjob.localized) {
+ continue;
+ }
+ JobID jobId = item.getKey();
+ FetchStatus f;
+ synchronized (rjob) {
+ f = rjob.getFetchStatus();
+ for (TaskInProgress tip : rjob.tasks) {
+ Task task = tip.getTask();
+ if (!task.isMapTask()) {
+ if (((ReduceTask)task).getPhase() ==
+ TaskStatus.Phase.SHUFFLE) {
+ if (rjob.getFetchStatus() == null) {
+ //this is a new job; we start fetching its map events
+ f = new FetchStatus(jobId,
+ ((ReduceTask)task).getNumMaps());
+ rjob.setFetchStatus(f);
+ }
+ f = rjob.getFetchStatus();
+ fList.add(f);
+ break; //no need to check any more tasks belonging to this
+ }
+ }
+ }
+ }
+ }
+ //at this point, we know for which of the running jobs we
+ //need to query the jobtracker for map outputs (actually map
+ //events).
+ return fList;
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Starting thread: " + this.getName());
+
+ while (running) {
+ try {
+ List <FetchStatus> fList = null;
+ synchronized (runningJobs) {
+ while (((fList = reducesInShuffle()).size()) == 0) {
+ try {
+ runningJobs.wait();
+ } catch (InterruptedException e) {
+ LOG.info("Shutting down: " + this.getName());
+ return;
+ }
+ }
+ }
+ // now fetch all the map task events for all the reduce tasks
+ // possibly belonging to different jobs
+ boolean fetchAgain = false; //flag signifying whether we want to
+ //fetch again immediately.
+ for (FetchStatus f : fList) {
+ long currentTime = System.currentTimeMillis();
+ try {
+ //the method below will return true when we have not
+ //fetched all available events yet
+ if (f.fetchMapCompletionEvents(currentTime)) {
+ fetchAgain = true;
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "Ignoring exception that fetching map completion" +
+ " events for " + f.jobId + " threw: " +
+ StringUtils.stringifyException(e));
+ }
+ if (!running) {
+ break;
+ }
+ }
+ synchronized (waitingOn) {
+ try {
+ if (!fetchAgain) {
+ waitingOn.wait(heartbeatInterval);
+ }
+ } catch (InterruptedException ie) {
+ LOG.info("Shutting down: " + this.getName());
+ return;
+ }
+ }
+ } catch (Exception e) {
+ LOG.info("Ignoring exception " + e.getMessage());
+ }
+ }
+ }
+ }
+
+ private class FetchStatus {
+ /** The next event ID from which we will start querying the JobTracker */
+ private IntWritable fromEventId;
+ /** This is the cache of map events for a given job */
+ private List<TaskCompletionEvent> allMapEvents;
+ /** The job id this FetchStatus object is for */
+ private JobID jobId;
+ private long lastFetchTime;
+ private boolean fetchAgain;
+
+ public FetchStatus(JobID jobId, int numMaps) {
+ this.fromEventId = new IntWritable(0);
+ this.jobId = jobId;
+ this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
+ }
+
+ /**
+ * Reset the events obtained so far.
+ */
+ public void reset() {
+ // Note that the sync is first on fromEventId and then on allMapEvents
+ synchronized (fromEventId) {
+ synchronized (allMapEvents) {
+ fromEventId.set(0); // set the new index for TCE
+ allMapEvents.clear();
+ }
+ }
+ }
+
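+ /**
+ * Return up to <code>max</code> cached map completion events starting
+ * at <code>fromId</code>; if none are available yet, nudge the fetcher
+ * thread to query the JobTracker.
+ */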
+ public TaskCompletionEvent[] getMapEvents(int fromId, int max) {
+
+ TaskCompletionEvent[] mapEvents =
+ TaskCompletionEvent.EMPTY_ARRAY;
+ boolean notifyFetcher = false;
+ synchronized (allMapEvents) {
+ if (allMapEvents.size() > fromId) {
+ int actualMax = Math.min(max, (allMapEvents.size() - fromId));
+ List <TaskCompletionEvent> eventSublist =
+ allMapEvents.subList(fromId, actualMax + fromId);
+ mapEvents = eventSublist.toArray(mapEvents);
+ } else {
+ // Notify Fetcher thread.
+ notifyFetcher = true;
+ }
+ }
+ if (notifyFetcher) {
+ synchronized (waitingOn) {
+ waitingOn.notify();
+ }
+ }
+ return mapEvents;
+ }
+
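+ /**
+ * Query the JobTracker for new map completion events, at most once per
+ * heartbeat interval unless the previous fetch returned a full batch.
+ * @return true if a full batch was fetched, i.e. another fetch should
+ * follow immediately
+ */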
+ public boolean fetchMapCompletionEvents(long currTime) throws IOException {
+ if (!fetchAgain && (currTime - lastFetchTime) < heartbeatInterval) {
+ return false;
+ }
+ int currFromEventId = 0;
+ synchronized (fromEventId) {
+ currFromEventId = fromEventId.get();
+ List <TaskCompletionEvent> recentMapEvents =
+ queryJobTracker(fromEventId, jobId, jobClient);
+ synchronized (allMapEvents) {
+ allMapEvents.addAll(recentMapEvents);
+ }
+ lastFetchTime = currTime;
+ if (fromEventId.get() - currFromEventId >= probe_sample_size) {
+ //return true when we have fetched the full payload, indicating
+ //that we should fetch again immediately (there might be more to
+ //fetch)
+ fetchAgain = true;
+ return true;
+ }
+ }
+ fetchAgain = false;
+ return false;
+ }
+ }
+
+ private static LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator("mapred.local.dir");
+
+ // initialize the job directory
+ RunningJob localizeJob(TaskInProgress tip)
+ throws IOException, InterruptedException {
+ Task t = tip.getTask();
+ JobID jobId = t.getJobID();
+ RunningJob rjob = addTaskToJob(jobId, tip);
+ InetSocketAddress ttAddr = getTaskTrackerReportAddress();
+ try {
+ synchronized (rjob) {
+ if (!rjob.localized) {
+ while (rjob.localizing) {
+ rjob.wait();
+ }
+ if (!rjob.localized) {
+ //this thread is localizing the job
+ rjob.localizing = true;
+ }
+ }
+ }
+ if (!rjob.localized) {
+ Path localJobConfPath = initializeJob(t, rjob, ttAddr);
+ JobConf localJobConf = new JobConf(localJobConfPath);
+ //to be doubly sure, overwrite the user in the config with the one the TT
+ //thinks it is
+ localJobConf.setUser(t.getUser());
+ //also reset the #tasks per jvm
+ resetNumTasksPerJvm(localJobConf);
+ //set the base jobconf path in rjob; all tasks will use
+ //this as the base path when they run
+ synchronized (rjob) {
+ rjob.localizedJobConf = localJobConfPath;
+ rjob.jobConf = localJobConf;
+ rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
+ localJobConf.getKeepFailedTaskFiles());
+
+ rjob.localized = true;
+ }
+ }
+ } finally {
+ synchronized (rjob) {
+ if (rjob.localizing) {
+ rjob.localizing = false;
+ rjob.notifyAll();
+ }
+ }
+ }
+ synchronized (runningJobs) {
+ runningJobs.notify(); //notify the fetcher thread
+ }
+ return rjob;
+ }
+
+ /**
+ * Localize the job on this tasktracker. Specifically
+ * <ul>
+ * <li>Cleanup and create job directories on all disks</li>
+ * <li>Download the credentials file</li>
+ * <li>Download the job config file job.xml from the FS</li>
+ * <li>Invokes the {@link TaskController} to do the rest of the job
+ * initialization</li>
+ * </ul>
+ *
+ * @param t task whose job has to be localized on this TT
+ * @param rjob the {@link RunningJob}
+ * @param ttAddr the tasktracker's RPC address
+ * @return the path to the job configuration to be used for all the tasks
+ * of this job as a starting point.
+ * @throws IOException
+ */
+ Path initializeJob(final Task t, final RunningJob rjob,
+ final InetSocketAddress ttAddr)
+ throws IOException, InterruptedException {
+ final JobID jobId = t.getJobID();
+
+ final Path jobFile = new Path(t.getJobFile());
+ final String userName = t.getUser();
+ final Configuration conf = getJobConf();
+
+ // save local copy of JobToken file
+ final String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
+ synchronized (rjob) {
+ rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
+
+ Credentials ts = TokenCache.loadTokens(localJobTokenFile, conf);
+ Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
+ if (jt != null) { //could be null in the case of some unit tests
+ getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
+ }
+ for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+ rjob.ugi.addToken(token);
+ }
+ }
+
+ FileSystem userFs = getFS(jobFile, jobId, conf);
+
+ // Download the job.xml for this job from the system FS
+ final Path localJobFile =
+ localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
+
+ /**
+ * Now initialize the job via task-controller to do the rest of the
+ * job-init. Do this within a doAs since the public distributed cache
+ * is also set up here.
+ * To support potential authenticated HDFS accesses, we need the tokens
+ */
+ rjob.ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws IOException, InterruptedException {
+ try {
+ final JobConf localJobConf = new JobConf(localJobFile);
+ // Setup the public distributed cache
+ TaskDistributedCacheManager taskDistributedCacheManager =
+ getTrackerDistributedCacheManager()
+ .newTaskDistributedCacheManager(jobId, localJobConf);
+ rjob.distCacheMgr = taskDistributedCacheManager;
+ taskDistributedCacheManager.setupCache(localJobConf,
+ TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
+
+ // Set some config values
+ localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+ getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+ if (conf.get("slave.host.name") != null) {
+ localJobConf.set("slave.host.name", conf.get("slave.host.name"));
+ }
+ resetNumTasksPerJvm(localJobConf);
+ localJobConf.setUser(t.getUser());
+
+ // write back the config (this config will have the updates that the
+ // distributed cache manager makes as well)
+ JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
+ taskController.initializeJob(t.getUser(), jobId.toString(),
+ new Path(localJobTokenFile), localJobFile, TaskTracker.this,
+ ttAddr);
+ } catch (IOException e) {
+ LOG.warn("Exception while localization " +
+ StringUtils.stringifyException(e));
+ throw e;
+ } catch (InterruptedException ie) {
+ LOG.warn("Exception while localization " +
+ StringUtils.stringifyException(ie));
+ throw ie;
+ }
+ return null;
+ }
+ });
+ //search for the conf that the initializeJob created
+ //need to look up certain configs from this conf, like
+ //the distributed cache, profiling, etc. ones
+ Path initializedConf = lDirAlloc.getLocalPathToRead(getLocalJobConfFile(
+ userName, jobId.toString()), getJobConf());
+ return initializedConf;
+ }
+
+ /** If certain configs are enabled, JVM reuse should be disabled.
+ * @param localJobConf
+ */
+ static void resetNumTasksPerJvm(JobConf localJobConf) {
+ boolean debugEnabled = false;
+ if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ return;
+ }
+ if (localJobConf.getMapDebugScript() != null ||
+ localJobConf.getReduceDebugScript() != null) {
+ debugEnabled = true;
+ }
+ String keepPattern = localJobConf.getKeepTaskFilesPattern();
+
+ if (debugEnabled || localJobConf.getProfileEnabled() ||
+ keepPattern != null || localJobConf.getKeepFailedTaskFiles()) {
+ //disable jvm reuse
+ localJobConf.setNumTasksToExecutePerJvm(1);
+ }
+ }
+
+ // Remove the log dir from the tasklog cleanup thread
+ void saveLogDir(JobID jobId, JobConf localJobConf)
+ throws IOException {
+ // remove it from tasklog cleanup thread first,
+ // it might be added there because of tasktracker reinit or restart
+ JobStartedEvent jse = new JobStartedEvent(jobId);
+ getUserLogManager().addLogEvent(jse);
+ }
+
+
+ /**
+ * Download the job configuration file from the FS.
+ *
+ * @param jobFile the original location of the configuration file
+ * @param user the user in question
+ * @param userFs the FileSystem created on behalf of the user
+ * @param jobId jobid in question
+ * @return the local file system path of the downloaded file.
+ * @throws IOException
+ */
+ private Path localizeJobConfFile(Path jobFile, String user,
+ FileSystem userFs, JobID jobId)
+ throws IOException {
+ // Get sizes of JobFile and JarFile
+ // sizes are -1 if they are not present.
+ FileStatus status = null;
+ long jobFileSize = -1;
+ try {
+ status = userFs.getFileStatus(jobFile);
+ jobFileSize = status.getLen();
+ } catch(FileNotFoundException fe) {
+ jobFileSize = -1;
+ }
+ Path localJobFile =
+ lDirAlloc.getLocalPathForWrite(getPrivateDirJobConfFile(user,
+ jobId.toString()), jobFileSize, fConf);
+
+ // Download job.xml
+ userFs.copyToLocalFile(jobFile, localJobFile);
+ return localJobFile;
+ }
+
+ protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
+ RunningJob rjob) throws IOException {
+ synchronized (tip) {
+ jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+ localStorage.getDirsString());
+ tip.setJobConf(jobConf);
+ tip.setUGI(rjob.ugi);
+ tip.launchTask(rjob);
+ }
+ }
+
+ public synchronized void shutdown() throws IOException, InterruptedException {
+ shuttingDown = true;
+ close();
+ if (this.server != null) {
+ try {
+ LOG.info("Shutting down StatusHttpServer");
+ this.server.stop();
+ } catch (Exception e) {
+ LOG.warn("Exception shutting down TaskTracker", e);
+ }
+ }
+ }
+ /**
+ * Close down the TaskTracker and all its components. We must also shut down
+ * any running tasks or threads, and clean up disk space. A new TaskTracker
+ * within the same process space might be restarted, so everything must be
+ * clean.
+ * @throws InterruptedException
+ */
+ public synchronized void close() throws IOException, InterruptedException {
+ //
+ // Kill running tasks. Do this in a 2nd vector, called 'tasksToClose',
+ // because calling jobHasFinished() may result in an edit to 'tasks'.
+ //
+ TreeMap<TaskAttemptID, TaskInProgress> tasksToClose =
+ new TreeMap<TaskAttemptID, TaskInProgress>();
+ tasksToClose.putAll(tasks);
+ for (TaskInProgress tip : tasksToClose.values()) {
+ tip.jobHasFinished(false);
+ }
+
+ this.running = false;
+
+ // Clear local storage
+ cleanupStorage();
+
+ // Shutdown the fetcher thread
+ this.mapEventsFetcher.interrupt();
+
+ //stop the launchers
+ this.mapLauncher.interrupt();
+ this.reduceLauncher.interrupt();
+
+ this.distributedCacheManager.stopCleanupThread();
+ jvmManager.stop();
+
+ // shutdown RPC connections
+ RPC.stopProxy(jobClient);
+
+ // wait for the fetcher thread to exit
+ for (boolean done = false; !done; ) {
+ try {
+ this.mapEventsFetcher.join();
+ done = true;
+ } catch (InterruptedException e) {
+ }
+ }
+
+ if (taskReportServer != null) {
+ taskReportServer.stop();
+ taskReportServer = null;
+ }
+ if (healthChecker != null) {
+ //stop node health checker service
+ healthChecker.stop();
+ healthChecker = null;
+ }
+ }
+
+ /**
+ * For testing
+ */
+ TaskTracker() {
+ server = null;
+ workerThreads = 0;
+ mapRetainSize = TaskLogsTruncater.DEFAULT_RETAIN_SIZE;
+ reduceRetainSize = TaskLogsTruncater.DEFAULT_RETAIN_SIZE;
+ }
+
+ void setConf(JobConf conf) {
+ fConf = conf;
+ }
+
+ /**
+ * Start with the local machine name, and the default JobTracker
+ */
+ public TaskTracker(JobConf conf) throws IOException, InterruptedException {
+ originalConf = conf;
+ FILE_CACHE_SIZE = conf.getInt("mapred.tasktracker.file.cache.size", 2000);
+ maxMapSlots = conf.getInt(
+ "mapred.tasktracker.map.tasks.maximum", 2);
+ maxReduceSlots = conf.getInt(
+ "mapred.tasktracker.reduce.tasks.maximum", 2);
+ diskHealthCheckInterval = conf.getLong(DISK_HEALTH_CHECK_INTERVAL_PROPERTY,
+ DEFAULT_DISK_HEALTH_CHECK_INTERVAL);
+ UserGroupInformation.setConfiguration(originalConf);
+ aclsManager = new ACLsManager(conf, new JobACLsManager(conf), null);
+ this.jobTrackAddr = JobTracker.getAddress(conf);
+ String infoAddr =
+ NetUtils.getServerAddress(conf,
+ "tasktracker.http.bindAddress",
+ "tasktracker.http.port",
+ "mapred.task.tracker.http.address");
+ InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
+ String httpBindAddress = infoSocAddr.getHostName();
+ int httpPort = infoSocAddr.getPort();
+ this.server = new HttpServer("task", httpBindAddress, httpPort,
+ httpPort == 0, conf, aclsManager.getAdminsAcl());
+ workerThreads = conf.getInt("tasktracker.http.threads", 40);
+ server.setThreads(1, workerThreads);
+ // let the jsp pages get to the task tracker, config, and other relevant
+ // objects
+ FileSystem local = FileSystem.getLocal(conf);
+ this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
+ Class<? extends TaskController> taskControllerClass =
+ conf.getClass("mapred.task.tracker.task-controller",
+ DefaultTaskController.class, TaskController.class);
+
+ fConf = new JobConf(conf);
+ localStorage = new LocalStorage(fConf.getLocalDirs());
+ localStorage.checkDirs();
+ taskController =
+ (TaskController) ReflectionUtils.newInstance(taskControllerClass, fConf);
+ taskController.setup(localDirAllocator, localStorage);
+ lastNumFailures = localStorage.numFailures();
+
+ // create user log manager
+ setUserLogManager(new UserLogManager(conf, taskController));
+ SecurityUtil.login(originalConf, TT_KEYTAB_FILE, TT_USER_NAME);
+
+ initialize();
+ this.shuffleServerMetrics = ShuffleServerInstrumentation.create(this);
+ server.setAttribute("task.tracker", this);
+ server.setAttribute("local.file.system", local);
+
+ server.setAttribute("log", LOG);
+ server.setAttribute("localDirAllocator", localDirAllocator);
+ server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
+
+ String exceptionStackRegex =
+ conf.get("mapreduce.reduce.shuffle.catch.exception.stack.regex");
+ String exceptionMsgRegex =
+ conf.get("mapreduce.reduce.shuffle.catch.exception.message.regex");
+
+ server.setAttribute("exceptionStackRegex", exceptionStackRegex);
+ server.setAttribute("exceptionMsgRegex", exceptionMsgRegex);
+
+ server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
+ server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
+ server.start();
+ this.httpPort = server.getPort();
+ checkJettyPort(httpPort);
+ LOG.info("FILE_CACHE_SIZE for mapOutputServlet set to : " + FILE_CACHE_SIZE);
+ mapRetainSize = conf.getLong(TaskLogsTruncater.MAP_USERLOG_RETAIN_SIZE,
+ TaskLogsTruncater.DEFAULT_RETAIN_SIZE);
+ reduceRetainSize = conf.getLong(TaskLogsTruncater.REDUCE_USERLOG_RETAIN_SIZE,
+ TaskLogsTruncater.DEFAULT_RETAIN_SIZE);
+ }
+
+ private void checkJettyPort(int port) throws IOException {
+ //See HADOOP-4744
+ if (port < 0) {
+ shuttingDown = true;
+ throw new IOException("Jetty problem. Jetty didn't bind to a " +
+ "valid port");
+ }
+ }
+
+ private void startCleanupThreads() throws IOException {
+ taskCleanupThread.setDaemon(true);
+ taskCleanupThread.start();
+ directoryCleanupThread = CleanupQueue.getInstance();
+ }
+
+ // only used by tests
+ void setCleanupThread(CleanupQueue c) {
+ directoryCleanupThread = c;
+ }
+
+ CleanupQueue getCleanupThread() {
+ return directoryCleanupThread;
+ }
+
+ /**
+ * The connection to the JobTracker, used by the TaskRunner
+ * for locating remote files.
+ */
+ public InterTrackerProtocol getJobClient() {
+ return jobClient;
+ }
+
+ /** Return the address to which the tasktracker's task report server is bound */
+ public synchronized InetSocketAddress getTaskTrackerReportAddress() {
+ return taskReportAddress;
+ }
+
+ /** Queries the job tracker for a set of outputs ready to be copied
+ * @param fromEventId the first event ID we want to start from, this is
+ * modified by the call to this method
+ * @param jobClient the job tracker
+ * @return a set of locations to copy outputs from
+ * @throws IOException
+ */
+ private List<TaskCompletionEvent> queryJobTracker(IntWritable fromEventId,
+ JobID jobId,
+ InterTrackerProtocol jobClient)
+ throws IOException {
+
+ TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
+ jobId,
+ fromEventId.get(),
+ probe_sample_size);
+ //we are interested in map task completion events only. So store
+ //only those
+ List <TaskCompletionEvent> recentMapEvents =
+ new ArrayList<TaskCompletionEvent>();
+ for (int i = 0; i < t.length; i++) {
+ if (t[i].isMap) {
+ recentMapEvents.add(t[i]);
+ }
+ }
+ fromEventId.set(fromEventId.get() + t.length);
+ return recentMapEvents;
+ }
+
+ /**
+ * Main service loop. Will stay in this loop forever.
+ */
+ State offerService() throws Exception {
+ long lastHeartbeat = 0;
+
+ while (running && !shuttingDown) {
+ try {
+ long now = System.currentTimeMillis();
+
+ long waitTime = heartbeatInterval - (now - lastHeartbeat);
+ if (waitTime > 0) {
+ // sleep for the wait time, or until a task completes and
+ // triggers an out-of-band heartbeat
+ synchronized (finishedCount) {
+ if (finishedCount.get() == 0) {
+ finishedCount.wait(waitTime);
+ }
+ finishedCount.set(0);
+ }
+ }
+
+ // If the TaskTracker is just starting up:
+ // 1. Verify the buildVersion
+ // 2. Get the system directory & filesystem
+ if(justInited) {
+ String jobTrackerBV = jobClient.getBuildVersion();
+ if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
+ String msg = "Shutting down. Incompatible buildVersion." +
+ "\nJobTracker's: " + jobTrackerBV +
+ "\nTaskTracker's: "+ VersionInfo.getBuildVersion();
+ LOG.error(msg);
+ try {
+ jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
+ } catch(Exception e ) {
+ LOG.info("Problem reporting to jobtracker: " + e);
+ }
+ return State.DENIED;
+ }
+
+ String dir = jobClient.getSystemDir();
+ if (dir == null) {
+ throw new IOException("Failed to get system directory");
+ }
+ systemDirectory = new Path(dir);
+ systemFS = systemDirectory.getFileSystem(fConf);
+ }
+
+ now = System.currentTimeMillis();
+ if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {
+ localStorage.checkDirs();
+ lastCheckDirsTime = now;
+ int numFailures = localStorage.numFailures();
+ // Re-init the task tracker if there were any new failures
+ if (numFailures > lastNumFailures) {
+ lastNumFailures = numFailures;
+ return State.STALE;
+ }
+ }
+
+ // Send the heartbeat and process the jobtracker's directives
+ HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
+
+ // Note the time when the heartbeat returned, use this to decide when to send the
+ // next heartbeat
+ lastHeartbeat = System.currentTimeMillis();
+
+ // Check if the map-event list needs purging
+ Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
+ if (jobs.size() > 0) {
+ synchronized (this) {
+ // purge the local map events list
+ for (JobID job : jobs) {
+ RunningJob rjob;
+ synchronized (runningJobs) {
+ rjob = runningJobs.get(job);
+ if (rjob != null) {
+ synchronized (rjob) {
+ FetchStatus f = rjob.getFetchStatus();
+ if (f != null) {
+ f.reset();
+ }
+ }
+ }
+ }
+ }
+
+ // Mark the reducers in shuffle for rollback
+ synchronized (shouldReset) {
+ for (Map.Entry<TaskAttemptID, TaskInProgress> entry
+ : runningTasks.entrySet()) {
+ if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
+ this.shouldReset.add(entry.getKey());
+ }
+ }
+ }
+ }
+ }
+
+ TaskTrackerAction[] actions = heartbeatResponse.getActions();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
+ heartbeatResponse.getResponseId() + " and " +
+ ((actions != null) ? actions.length : 0) + " actions");
+ }
+ if (reinitTaskTracker(actions)) {
+ return State.STALE;
+ }
+
+ // resetting heartbeat interval from the response.
+ heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
+ justStarted = false;
+ justInited = false;
+ if (actions != null){
+ for(TaskTrackerAction action: actions) {
+ if (action instanceof LaunchTaskAction) {
+ addToTaskQueue((LaunchTaskAction)action);
+ } else if (action instanceof CommitTaskAction) {
+ CommitTaskAction commitAction = (CommitTaskAction)action;
+ if (!commitResponses.contains(commitAction.getTaskID())) {
+ LOG.info("Received commit task action for " +
+ commitAction.getTaskID());
+ commitResponses.add(commitAction.getTaskID());
+ }
+ } else {
+ tasksToCleanup.put(action);
+ }
+ }
+ }
+ markUnresponsiveTasks();
+ killOverflowingTasks();
+
+ //we've cleaned up, resume normal operation
+ if (!acceptNewTasks && isIdle()) {
+ acceptNewTasks=true;
+ }
+ //The check below may not be required every iteration but we are
+ //erring on the side of caution here. We have seen many cases where
+ //the call to jetty's getLocalPort() returns different values at
+ //different times. Being really paranoid here.
+ checkJettyPort(server.getPort());
+ } catch (InterruptedException ie) {
+ LOG.info("Interrupted. Closing down.");
+ return State.INTERRUPTED;
+ } catch (DiskErrorException de) {
+ String msg = "Exiting task tracker for disk error:\n" +
+ StringUtils.stringifyException(de);
+ LOG.error(msg);
+ synchronized (this) {
+ jobClient.reportTaskTrackerError(taskTrackerName,
+ "DiskErrorException", msg);
+ }
+ return State.STALE;
+ } catch (RemoteException re) {
+ String reClass = re.getClassName();
+ if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
+ LOG.info("Tasktracker disallowed by JobTracker.");
+ return State.DENIED;
+ }
+ } catch (Exception except) {
+ String msg = "Caught exception: " +
+ StringUtils.stringifyException(except);
+ LOG.error(msg);
+ }
+ }
+
+ return State.NORMAL;
+ }
+
+ private long previousUpdate = 0;
+
+ void setIndexCache(IndexCache cache) {
+ this.indexCache = cache;
+ }
+
+ /**
+ * Build and transmit the heart beat to the JobTracker
+ * @param now current time
+ * @return the heartbeat response from the JobTracker
+ * @throws IOException
+ */
+ HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+ // Send Counters in the status once every COUNTER_UPDATE_INTERVAL
+ boolean sendCounters;
+ if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
+ sendCounters = true;
+ previousUpdate = now;
+ }
+ else {
+ sendCounters = false;
+ }
+
+ //
+ // Check if the last heartbeat got through...
+ // if so then build the heartbeat information for the JobTracker;
+ // else resend the previous status information.
+ //
+ if (status == null) {
+ synchronized (this) {
+ status = new TaskTrackerStatus(taskTrackerName, localHostname,
+ httpPort,
+ cloneAndResetRunningTaskStatuses(
+ sendCounters),
+ failures,
+ maxMapSlots,
+ maxReduceSlots);
+ }
+ } else {
+ LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
+ "' with reponseId '" + heartbeatResponseId);
+ }
+
+ //
+ // Check if we should ask for a new Task
+ //
+ boolean askForNewTask;
+ long localMinSpaceStart;
+ synchronized (this) {
+ askForNewTask =
+ ((status.countOccupiedMapSlots() < maxMapSlots ||
+ status.countOccupiedReduceSlots() < maxReduceSlots) &&
+ acceptNewTasks);
+ localMinSpaceStart = minSpaceStart;
+ }
+ if (askForNewTask) {
+ askForNewTask = enoughFreeSpace(localMinSpaceStart);
+ long freeDiskSpace = getFreeSpace();
+ long totVmem = getTotalVirtualMemoryOnTT();
+ long totPmem = getTotalPhysicalMemoryOnTT();
+
+ status.getResourceStatus().setAvailableSpace(freeDiskSpace);
+ status.getResourceStatus().setTotalVirtualMemory(totVmem);
+ status.getResourceStatus().setTotalPhysicalMemory(totPmem);
+ status.getResourceStatus().setMapSlotMemorySizeOnTT(
+ mapSlotMemorySizeOnTT);
+ status.getResourceStatus().setReduceSlotMemorySizeOnTT(
+ reduceSlotSizeMemoryOnTT);
+ }
+ //add node health information
+
+ TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
+ synchronized (this) {
+ if (healthChecker != null) {
+ healthChecker.setHealthStatus(healthStatus);
+ } else {
+ healthStatus.setNodeHealthy(true);
+ healthStatus.setLastReported(0L);
+ healthStatus.setHealthReport("");
+ }
+ }
+ //
+ // Xmit the heartbeat
+ //
+ HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
+ justStarted,
+ justInited,
+ askForNewTask,
+ heartbeatResponseId);
+
+ //
+ // The heartbeat got through successfully!
+ //
+ heartbeatResponseId = heartbeatResponse.getResponseId();
+
+ synchronized (this) {
+ for (TaskStatus taskStatus : status.getTaskReports()) {
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+ taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
+ taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+ !taskStatus.inTaskCleanupPhase()) {
+ if (taskStatus.getIsMap()) {
+ mapTotal--;
+ } else {
+ reduceTotal--;
+ }
+ myInstrumentation.completeTask(taskStatus.getTaskID());
+ runningTasks.remove(taskStatus.getTaskID());
+ }
+ }
+
+ // Clear transient status information which should only
+ // be sent once to the JobTracker
+ for (TaskInProgress tip: runningTasks.values()) {
+ tip.getStatus().clearStatus();
+ }
+ }
+
+ // Force a rebuild of 'status' on the next iteration
+ status = null;
+
+ return heartbeatResponse;
+ }
+
+ /**
+ * Return the total virtual memory available on this TaskTracker.
+ * @return total size of virtual memory.
+ */
+ long getTotalVirtualMemoryOnTT() {
+ return totalVirtualMemoryOnTT;
+ }
+
+ /**
+ * Return the total physical memory available on this TaskTracker.
+ * @return total size of physical memory.
+ */
+ long getTotalPhysicalMemoryOnTT() {
+ return totalPhysicalMemoryOnTT;
+ }
+
+ long getTotalMemoryAllottedForTasksOnTT() {
+ return totalMemoryAllottedForTasks;
+ }
+
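+ // Choose the configured retain size for the attempt by task type.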
+ long getRetainSize(org.apache.hadoop.mapreduce.TaskAttemptID tid) {
+ return tid.isMap() ? mapRetainSize : reduceRetainSize;
+ }
+
+ /**
+ * Check if the jobtracker directed a 'reset' of the tasktracker.
+ *
+ * @param actions the directives of the jobtracker for the tasktracker.
+ * @return <code>true</code> if tasktracker is to be reset,
+ * <code>false</code> otherwise.
+ */
+ private boolean reinitTaskTracker(TaskTrackerAction[] actions) {
+ if (actions != null) {
+ for (TaskTrackerAction action : actions) {
+ if (action.getActionId() ==
+ TaskTrackerAction.ActionType.REINIT_TRACKER) {
+ LOG.info("Recieved ReinitTrackerAction from JobTracker");
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Kill any tasks that have not reported progress within their
+ * configured per-job timeout.
+ */
+ private synchronized void markUnresponsiveTasks() throws IOException {
+ long now = System.currentTimeMillis();
+ for (TaskInProgress tip: runningTasks.values()) {
+ if (tip.getRunState() == TaskStatus.State.RUNNING ||
+ tip.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+ tip.isCleaningup()) {
+ // Check the per-job timeout interval for tasks;
+ // an interval of '0' implies it is never timed-out
+ long jobTaskTimeout = tip.getTaskTimeout();
+ if (jobTaskTimeout == 0) {
+ continue;
+ }
+
+ // Check if the task has not reported progress for a
+ // time-period greater than the configured time-out
+ long timeSinceLastReport = now - tip.getLastProgressReport();
+ if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) {
+ String msg =
+ "Task " + tip.getTask().getTaskID() + " failed to report status for "
+ + (timeSinceLastReport / 1000) + " seconds. Killing!";
+ LOG.info(tip.getTask().getTaskID() + ": " + msg);
+ ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
+ tip.reportDiagnosticInfo(msg);
+ myInstrumentation.timedoutTask(tip.getTask().getTaskID());
+ purgeTask(tip, true);
+ }
+ }
+ }
+ }
+
+ /**
+ * The task tracker is done with this job, so we need to clean up.
+ * @param action the kill-job action identifying the job to purge
+ * @throws IOException
+ */
+ synchronized void purgeJob(KillJobAction action) throws IOException {
+ JobID jobId = action.getJobID();
+ LOG.info("Received 'KillJobAction' for job: " + jobId);
+ RunningJob rjob = null;
+ synchronized (runningJobs) {
+ rjob = runningJobs.get(jobId);
+ }
+
+ if (rjob == null) {
+ LOG.warn("Unknown job " + jobId + " being deleted.");
+ } else {
+ synchronized (rjob) {
+ // decrement the reference counts for the items this job references
+ rjob.distCacheMgr.release();
+ // Add the tips of this job to the queue of tasks to be purged
+ for (TaskInProgress tip : rjob.tasks) {
+ tip.jobHasFinished(false);
+ Task t = tip.getTask();
+ if (t.isMapTask()) {
+ indexCache.removeMap(tip.getTask().getTaskID().toString());
+ }
+ }
+ // Delete the job directory if the job is done/failed
+ if (!rjob.keepJobFiles) {
+ removeJobFiles(rjob.ugi.getShortUserName(), rjob.getJobID());
+ }
+ // add job to user log manager
+ long now = System.currentTimeMillis();
+ JobCompletedEvent jca = new JobCompletedEvent(rjob
+ .getJobID(), now, UserLogCleaner.getUserlogRetainHours(rjob
+ .getJobConf()));
+ getUserLogManager().addLogEvent(jca);
+
+ // Remove this job
+ rjob.tasks.clear();
+ // Close all FileSystems for this job
+ try {
+ FileSystem.closeAllForUGI(rjob.getUGI());
+ } catch (IOException ie) {
+ LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) +
+ " while closing FileSystem for " + rjob.getUGI());
+ }
+ }
+ }
+
+ synchronized(runningJobs) {
+ runningJobs.remove(jobId);
+ }
+ getJobTokenSecretManager().removeTokenForJob(jobId.toString());
+ }
+
+ /**
+ * This job's files are no longer needed on this TT, remove them.
+ *
+ * @param user the user who owns the job's local files
+ * @param jobId the job whose local files are to be removed
+ * @throws IOException
+ */
+ void removeJobFiles(String user, JobID jobId) throws IOException {
+ String userDir = getUserDir(user);
+ String jobDir = getLocalJobDir(user, jobId.toString());
+ PathDeletionContext jobCleanup =
+ new TaskController.DeletionContext(getTaskController(), false, user,
+ jobDir.substring(userDir.length()));
+ directoryCleanupThread.addToQueue(jobCleanup);
+
+ for (String str : localStorage.getDirs()) {
+ Path ttPrivateJobDir = FileSystem.getLocal(fConf).makeQualified(
+ new Path(str, TaskTracker.getPrivateDirForJob(user, jobId.toString())));
+ PathDeletionContext ttPrivateJobCleanup =
+ new CleanupQueue.PathDeletionContext(ttPrivateJobDir, fConf);
+ directoryCleanupThread.addToQueue(ttPrivateJobCleanup);
+ }
+ }
+
+ /**
+ * Remove the tip and update all relevant state.
+ *
+ * @param tip {@link TaskInProgress} to be removed.
+ * @param wasFailure did the task fail or was it killed?
+ */
+ private void purgeTask(TaskInProgress tip, boolean wasFailure)
+ throws IOException {
+ if (tip != null) {
+ LOG.info("About to purge task: " + tip.getTask().getTaskID());
+
+ // Remove the task from running jobs,
+ // removing the job if it's the last task
+ removeTaskFromJob(tip.getTask().getJobID(), tip);
+ tip.jobHasFinished(wasFailure);
+ if (tip.getTask().isMapTask()) {
+ indexCache.removeMap(tip.getTask().getTaskID().toString());
+ }
+ }
+ }
+
+ /** Check if we're dangerously low on disk space.
+ * If so, kill tasks to free up space and make sure
+ * we don't accept any new tasks.
+ * Reduce tasks are tried first, since they typically
+ * use up the most space;
+ * then pick the one with the least progress.
+ */
+ private void killOverflowingTasks() throws IOException {
+ long localMinSpaceKill;
+ synchronized(this){
+ localMinSpaceKill = minSpaceKill;
+ }
+ if (!enoughFreeSpace(localMinSpaceKill)) {
+ acceptNewTasks=false;
+ //we give up! do not accept new tasks until
+ //all the ones running have finished and they're all cleared up
+ synchronized (this) {
+ TaskInProgress killMe = findTaskToKill(null);
+
+ if (killMe!=null) {
+ String msg = "Tasktracker running out of space." +
+ " Killing task.";
+ LOG.info(killMe.getTask().getTaskID() + ": " + msg);
+ killMe.reportDiagnosticInfo(msg);
+ purgeTask(killMe, false);
+ }
+ }
+ }
+ }
+
+ /**
+ * Pick a task to kill to free up memory/disk-space
+ * @param tasksToExclude tasks that are to be excluded while trying to find a
+ * task to kill. If null, all runningTasks will be searched.
+ * @return the task to kill or null, if one wasn't found
+ */
+ synchronized TaskInProgress findTaskToKill(List<TaskAttemptID> tasksToExclude) {
+ TaskInProgress killMe = null;
+ for (TaskInProgress tip : runningTasks.values()) {
+
+ if (tasksToExclude != null
+ && tasksToExclude.contains(tip.getTask().getTaskID())) {
+ // exclude this task
+ continue;
+ }
+
+ if ((tip.getRunState() == TaskStatus.State.RUNNING ||
+ tip.getRunState() == TaskStatus.State.COMMIT_PENDING) &&
+ !tip.wasKilled) {
+
+ if (killMe == null) {
+ killMe = tip;
+
+ } else if (!tip.getTask().isMapTask()) {
+ //reduce task, give priority
+ if (killMe.getTask().isMapTask() ||
+ (tip.getTask().getProgress().get() <
+ killMe.getTask().getProgress().get())) {
+
+ killMe = tip;
+ }
+
+ } else if (killMe.getTask().isMapTask() &&
+ tip.getTask().getProgress().get() <
+ killMe.getTask().getProgress().get()) {
+ //map task, only add if the progress is lower
+
+ killMe = tip;
+ }
+ }
+ }
+ return killMe;
+ }
+
+ /**
+ * Check if any of the local directories has enough
+ * free space (more than minSpace).
+ *
+ * If not, do not try to get a new task assigned.
+ * @return true if minSpace is 0 or at least one local
+ * directory has more than minSpace free
+ * @throws IOException
+ */
+ private boolean enoughFreeSpace(long minSpace) throws IOException {
+ if (minSpace == 0) {
+ return true;
+ }
+ return minSpace < getFreeSpace();
+ }
+
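+ // Return the largest amount of free space available on any single
+ // local directory, caching one DF probe per directory in localDirsDf.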
+ private long getFreeSpace() throws IOException {
+ long biggestSeenSoFar = 0;
+ String[] localDirs = localStorage.getDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ DF df = null;
+ if (localDirsDf.containsKey(localDirs[i])) {
+ df = localDirsDf.get(localDirs[i]);
+ } else {
+ df = new DF(new File(localDirs[i]), fConf);
+ localDirsDf.put(localDirs[i], df);
+ }
+
+ long availOnThisVol = df.getAvailable();
+ if (availOnThisVol > biggestSeenSoFar) {
+ biggestSeenSoFar = availOnThisVol;
+ }
+ }
+
+ //Should ultimately hold back the space we expect running tasks to use but
+ //that estimate isn't currently being passed down to the TaskTrackers
+ return biggestSeenSoFar;
+ }
+
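+ // One launcher per task type; each manages its own pool of slots.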
+ private TaskLauncher mapLauncher;
+ private TaskLauncher reduceLauncher;
+ public JvmManager getJvmManagerInstance() {
+ return jvmManager;
+ }
+
+ // Called from unit tests.
+ void setJvmManagerInstance(JvmManager jvmManager) {
+ this.jvmManager = jvmManager;
+ }
+
+ private void addToTaskQueue(LaunchTaskAction action) {
+ if (action.getTask().isMapTask()) {
+ mapLauncher.addToTaskQueue(action);
+ } else {
+ reduceLauncher.addToTaskQueue(action);
+ }
+ }
+
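+ /**
+ * Daemon thread that launches queued tasks of a single type (map or
+ * reduce) as slots become free. All slot accounting is synchronized
+ * on numFreeSlots.
+ */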
+ class TaskLauncher extends Thread {
+ private IntWritable numFreeSlots;
+ private final int maxSlots;
+ private List<TaskInProgress> tasksToLaunch;
+
+ public TaskLauncher(TaskType taskType, int numSlots) {
+ this.maxSlots = numSlots;
+ this.numFreeSlots = new IntWritable(numSlots);
+ this.tasksToLaunch = new LinkedList<TaskInProgress>();
+ setDaemon(true);
+ setName("TaskLauncher for " + taskType + " tasks");
+ }
+
+ public void addToTaskQueue(LaunchTaskAction action) {
+ synchronized (tasksToLaunch) {
+ TaskInProgress tip = registerTask(action, this);
+ tasksToLaunch.add(tip);
+ tasksToLaunch.notifyAll();
+ }
+ }
+
+ public void cleanTaskQueue() {
+ tasksToLaunch.clear();
+ }
+
+ public void addFreeSlots(int numSlots) {
+ synchronized (numFreeSlots) {
+ numFreeSlots.set(numFreeSlots.get() + numSlots);
+ assert (numFreeSlots.get() <= maxSlots);
+ LOG.info("addFreeSlot : current free slots : " + numFreeSlots.get());
+ numFreeSlots.notifyAll();
+ }
+ }
+
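+ // Wake up the launcher thread so it can re-check the state of the
+ // tasks waiting for slots.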
+ void notifySlots() {
+ synchronized (numFreeSlots) {
+ numFreeSlots.notifyAll();
+ }
+ }
+
+ int getNumWaitingTasksToLaunch() {
+ synchronized (tasksToLaunch) {
+ return tasksToLaunch.size();
+ }
+ }
+
+ public void run() {
+ while (!Thread.interrupted()) {
+ try {
+ TaskInProgress tip;
+ Task task;
+ synchronized (tasksToLaunch) {
+ while (tasksToLaunch.isEmpty()) {
+ tasksToLaunch.wait();
+ }
+ //get the TIP
+ tip = tasksToLaunch.remove(0);
+ task = tip.getTask();
+ LOG.info("Trying to launch : " + tip.getTask().getTaskID() +
+ " which needs " + task.getNumSlotsRequired() + " slots");
+ }
+ //wait for free slots to run
+ synchronized (numFreeSlots) {
+ boolean canLaunch = true;
+ while (numFreeSlots.get() < task.getNumSlotsRequired()) {
+ //Make sure that there is no kill-task action for this task!
+ //We are not locking the tip here, because that would reverse the
+ //locking order. A lock on the tip is not required here because:
+ // 1. the runState of TaskStatus is volatile
+ // 2. no notification is missed, since notification is
+ // synchronized on numFreeSlots; so even if the tip is half way
+ // through kill() while we do the check, we don't miss the
+ // notification for the following wait().
+ if (!tip.canBeLaunched()) {
+ //got killed externally while still in the launcher queue
+ LOG.info("Not blocking slots for " + task.getTaskID()
+ + " as it got killed externally. Task's state is "
+ + tip.getRunState());
+ canLaunch = false;
+ break;
+ }
+ LOG.info("TaskLauncher : Waiting for " + task.getNumSlotsRequired() +
+ " to launch " + task.getTaskID() + ", currently we have " +
+ numFreeSlots.get() + " free slots");
+ numFreeSlots.wait();
+ }
+ if (!canLaunch) {
+ continue;
+ }
+ LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
+ " and trying to launch "+tip.getTask().getTaskID() +
+ " which needs " + task.getNumSlotsRequired() + " slots");
+ numFreeSlots.set(numFreeSlots.get() - task.getNumSlotsRequired());
+ assert (numFreeSlots.get() >= 0);
+ }
+ synchronized (tip) {
+ //to make sure that there is no kill-task action for this task
+ if (!tip.canBeLaunched()) {
+ //got killed externally while still in the launcher queue
+ LOG.info("Not launching task " + task.getTaskID() + " as it got"
+ + " killed externally. Task's state is " + tip.getRunState());
+ addFreeSlots(task.getNumSlotsRequired());
+ continue;
+ }
+ tip.slotTaken = true;
+ }
+ //got a free slot. launch the task
+ startNewTask(tip);
+ } catch (InterruptedException e) {
+ return; // ALL DONE
+ } catch (Throwable th) {
+ LOG.error("TaskLauncher error " +
+ StringUtils.stringifyException(th));
+ }
+ }
+ }
+ }
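+
+ /**
+ * Create a TaskInProgress for the given action, record it in the
+ * tasks and runningTasks maps, and update the map/reduce totals.
+ */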
+ private TaskInProgress registerTask(LaunchTaskAction action,
+ TaskLauncher launcher) {
+ Task t = action.getTask();
+ LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
+ " task's state:" + t.getState());
+ TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
+ synchronized (this) {
+ tasks.put(t.getTaskID(), tip);
+ runningTasks.put(t.getTaskID(), tip);
+ boolean isMap = t.isMapTask();
+ if (isMap) {
+ mapTotal++;
+ } else {
+ reduceTotal++;
+ }
+ }
+ return tip;
+ }
+
+ /**
+ * Start a new task.
+ * All exceptions are handled locally, so that we don't mess up the
+ * task tracker.
+ * @throws InterruptedException
+ */
+ void startNewTask(final TaskInProgress tip) throws InterruptedException {
+ Thread launchThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ RunningJob rjob = localizeJob(tip);
+ tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString());
+ // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
+ launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob);
+ } catch (Throwable e) {
+ String msg = ("Error initializing " + tip.getTask().getTaskID() +
+ ":\n" + StringUtils.stringifyException(e));
+ LOG.warn(msg);
+ tip.reportDiagnosticInfo(msg);
+ try {
+ tip.kill(true);
+ tip.cleanup(true);
+ } catch (IOException ie2) {
+ LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
+ } catch (InterruptedException ie2) {
+ LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
+ }
+ if (e instanceof Error) {
+ LOG.error("TaskLauncher error " +
+ StringUtils.stringifyException(e));
+ }
+ }
+ }
+ });
+ launchThread.start();
+ }
+
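+ // Register the task's configured memory limit (MB scaled to bytes)
+ // with the task memory manager, when memory management is enabled.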
+ void addToMemoryManager(TaskAttemptID attemptId, boolean isMap,
+ JobConf conf) {
+ if (isTaskMemoryManagerEnabled()) {
+ taskMemoryManager.addTask(attemptId,
+ isMap ? conf
+ .getMemoryForMapTask() * 1024 * 1024L : conf
+ .getMemoryForReduceTask() * 1024 * 1024L);
+ }
+ }
+
+ void removeFromMemoryManager(TaskAttemptID attemptId) {
+ // Remove the entry from taskMemoryManagerThread's data structures.
+ if (isTaskMemoryManagerEnabled()) {
+ taskMemoryManager.removeTask(attemptId);
+ }
+ }
+
+ /**
+ * Notify the tasktracker to send an out-of-band heartbeat.
+ */
+ private void notifyTTAboutTaskCompletion() {
+ if (oobHeartbeatOnTaskCompletion) {
+ synchronized (finishedCount) {
+ int value = finishedCount.get();
+ finishedCount.set(value+1);
+ finishedCount.notify();
+ }
+ }
+ }
+
+ /**
+ * The server retry loop.
+ * This while-loop attempts to connect to the JobTracker. It only
+ * loops when the old TaskTracker has gone bad (its state has gone
+ * stale) and we need to reinitialize everything.
+ */
+ public void run() {
+ try {
+ getUserLogManager().start();
+ startCleanupThreads();
+ boolean denied = false;
+ while (running && !shuttingDown && !denied) {
+ boolean staleState = false;
+ try {
+ // This while-loop attempts reconnects if we get network errors
+ while (running && !staleState && !shuttingDown && !denied) {
+ try {
+ State osState = offerService();
+ if (osState == State.STALE) {
+ staleState = true;
+ } else if (osState == State.DENIED) {
+ denied = true;
+ }
+ } catch (Exception ex) {
+ if (!shuttingDown) {
+ LOG.info("Lost connection to JobTracker [" +
+ jobTrackAddr + "]. Retrying...", ex);
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+ }
+ } finally {
+ close();
+ }
+ if (shuttingDown) { return; }
+ LOG.warn("Reinitializing local state");
+ initialize();
+ }
+ if (denied) {
+ shutdown();
+ }
+ } catch (IOException iex) {
+ LOG.error("Got fatal exception while reinitializing TaskTracker: " +
+ StringUtils.stringifyException(iex));
+ return;
+ }
+ catch (InterruptedException i) {
+ LOG.error("Got interrupted while reinitializing TaskTracker: " +
+ i.getMessage());
+ return;
+ }
+ }
+
+ ///////////////////////////////////////////////////////
+ // TaskInProgress maintains all the info for a Task that
+ // lives at this TaskTracker. It maintains the Task object,
+ // its TaskStatus, and the TaskRunner.
+ ///////////////////////////////////////////////////////
+ class TaskInProgress {
+ Task task;
+ long lastProgressReport;
+ StringBuffer diagnosticInfo = new StringBuffer();
+ private TaskRunner runner;
+ volatile boolean done = false;
+ volatile boolean wasKilled = false;
+ private JobConf ttConf;
+ private JobConf localJobConf;
+ private boolean keepFailedTaskFiles;
+ private boolean alwaysKeepTaskFiles;
+ private TaskStatus taskStatus;
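+ // Per-job task timeout in ms; 0 means the task is never timed out.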
+ private long taskTimeout;
+ private String debugCommand;
+ private volatile boolean slotTaken = false;
+ private TaskLauncher launcher;
+
+ // The ugi of the user who is running the job. It also contains all
+ // the tokens, which are populated during job localization.
+ private UserGroupInformation ugi;
+
+ UserGroupInformation getUGI() {
+ return ugi;
+ }
+
+ void setUGI(UserGroupInformation userUGI) {
+ ugi = userUGI;
+ }
+
+ /**
+ * Create a TaskInProgress with no associated launcher.
+ */
+ public TaskInProgress(Task task, JobConf conf) {
+ this(task, conf, null);
+ }
+
+ public TaskInProgress(Task task, JobConf conf, TaskLauncher launcher) {
+ this.task = task;
+ this.launcher = launcher;
+ this.lastProgressReport = System.currentTimeMillis();
+ this.ttConf = conf;
+ localJobConf = null;
+ taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(),
+ 0.0f,
+ task.getNumSlotsRequired(),
+ task.getState(),
+ diagnosticInfo.toString(),
+ "initializing",
+ getName(),
+ task.isTaskCleanupTask() ?
+ TaskStatus.Phase.CLEANUP :
+ task.isMapTask()? TaskStatus.Phase.MAP:
+ TaskStatus.Phase.SHUFFLE,
+ task.getCounters());
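+ // Default timeout: 10 minutes; may be overridden per job via
+ // "mapred.task.timeout" in setJobConf().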
+ taskTimeout = (10 * 60 * 1000);
+ }
+
+ void localizeTask(Task task) throws IOException{
+
+ // Do the task-type specific localization
+ // TODO: are these calls really required?
+ task.localizeConfiguration(localJobConf);
+
+ task.setConf(localJobConf);
+ }
+
+ /**
+ * @return the task being tracked by this TaskInProgress
+ */
+ public Task getTask() {
+ return task;
+ }
+
+ TaskRunner getTaskRunner() {
+ return runner;
+ }
+
+ void setTaskRunner(TaskRunner rnr) {
+ this.runner = rnr;
+ }
+
+ public synchronized void setJobConf(JobConf lconf){
+ this.localJobConf = lconf;
+ keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+ taskTimeout = localJobConf.getLong("mapred.task.timeout",
+ 10 * 60 * 1000);
+ if (task.isMapTask()) {
+ debugCommand = localJobConf.getMapDebugScript();
+ } else {
[... 1760 lines stripped ...]