Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC
svn commit: r1495297 [26/46] - in /hadoop/common/branches/branch-1-win: ./
bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/
src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Jun 21 06:37:27 2013
@@ -18,13 +18,9 @@
package org.apache.hadoop.mapred;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
-import java.io.InputStreamReader;
import java.io.Writer;
import java.lang.management.ManagementFactory;
import java.net.BindException;
@@ -53,35 +49,45 @@ import java.util.TreeSet;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobSubmissionProtocol;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapred.AuditLogger.Constants;
-import org.apache.hadoop.mapred.Counters.CountersExceededException;
import org.apache.hadoop.mapred.JobHistory.Keys;
-import org.apache.hadoop.mapred.JobHistory.Listener;
import org.apache.hadoop.mapred.JobHistory.Values;
import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapred.QueueManager.QueueACL;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
@@ -89,6 +95,7 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.SecurityUtil;
@@ -96,25 +103,16 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
-
-import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.security.Credentials;
import org.mortbay.util.ajax.JSON;
/*******************************************************
@@ -202,13 +200,17 @@ public class JobTracker implements MRCon
private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
+ final static String JT_INIT_CONFIG_KEY_FOR_TESTS =
+ "mapreduce.jobtracker.init.for.tests";
+
public static enum State { INITIALIZING, RUNNING }
- State state = State.INITIALIZING;
- private static final int FS_ACCESS_RETRY_PERIOD = 10000;
+ volatile State state = State.INITIALIZING;
+ private static final int FS_ACCESS_RETRY_PERIOD = 1000;
static final String JOB_INFO_FILE = "job-info";
private DNSToSwitchMapping dnsToSwitchMapping;
- private NetworkTopology clusterMap = new NetworkTopology();
+ private NetworkTopology clusterMap;
private int numTaskCacheLevels; // the max level to which we cache tasks
+ private boolean isNodeGroupAware;
/**
* {@link #nodesAtMaxLevel} is using the keySet from {@link ConcurrentHashMap}
* so that it can be safely written to and iterated on via 2 separate threads.
@@ -222,6 +224,8 @@ public class JobTracker implements MRCon
private final List<JobInProgressListener> jobInProgressListeners =
new CopyOnWriteArrayList<JobInProgressListener>();
+ private List<ServicePlugin> plugins;
+
private static final LocalDirAllocator lDirAlloc =
new LocalDirAllocator("mapred.local.dir");
//system directory is completely owned by the JobTracker
@@ -276,6 +280,16 @@ public class JobTracker implements MRCon
return clock;
}
+ static final String JT_HDFS_MONITOR_ENABLE =
+ "mapreduce.jt.hdfs.monitor.enable";
+ static final boolean DEFAULT_JT_HDFS_MONITOR_THREAD_ENABLE = false;
+
+ static final String JT_HDFS_MONITOR_THREAD_INTERVAL =
+ "mapreduce.jt.hdfs.monitor.interval.ms";
+ static final int DEFAULT_JT_HDFS_MONITOR_THREAD_INTERVAL_MS = 5000;
+
+ private Thread hdfsMonitor;
+
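The two keys above gate an optional HDFS monitor thread. A minimal sketch of how a site configuration might enable it, assuming only what the constants above declare (5000 ms is the compiled-in default interval):

    import org.apache.hadoop.mapred.JobConf;

    public class EnableHdfsMonitorSketch {
      public static void main(String[] args) {
        // Key strings match the constants declared above.
        JobConf conf = new JobConf();
        conf.setBoolean("mapreduce.jt.hdfs.monitor.enable", true);
        conf.setInt("mapreduce.jt.hdfs.monitor.interval.ms", 5000);
      }
    }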
/**
* Start the JobTracker with given configuration.
*
@@ -291,9 +305,14 @@ public class JobTracker implements MRCon
InterruptedException {
return startTracker(conf, generateNewIdentifier());
}
-
+
public static JobTracker startTracker(JobConf conf, String identifier)
throws IOException, InterruptedException {
+ return startTracker(conf, identifier, false);
+ }
+
+ public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize)
+ throws IOException, InterruptedException {
DefaultMetricsSystem.initialize("JobTracker");
JobTracker result = null;
while (true) {
@@ -320,6 +339,12 @@ public class JobTracker implements MRCon
if (result != null) {
JobEndNotifier.startNotifier();
MBeans.register("JobTracker", "JobTrackerInfo", result);
+ if (initialize) {
+ result.setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
+ result.initializeFilesystem();
+ result.setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
+ result.initialize();
+ }
}
return result;
}
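A sketch of how a test might use the new overload: passing initialize=true makes startTracker() perform the safe-mode-wrapped filesystem setup and initialize() itself instead of leaving them to offerService(). The identifier string here is hypothetical.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobTracker;

    public class StartTrackerSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        // initialize=true: enter safe mode, initialize the filesystem,
        // leave safe mode, then run initialize() -- mirroring offerService().
        JobTracker jt = JobTracker.startTracker(conf, "jt-test", true);
      }
    }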
@@ -627,15 +652,25 @@ public class JobTracker implements MRCon
Map.Entry<String, ArrayList<JobInProgress>> entry =
userToJobsMapIt.next();
ArrayList<JobInProgress> userJobs = entry.getValue();
+
+ // Remove retiredJobs from userToJobsMap to ensure we don't
+ // retire them multiple times
Iterator<JobInProgress> it = userJobs.iterator();
- while (it.hasNext() &&
- userJobs.size() > MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
+ while (it.hasNext()) {
JobInProgress jobUser = it.next();
if (retiredJobs.contains(jobUser)) {
LOG.info("Removing from userToJobsMap: " +
jobUser.getJobID());
it.remove();
- } else if (minConditionToRetire(jobUser, now)) {
+ }
+ }
+
+ // Now, check for #jobs per user
+ it = userJobs.iterator();
+ while (it.hasNext() &&
+ userJobs.size() > MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
+ JobInProgress jobUser = it.next();
+ if (minConditionToRetire(jobUser, now)) {
LOG.info("User limit exceeded. Marking job: " +
jobUser.getJobID() + " for retire.");
retiredJobs.add(jobUser);
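The rewritten loop retires in two passes: first purge jobs already marked retired, then apply the per-user cap to what remains, so a job is never counted against the cap after it has been retired. An illustrative, self-contained sketch of the same idiom (names and the cap value are made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;

    public class TwoPassRetireSketch {
      public static void main(String[] args) {
        int maxPerUser = 2; // stand-in for MAX_COMPLETE_USER_JOBS_IN_MEMORY
        List<String> userJobs =
            new ArrayList<String>(Arrays.asList("j1", "j2", "j3", "j4"));
        Set<String> retired = new HashSet<String>(Arrays.asList("j2"));

        // Pass 1: drop jobs already marked retired, so they are neither
        // counted against the cap nor retired a second time.
        for (Iterator<String> it = userJobs.iterator(); it.hasNext();) {
          if (retired.contains(it.next())) {
            it.remove();
          }
        }

        // Pass 2: only now enforce the per-user cap on what remains.
        for (Iterator<String> it = userJobs.iterator();
             it.hasNext() && userJobs.size() > maxPerUser;) {
          System.out.println("Would retire: " + it.next());
          it.remove();
        }
      }
    }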
@@ -1216,179 +1251,6 @@ public class JobTracker implements MRCon
/** A custom listener that replays the events in the order in which the
* events (task attempts) occurred.
*/
- class JobRecoveryListener implements Listener {
- // The owner job
- private JobInProgress jip;
-
- private JobHistory.JobInfo job; // current job's info object
-
- // Maintain the count of the (attempt) events recovered
- private int numEventsRecovered = 0;
-
- // Maintains open transactions
- private Map<String, String> hangingAttempts =
- new HashMap<String, String>();
-
- // Whether there are any updates for this job
- private boolean hasUpdates = false;
-
- public JobRecoveryListener(JobInProgress jip) {
- this.jip = jip;
- this.job = new JobHistory.JobInfo(jip.getJobID().toString());
- }
-
- /**
- * Process a task. Note that a task might commit a previously pending
- * transaction.
- */
- private void processTask(String taskId, JobHistory.Task task) {
- // Any TASK info commits the previous transaction
- boolean hasHanging = hangingAttempts.remove(taskId) != null;
- if (hasHanging) {
- numEventsRecovered += 2;
- }
-
- TaskID id = TaskID.forName(taskId);
- TaskInProgress tip = getTip(id);
-
- updateTip(tip, task);
- }
-
- /**
- * Adds a task-attempt in the listener
- */
- private void processTaskAttempt(String taskAttemptId,
- JobHistory.TaskAttempt attempt)
- throws UnknownHostException {
- TaskAttemptID id = TaskAttemptID.forName(taskAttemptId);
-
- // Check if the transaction for this attempt can be committed
- String taskStatus = attempt.get(Keys.TASK_STATUS);
- TaskAttemptID taskID = TaskAttemptID.forName(taskAttemptId);
- JobInProgress jip = getJob(taskID.getJobID());
- JobStatus prevStatus = (JobStatus)jip.getStatus().clone();
-
- if (taskStatus.length() > 0) {
- // This means this is an update event
- if (taskStatus.equals(Values.SUCCESS.name())) {
- // Mark this attempt as hanging
- hangingAttempts.put(id.getTaskID().toString(), taskAttemptId);
- addSuccessfulAttempt(jip, id, attempt);
- } else {
- addUnsuccessfulAttempt(jip, id, attempt);
- numEventsRecovered += 2;
- }
- } else {
- createTaskAttempt(jip, id, attempt);
- }
-
- JobStatus newStatus = (JobStatus)jip.getStatus().clone();
- if (prevStatus.getRunState() != newStatus.getRunState()) {
- if(LOG.isDebugEnabled())
- LOG.debug("Status changed hence informing prevStatus" + prevStatus + " currentStatus "+ newStatus);
- JobStatusChangeEvent event =
- new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED,
- prevStatus, newStatus);
- updateJobInProgressListeners(event);
- }
- }
-
- public void handle(JobHistory.RecordTypes recType, Map<Keys,
- String> values) throws IOException {
- if (recType == JobHistory.RecordTypes.Job) {
- // Update the meta-level job information
- job.handle(values);
-
- // Forcefully init the job as we have some updates for it
- checkAndInit();
- } else if (recType.equals(JobHistory.RecordTypes.Task)) {
- String taskId = values.get(Keys.TASKID);
-
- // Create a task
- JobHistory.Task task = new JobHistory.Task();
- task.handle(values);
-
- // Ignore if its a cleanup task
- if (isCleanup(task)) {
- return;
- }
-
- // Process the task i.e update the tip state
- processTask(taskId, task);
- } else if (recType.equals(JobHistory.RecordTypes.MapAttempt)) {
- String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-
- // Create a task attempt
- JobHistory.MapAttempt attempt = new JobHistory.MapAttempt();
- attempt.handle(values);
-
- // Ignore if its a cleanup task
- if (isCleanup(attempt)) {
- return;
- }
-
- // Process the attempt i.e update the attempt state via job
- processTaskAttempt(attemptId, attempt);
- } else if (recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
- String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-
- // Create a task attempt
- JobHistory.ReduceAttempt attempt = new JobHistory.ReduceAttempt();
- attempt.handle(values);
-
- // Ignore if its a cleanup task
- if (isCleanup(attempt)) {
- return;
- }
-
- // Process the attempt i.e update the job state via job
- processTaskAttempt(attemptId, attempt);
- }
- }
-
- // Check if the task is of type CLEANUP
- private boolean isCleanup(JobHistory.Task task) {
- String taskType = task.get(Keys.TASK_TYPE);
- return Values.CLEANUP.name().equals(taskType);
- }
-
- // Init the job if its ready for init. Also make sure that the scheduler
- // is updated
- private void checkAndInit() throws IOException {
- String jobStatus = this.job.get(Keys.JOB_STATUS);
- if (Values.PREP.name().equals(jobStatus)) {
- hasUpdates = true;
- LOG.info("Calling init from RM for job " + jip.getJobID().toString());
- try {
- initJob(jip);
- } catch (Throwable t) {
- LOG.error("Job initialization failed : \n"
- + StringUtils.stringifyException(t));
- jip.status.setFailureInfo("Job Initialization failed: \n"
- + StringUtils.stringifyException(t));
- failJob(jip);
- throw new IOException(t);
- }
- }
- }
-
- void close() {
- if (hasUpdates) {
- // Apply the final (job-level) updates
- JobStatusChangeEvent event = updateJob(jip, job);
-
- synchronized (JobTracker.this) {
- // Update the job listeners
- updateJobInProgressListeners(event);
- }
- }
- }
-
- public int getNumEventsRecovered() {
- return numEventsRecovered;
- }
-
- }
public RecoveryManager() {
jobsToRecover = new TreeSet<JobID>();
@@ -1442,243 +1304,25 @@ public class JobTracker implements MRCon
// checks if the job dir has the required files
public void checkAndAddJob(FileStatus status) throws IOException {
String fileName = status.getPath().getName();
- if (isJobNameValid(fileName)) {
- if (JobClient.isJobDirValid(status.getPath(), fs)) {
- recoveryManager.addJobForRecovery(JobID.forName(fileName));
- shouldRecover = true; // enable actual recovery if num-files > 1
- } else {
- LOG.info("Found an incomplete job directory " + fileName + "."
- + " Deleting it!!");
- fs.delete(status.getPath(), true);
- }
- }
- }
-
- private JobStatusChangeEvent updateJob(JobInProgress jip,
- JobHistory.JobInfo job) {
- // Change the job priority
- String jobpriority = job.get(Keys.JOB_PRIORITY);
- JobPriority priority = JobPriority.valueOf(jobpriority);
- // It's important to update this via the jobtracker's api as it will
- // take care of updating the event listeners too
-
- try {
- setJobPriority(jip.getJobID(), priority);
- } catch (IOException e) {
- // This will not happen. JobTracker can set jobPriority of any job
- // as mrOwner has the needed permissions.
- LOG.warn("Unexpected. JobTracker could not do SetJobPriority on "
- + jip.getJobID() + ". " + e);
- }
-
- // Save the previous job status
- JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
-
- // Set the start/launch time only if there are recovered tasks
- // Increment the job's restart count
- jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME),
- job.getLong(JobHistory.Keys.LAUNCH_TIME));
-
- // Save the new job status
- JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-
- return new JobStatusChangeEvent(jip, EventType.START_TIME_CHANGED, oldStatus,
- newStatus);
- }
-
- private void updateTip(TaskInProgress tip, JobHistory.Task task) {
- long startTime = task.getLong(Keys.START_TIME);
- if (startTime != 0) {
- tip.setExecStartTime(startTime);
- }
-
- long finishTime = task.getLong(Keys.FINISH_TIME);
- // For failed tasks finish-time will be missing
- if (finishTime != 0) {
- tip.setExecFinishTime(finishTime);
- }
-
- String cause = task.get(Keys.TASK_ATTEMPT_ID);
- if (cause.length() > 0) {
- // This means that the this is a FAILED events
- TaskAttemptID id = TaskAttemptID.forName(cause);
- TaskStatus status = tip.getTaskStatus(id);
- synchronized (JobTracker.this) {
- // This will add the tip failed event in the new log
- tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(),
- status.getPhase(), status.getRunState(),
- status.getTaskTracker());
- }
- }
- }
-
- private void createTaskAttempt(JobInProgress job,
- TaskAttemptID attemptId,
- JobHistory.TaskAttempt attempt)
- throws UnknownHostException {
- TaskID id = attemptId.getTaskID();
- String type = attempt.get(Keys.TASK_TYPE);
- TaskInProgress tip = job.getTaskInProgress(id);
-
- // I. Get the required info
- TaskStatus taskStatus = null;
- String trackerName = attempt.get(Keys.TRACKER_NAME);
- String trackerHostName =
- JobInProgress.convertTrackerNameToHostName(trackerName);
- // recover the port information.
- int port = 0; // default to 0
- String hport = attempt.get(Keys.HTTP_PORT);
- if (hport != null && hport.length() > 0) {
- port = attempt.getInt(Keys.HTTP_PORT);
- }
-
- long attemptStartTime = attempt.getLong(Keys.START_TIME);
-
- // II. Create the (appropriate) task status
- if (type.equals(Values.MAP.name())) {
- taskStatus =
- new MapTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.MAP),
- TaskStatus.State.RUNNING, "", "", trackerName,
- TaskStatus.Phase.MAP, new Counters());
- } else {
- taskStatus =
- new ReduceTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.REDUCE),
- TaskStatus.State.RUNNING, "", "", trackerName,
- TaskStatus.Phase.REDUCE, new Counters());
- }
-
- // Set the start time
- taskStatus.setStartTime(attemptStartTime);
-
- List<TaskStatus> ttStatusList = new ArrayList<TaskStatus>();
- ttStatusList.add(taskStatus);
-
- // III. Create the dummy tasktracker status
- TaskTrackerStatus ttStatus =
- new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList,
- 0 , 0, 0, 0);
- ttStatus.setLastSeen(clock.getTime());
-
- synchronized (JobTracker.this) {
- synchronized (taskTrackers) {
- synchronized (trackerExpiryQueue) {
- // IV. Register a new tracker
- TaskTracker taskTracker = getTaskTracker(trackerName);
- boolean isTrackerRegistered = (taskTracker != null);
- if (!isTrackerRegistered) {
- markTracker(trackerName); // add the tracker to recovery-manager
- taskTracker = new TaskTracker(trackerName);
- taskTracker.setStatus(ttStatus);
- addNewTracker(taskTracker);
- }
-
- // V. Update the tracker status
- // This will update the meta info of the jobtracker and also add the
- // tracker status if missing i.e register it
- updateTaskTrackerStatus(trackerName, ttStatus);
- }
- }
- // Register the attempt with job and tip, under JobTracker lock.
- // Since, as of today they are atomic through heartbeat.
- // VI. Register the attempt
- // a) In the job
- job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
- // b) In the tip
- tip.updateStatus(taskStatus);
+ if (isJobNameValid(fileName) && isJobDirValid(JobID.forName(fileName))) {
+ recoveryManager.addJobForRecovery(JobID.forName(fileName));
+ shouldRecover = true; // enable actual recovery if num-files > 1
}
-
- // VII. Make an entry in the launched tasks
- expireLaunchingTasks.addNewTask(attemptId);
- }
-
- private void addSuccessfulAttempt(JobInProgress job,
- TaskAttemptID attemptId,
- JobHistory.TaskAttempt attempt) {
- // I. Get the required info
- TaskID taskId = attemptId.getTaskID();
- String type = attempt.get(Keys.TASK_TYPE);
-
- TaskInProgress tip = job.getTaskInProgress(taskId);
- long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
-
- // Get the task status and the tracker name and make a copy of it
- TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
- taskStatus.setFinishTime(attemptFinishTime);
-
- String stateString = attempt.get(Keys.STATE_STRING);
-
- // Update the basic values
- taskStatus.setStateString(stateString);
- taskStatus.setProgress(1.0f);
- taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-
- // Set the shuffle/sort finished times
- if (type.equals(Values.REDUCE.name())) {
- long shuffleTime =
- Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED));
- long sortTime =
- Long.parseLong(attempt.get(Keys.SORT_FINISHED));
- taskStatus.setShuffleFinishTime(shuffleTime);
- taskStatus.setSortFinishTime(sortTime);
- }
-
- // Add the counters
- String counterString = attempt.get(Keys.COUNTERS);
- Counters counter = null;
- //TODO Check if an exception should be thrown
- try {
- counter = Counters.fromEscapedCompactString(counterString);
- } catch (ParseException pe) {
- counter = new Counters(); // Set it to empty counter
- }
- taskStatus.setCounters(counter);
-
- synchronized (JobTracker.this) {
- // II. Replay the status
- job.updateTaskStatus(tip, taskStatus);
- }
-
- // III. Prevent the task from expiry
- expireLaunchingTasks.removeTask(attemptId);
}
- private void addUnsuccessfulAttempt(JobInProgress job,
- TaskAttemptID attemptId,
- JobHistory.TaskAttempt attempt) {
- // I. Get the required info
- TaskID taskId = attemptId.getTaskID();
- TaskInProgress tip = job.getTaskInProgress(taskId);
- long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
-
- TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
- taskStatus.setFinishTime(attemptFinishTime);
-
- // Reset the progress
- taskStatus.setProgress(0.0f);
-
- String stateString = attempt.get(Keys.STATE_STRING);
- taskStatus.setStateString(stateString);
-
- boolean hasFailed =
- attempt.get(Keys.TASK_STATUS).equals(Values.FAILED.name());
- // Set the state failed/killed
- if (hasFailed) {
- taskStatus.setRunState(TaskStatus.State.FAILED);
+ private boolean isJobDirValid(JobID jobId) throws IOException {
+ boolean ret = false;
+ Path jobInfoFile = getSystemFileForJob(jobId);
+ final Path jobTokenFile = getTokenFileForJob(jobId);
+ JobConf job = new JobConf();
+ if (jobTokenFile.getFileSystem(job).exists(jobTokenFile)
+ && jobInfoFile.getFileSystem(job).exists(jobInfoFile)) {
+ ret = true;
} else {
- taskStatus.setRunState(TaskStatus.State.KILLED);
+ LOG.warn("Job " + jobId
+ + " does not have valid info/token file so ignoring for recovery");
}
-
- // Get/Set the error msg
- String diagInfo = attempt.get(Keys.ERROR);
- taskStatus.setDiagnosticInfo(diagInfo); // diag info
-
- synchronized (JobTracker.this) {
- // II. Update the task status
- job.updateTaskStatus(tip, taskStatus);
- }
-
- // III. Prevent the task from expiry
- expireLaunchingTasks.removeTask(attemptId);
+ return ret;
}
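isJobDirValid() now deems a job recoverable only when both its job-info and job-token files exist in the system directory. A rough standalone sketch of that check; the paths are hypothetical, since the real code derives them via getSystemFileForJob() and getTokenFileForJob():

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class JobDirValidSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        Path jobInfoFile =
            new Path("/tmp/hadoop/mapred/system/job_201306210001_0001/job-info");
        Path jobTokenFile =
            new Path("/tmp/hadoop/mapred/system/job_201306210001_0001/jobToken");
        // Recoverable only if both files are present.
        boolean ok = jobInfoFile.getFileSystem(job).exists(jobInfoFile)
            && jobTokenFile.getFileSystem(job).exists(jobTokenFile);
        System.out.println("recoverable = " + ok);
      }
    }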
Path getRestartCountFile() {
@@ -1714,11 +1358,9 @@ public class JobTracker implements MRCon
fs.rename(tmpRestartFile, restartFile); // rename .rec to main file
} else {
// For the very first time the jobtracker will create a jobtracker.info
- // file. If the jobtracker has restarted then disable recovery as files'
- // needed for recovery are missing.
-
- // disable recovery if this is a restart
- shouldRecover = false;
+ // file.
+ // enable recovery if this is a restart
+ shouldRecover = true;
// write the jobtracker.info file
try {
@@ -1770,205 +1412,59 @@ public class JobTracker implements MRCon
fs.rename(tmpRestartFile, restartFile);
}
- // mapred.JobID::forName returns
- @SuppressWarnings("unchecked") // mapreduce.JobID
public void recover() {
+ int recovered = 0;
+ long recoveryProcessStartTime = clock.getTime();
if (!shouldRecover()) {
// clean up jobs structure
jobsToRecover.clear();
return;
}
- LOG.info("Restart count of the jobtracker : " + restartCount);
-
- // I. Init the jobs and cache the recovered job history filenames
- Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
- Iterator<JobID> idIter = jobsToRecover.iterator();
- JobInProgress job = null;
- File jobIdFile = null;
-
- // 0. Cleanup
- try {
- JobHistory.JobInfo.deleteConfFiles();
- } catch (IOException ioe) {
- LOG.info("Error in cleaning up job history folder", ioe);
- }
-
- while (idIter.hasNext()) {
- JobID id = idIter.next();
- LOG.info("Trying to recover details of job " + id);
+ LOG.info("Starting the recovery process for " + jobsToRecover.size()
+ + " jobs ...");
+ for (JobID jobId : jobsToRecover) {
+ LOG.info("Submitting job " + jobId);
try {
- // 1. Recover job owner and create JIP
- jobIdFile =
- new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id, conf).toString());
-
- String user = null;
- if (jobIdFile != null && jobIdFile.exists()) {
- LOG.info("File " + jobIdFile + " exists for job " + id);
- FileInputStream in = new FileInputStream(jobIdFile);
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new InputStreamReader(in));
- user = reader.readLine();
- LOG.info("Recovered user " + user + " for job " + id);
- } finally {
- if (reader != null) {
- reader.close();
- }
- in.close();
- }
- }
- if (user == null) {
- throw new RuntimeException("Incomplete job " + id);
- }
-
- // Create the job
- /* THIS PART OF THE CODE IS USELESS. JOB RECOVERY SHOULD BE
- * BACKPORTED (MAPREDUCE-873)
- */
- job = new JobInProgress(JobTracker.this, conf,
- new JobInfo((org.apache.hadoop.mapreduce.JobID) id,
- new Text(user), new Path(getStagingAreaDirInternal(user))),
- restartCount, new Credentials() /*HACK*/);
-
- // 2. Check if the user has appropriate access
- // Get the user group info for the job's owner
- UserGroupInformation ugi =
- UserGroupInformation.createRemoteUser(job.getJobConf().getUser());
- LOG.info("Submitting job " + id + " on behalf of user "
- + ugi.getShortUserName() + " in groups : "
- + StringUtils.arrayToString(ugi.getGroupNames()));
-
- // check the access
- try {
- aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
- } catch (Throwable t) {
- LOG.warn("Access denied for user " + ugi.getShortUserName()
- + " in groups : ["
- + StringUtils.arrayToString(ugi.getGroupNames()) + "]");
- throw t;
- }
-
- // 3. Get the log file and the file path
- String logFileName =
- JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
- if (logFileName != null) {
- Path jobHistoryFilePath =
- JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
-
- // 4. Recover the history file. This involved
- // - deleting file.recover if file exists
- // - renaming file.recover to file if file doesnt exist
- // This makes sure that the (master) file exists
- JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(),
- jobHistoryFilePath);
+ Path jobInfoFile = getSystemFileForJob(jobId);
+ final Path jobTokenFile = getTokenFileForJob(jobId);
+ FSDataInputStream in = fs.open(jobInfoFile);
+ final JobInfo token = new JobInfo();
+ token.readFields(in);
+ in.close();
- // 5. Cache the history file name as it costs one dfs access
- jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+ // Read tokens as JT user
+ JobConf job = new JobConf();
+ final Credentials ts =
+ (jobTokenFile.getFileSystem(job).exists(jobTokenFile)) ?
+ Credentials.readTokenStorageFile(jobTokenFile, job) : null;
+
+ // Re-submit job
+ final UserGroupInformation ugi = UserGroupInformation
+ .createRemoteUser(token.getUser().toString());
+ JobStatus status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
+ public JobStatus run() throws IOException, InterruptedException {
+ return submitJob(JobID.downgrade(token.getJobID()), token
+ .getJobSubmitDir().toString(), ugi, ts, true);
+ }
+ });
+ if (status == null) {
+ LOG.info("Job " + jobId + " was not recovered since it " +
+ "disabled recovery on restart (" + JobConf.MAPREDUCE_RECOVER_JOB +
+ " set to 'false').");
} else {
- LOG.info("No history file found for job " + id);
- idIter.remove(); // remove from recovery list
- }
-
- // 6. Sumbit the job to the jobtracker
- addJob(id, job);
- } catch (Throwable t) {
- LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
- idIter.remove();
- if (jobIdFile != null) {
- jobIdFile.delete();
- jobIdFile = null;
- }
- if (job != null) {
- job.fail();
- job = null;
- }
- continue;
- }
- }
-
- long recoveryStartTime = clock.getTime();
-
- // II. Recover each job
- idIter = jobsToRecover.iterator();
- while (idIter.hasNext()) {
- JobID id = idIter.next();
- JobInProgress pJob = getJob(id);
-
- // 1. Get the required info
- // Get the recovered history file
- Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
- String logFileName = jobHistoryFilePath.getName();
-
- FileSystem fs;
- try {
- fs = jobHistoryFilePath.getFileSystem(conf);
- } catch (IOException ioe) {
- LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",
- ioe);
- continue;
- }
-
- // 2. Parse the history file
- // Note that this also involves job update
- JobRecoveryListener listener = new JobRecoveryListener(pJob);
- try {
- JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(),
- listener, fs);
- } catch (Throwable t) {
- LOG.info("Error reading history file of job " + pJob.getJobID()
- + ". Ignoring the error and continuing.", t);
- }
-
- // 3. Close the listener
- listener.close();
-
- // 4. Update the recovery metric
- totalEventsRecovered += listener.getNumEventsRecovered();
-
- // 5. Cleanup history
- // Delete the master log file as an indication that the new file
- // should be used in future
- try {
- synchronized (pJob) {
- JobHistory.JobInfo.checkpointRecovery(logFileName,
- pJob.getJobConf());
+ recovered++;
}
- } catch (Throwable t) {
- LOG.warn("Failed to delete log file (" + logFileName + ") for job "
- + id + ". Continuing.", t);
- }
-
- if (pJob.isComplete()) {
- idIter.remove(); // no need to keep this job info as its successful
+ } catch (Exception e) {
+ LOG.warn("Could not recover job " + jobId, e);
}
}
-
- recoveryDuration = clock.getTime() - recoveryStartTime;
+ recoveryDuration = clock.getTime() - recoveryProcessStartTime;
hasRecovered = true;
- // III. Finalize the recovery
- synchronized (trackerExpiryQueue) {
- // Make sure that the tracker statuses in the expiry-tracker queue
- // are updated
- long now = clock.getTime();
- int size = trackerExpiryQueue.size();
- for (int i = 0; i < size ; ++i) {
- // Get the first tasktracker
- TaskTrackerStatus taskTracker = trackerExpiryQueue.first();
-
- // Remove it
- trackerExpiryQueue.remove(taskTracker);
-
- // Set the new time
- taskTracker.setLastSeen(now);
-
- // Add back to get the sorted list
- trackerExpiryQueue.add(taskTracker);
- }
- }
-
- LOG.info("Restoration complete");
+ LOG.info("Recovery done! Recoverd " + recovered + " of "
+ + jobsToRecover.size() + " jobs.");
+ LOG.info("Recovery Duration (ms):" + recoveryDuration);
}
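The rewritten recover() re-submits each job as its original owner through UserGroupInformation.doAs(). A minimal standalone sketch of that pattern (the user name is made up):

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsSketch {
      public static void main(String[] args) throws Exception {
        UserGroupInformation ugi =
            UserGroupInformation.createRemoteUser("alice");
        String name = ugi.doAs(new PrivilegedExceptionAction<String>() {
          public String run() throws Exception {
            // Everything here runs with "alice" as the current user,
            // just as recover() runs submitJob() as the job's owner.
            return UserGroupInformation.getCurrentUser().getShortUserName();
          }
        });
        System.out.println(name); // prints "alice"
      }
    }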
int totalEventsRecovered() {
@@ -2193,8 +1689,183 @@ public class JobTracker implements MRCon
this(conf, identifier, clock, new QueueManager(new Configuration(conf)));
}
+ private void initJTConf(JobConf conf) {
+ if (conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false)) {
+ LOG.warn(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY +
+ " is enabled, disabling it");
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false);
+ }
+ }
+
+ @InterfaceAudience.Private
+ void initializeFilesystem() throws IOException, InterruptedException {
+ // Connect to HDFS NameNode
+ while (!Thread.currentThread().isInterrupted() && fs == null) {
+ try {
+ fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
+ public FileSystem run() throws IOException {
+ Path systemDir = new Path(conf.get("mapred.system.dir",
+ "/tmp/hadoop/mapred/system"));
+ return FileSystem.get(systemDir.toUri(), conf);
+ }});
+ } catch (IOException ie) {
+ fs = null;
+ LOG.info("Problem connecting to HDFS Namenode... re-trying", ie);
+ Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+ }
+ }
+
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+
+ // Ensure HDFS is healthy
+ if ("hdfs".equalsIgnoreCase(fs.getUri().getScheme())) {
+ while (!DistributedFileSystem.isHealthy(fs.getUri())) {
+ LOG.info("HDFS initialized but not 'healthy' yet, waiting...");
+ Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+ }
+ }
+ }
+
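initializeFilesystem() above polls DistributedFileSystem.isHealthy() until the NameNode responds. Stripped of the JobTracker context, the wait loop looks roughly like this (the URI is illustrative):

    import java.net.URI;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class HdfsHealthWaitSketch {
      public static void main(String[] args) throws InterruptedException {
        URI hdfsUri = URI.create("hdfs://namenode:8020");
        // Block until HDFS reports healthy, retrying once a second
        // (FS_ACCESS_RETRY_PERIOD is 1000 ms after this change).
        while (!DistributedFileSystem.isHealthy(hdfsUri)) {
          Thread.sleep(1000);
        }
      }
    }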
+ @InterfaceAudience.Private
+ void initialize()
+ throws IOException, InterruptedException {
+ // initialize history parameters.
+ final JobTracker jtFinal = this;
+
+ getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
+ @Override
+ public Boolean run() throws Exception {
+ JobHistory.init(jtFinal, conf, jtFinal.localMachine,
+ jtFinal.startTime);
+ return true;
+ }
+ });
+
+ // start the recovery manager
+ recoveryManager = new RecoveryManager();
+
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ // if we haven't contacted the namenode go ahead and do it
+ UserGroupInformation mrOwner = getMROwner();
+ // clean up the system dir, which will only work if hdfs is out of
+ // safe mode
+ if(systemDir == null) {
+ systemDir = new Path(getSystemDir());
+ }
+ try {
+ FileStatus systemDirStatus = fs.getFileStatus(systemDir);
+ if (!systemDirStatus.isOwnedByUser(
+ mrOwner.getShortUserName(), mrOwner.getGroupNames())) {
+ throw new AccessControlException("The systemdir " + systemDir +
+ " is not owned by " + mrOwner.getShortUserName());
+ }
+ if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
+ LOG.warn("Incorrect permissions on " + systemDir +
+ ". Setting it to " + SYSTEM_DIR_PERMISSION);
+ fs.setPermission(systemDir,new FsPermission(SYSTEM_DIR_PERMISSION));
+ }
+ } catch (FileNotFoundException fnf) {} //ignore
+ // Make sure that the backup data is preserved
+ FileStatus[] systemDirData = fs.listStatus(this.systemDir);
+ // Check if the history is enabled .. as we cant have persistence with
+ // history disabled
+ if (conf.getBoolean("mapred.jobtracker.restart.recover", false)
+ && systemDirData != null) {
+ for (FileStatus status : systemDirData) {
+ try {
+ recoveryManager.checkAndAddJob(status);
+ } catch (Throwable t) {
+ LOG.warn("Failed to add the job " + status.getPath().getName(),
+ t);
+ }
+ }
+
+ // Check if there are jobs to be recovered
+ hasRestarted = recoveryManager.shouldRecover();
+ if (hasRestarted) {
+ break; // if there is something to recover else clean the sys dir
+ }
+ }
+ LOG.info("Cleaning up the system directory");
+ fs.delete(systemDir, true);
+ if (FileSystem.mkdirs(fs, systemDir,
+ new FsPermission(SYSTEM_DIR_PERMISSION))) {
+ break;
+ }
+ LOG.error("Mkdirs failed to create " + systemDir);
+ } catch (AccessControlException ace) {
+ LOG.warn("Failed to operate on mapred.system.dir (" + systemDir
+ + ") because of permissions.");
+ LOG.warn("Manually delete the mapred.system.dir (" + systemDir
+ + ") and then start the JobTracker.");
+ LOG.warn("Bailing out ... ", ace);
+ throw ace;
+ } catch (IOException ie) {
+ LOG.info("problem cleaning system directory: " + systemDir, ie);
+ }
+ Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+ }
+
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+
+ // Same with 'localDir' except it's always on the local disk.
+ if (!hasRestarted) {
+ conf.deleteLocalFiles(SUBDIR);
+ }
+
+ // Initialize history DONE folder
+ FileSystem historyFS = getMROwner().doAs(
+ new PrivilegedExceptionAction<FileSystem>() {
+ public FileSystem run() throws IOException {
+ JobHistory.initDone(conf, fs);
+ final String historyLogDir =
+ JobHistory.getCompletedJobHistoryLocation().toString();
+ infoServer.setAttribute("historyLogDir", historyLogDir);
+
+ infoServer.setAttribute
+ ("serialNumberDirectoryDigits",
+ Integer.valueOf(JobHistory.serialNumberDirectoryDigits()));
+
+ infoServer.setAttribute
+ ("serialNumberTotalDigits",
+ Integer.valueOf(JobHistory.serialNumberTotalDigits()));
+
+ return new Path(historyLogDir).getFileSystem(conf);
+ }
+ });
+ infoServer.setAttribute("fileSys", historyFS);
+ infoServer.setAttribute("jobConf", conf);
+ infoServer.setAttribute("aclManager", aclsManager);
+
+ if (JobHistoryServer.isEmbedded(conf)) {
+ LOG.info("History server being initialized in embedded mode");
+ jobHistoryServer = new JobHistoryServer(conf, aclsManager, infoServer);
+ jobHistoryServer.start();
+ LOG.info("Job History Server web address: " + JobHistoryServer.getAddress(conf));
+ }
+
+ //initializes the job status store
+ completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
+
+ // Setup HDFS monitoring
+ if (this.conf.getBoolean(
+ JT_HDFS_MONITOR_ENABLE, DEFAULT_JT_HDFS_MONITOR_THREAD_ENABLE)) {
+ hdfsMonitor = new HDFSMonitorThread(this.conf, this, this.fs);
+ hdfsMonitor.start();
+ }
+ }
+
JobTracker(final JobConf conf, String identifier, Clock clock, QueueManager qm)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
+
+ initJTConf(conf);
+
this.queueManager = qm;
this.clock = clock;
// Set ports, start RPC servers, setup security policy etc.
@@ -2291,6 +1962,11 @@ public class JobTracker implements MRCon
LOG.info("Starting jobtracker with owner as " +
getMROwner().getShortUserName());
+ // Create network topology
+ clusterMap = (NetworkTopology) ReflectionUtils.newInstance(
+ conf.getClass("net.topology.impl", NetworkTopology.class,
+ NetworkTopology.class), conf);
+
// Create the scheduler
Class<? extends TaskScheduler> schedulerClass
= conf.getClass("mapred.jobtracker.taskScheduler",
@@ -2300,7 +1976,12 @@ public class JobTracker implements MRCon
// Set service-level authorization security policy
if (conf.getBoolean(
ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
- ServiceAuthorizationManager.refresh(conf, new MapReducePolicyProvider());
+ PolicyProvider policyProvider =
+ (PolicyProvider)(ReflectionUtils.newInstance(
+ conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
+ MapReducePolicyProvider.class, PolicyProvider.class),
+ conf));
+ ServiceAuthorizationManager.refresh(conf, policyProvider);
}
int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
@@ -2327,16 +2008,6 @@ public class JobTracker implements MRCon
infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort,
tmpInfoPort == 0, conf, aclsManager.getAdminsAcl());
infoServer.setAttribute("job.tracker", this);
- // initialize history parameters.
- final JobTracker jtFinal = this;
- getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
- @Override
- public Boolean run() throws Exception {
- JobHistory.init(jtFinal, conf,jtFinal.localMachine,
- jtFinal.startTime);
- return true;
- }
- });
infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
infoServer.start();
@@ -2356,128 +2027,27 @@ public class JobTracker implements MRCon
infoBindAddress + ":" + this.infoPort);
LOG.info("JobTracker webserver: " + this.infoServer.getPort());
- // start the recovery manager
- recoveryManager = new RecoveryManager();
-
- while (!Thread.currentThread().isInterrupted()) {
- try {
- // if we haven't contacted the namenode go ahead and do it
- UserGroupInformation mrOwner = getMROwner();
- if (fs == null) {
- fs = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
- public FileSystem run() throws IOException {
- Path systemDir = new Path(conf.get("mapred.system.dir",
- "/tmp/hadoop/mapred/system"));
- return FileSystem.get(systemDir.toUri(), conf);
- }});
- }
- // clean up the system dir, which will only work if hdfs is out of
- // safe mode
- if(systemDir == null) {
- systemDir = new Path(getSystemDir());
- }
- try {
- FileStatus systemDirStatus = fs.getFileStatus(systemDir);
- if (!systemDirStatus.isOwnedByUser(
- mrOwner.getShortUserName(), mrOwner.getGroupNames())) {
- throw new AccessControlException("The systemdir " + systemDir +
- " is not owned by " + mrOwner.getShortUserName());
- }
- if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
- LOG.warn("Incorrect permissions on " + systemDir +
- ". Setting it to " + SYSTEM_DIR_PERMISSION);
- fs.setPermission(systemDir,new FsPermission(SYSTEM_DIR_PERMISSION));
- }
- } catch (FileNotFoundException fnf) {} //ignore
- // Make sure that the backup data is preserved
- FileStatus[] systemDirData = fs.listStatus(this.systemDir);
- // Check if the history is enabled .. as we cant have persistence with
- // history disabled
- if (conf.getBoolean("mapred.jobtracker.restart.recover", false)
- && systemDirData != null) {
- for (FileStatus status : systemDirData) {
- try {
- recoveryManager.checkAndAddJob(status);
- } catch (Throwable t) {
- LOG.warn("Failed to add the job " + status.getPath().getName(),
- t);
- }
- }
-
- // Check if there are jobs to be recovered
- hasRestarted = recoveryManager.shouldRecover();
- if (hasRestarted) {
- break; // if there is something to recover else clean the sys dir
- }
- }
- LOG.info("Cleaning up the system directory");
- fs.delete(systemDir, true);
- if (FileSystem.mkdirs(fs, systemDir,
- new FsPermission(SYSTEM_DIR_PERMISSION))) {
- break;
- }
- LOG.error("Mkdirs failed to create " + systemDir);
- } catch (AccessControlException ace) {
- LOG.warn("Failed to operate on mapred.system.dir (" + systemDir
- + ") because of permissions.");
- LOG.warn("Manually delete the mapred.system.dir (" + systemDir
- + ") and then start the JobTracker.");
- LOG.warn("Bailing out ... ", ace);
- throw ace;
- } catch (IOException ie) {
- LOG.info("problem cleaning system directory: " + systemDir, ie);
- }
- Thread.sleep(FS_ACCESS_RETRY_PERIOD);
- }
-
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException();
- }
-
- // Same with 'localDir' except it's always on the local disk.
- if (!hasRestarted) {
- jobConf.deleteLocalFiles(SUBDIR);
- }
-
- // Initialize history DONE folder
- FileSystem historyFS = getMROwner().doAs(
- new PrivilegedExceptionAction<FileSystem>() {
- public FileSystem run() throws IOException {
- JobHistory.initDone(conf, fs);
- final String historyLogDir =
- JobHistory.getCompletedJobHistoryLocation().toString();
- infoServer.setAttribute("historyLogDir", historyLogDir);
-
- infoServer.setAttribute
- ("serialNumberDirectoryDigits",
- Integer.valueOf(JobHistory.serialNumberDirectoryDigits()));
-
- infoServer.setAttribute
- ("serialNumberTotalDigits",
- Integer.valueOf(JobHistory.serialNumberTotalDigits()));
-
- return new Path(historyLogDir).getFileSystem(conf);
- }
- });
- infoServer.setAttribute("fileSys", historyFS);
- infoServer.setAttribute("jobConf", conf);
- infoServer.setAttribute("aclManager", aclsManager);
-
- if (JobHistoryServer.isEmbedded(conf)) {
- LOG.info("History server being initialized in embedded mode");
- jobHistoryServer = new JobHistoryServer(conf, aclsManager, infoServer);
- jobHistoryServer.start();
- LOG.info("Job History Server web address: " + JobHistoryServer.getAddress(conf));
- }
-
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
DNSToSwitchMapping.class), conf);
this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels",
NetworkTopology.DEFAULT_HOST_LEVEL);
-
- //initializes the job status store
- completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
+ this.isNodeGroupAware = conf.getBoolean(
+ "mapred.jobtracker.nodegroup.aware", false);
+
+ plugins = conf.getInstances("mapreduce.jobtracker.plugins",
+ ServicePlugin.class);
+ for (ServicePlugin p : plugins) {
+ try {
+ p.start(this);
+ LOG.info("Started plug-in " + p + " of type " + p.getClass());
+ } catch (Throwable t) {
+ LOG.warn("ServicePlugin " + p + " of type " + p.getClass()
+ + " could not be started", t);
+ }
+ }
+
+ this.initDone.set(conf.getBoolean(JT_INIT_CONFIG_KEY_FOR_TESTS, true));
}
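The plugin hook added to the constructor loads classes named by a new key via Configuration.getInstances(). A hypothetical wiring example; the plugin class name is invented, and it must implement org.apache.hadoop.util.ServicePlugin and have a no-arg constructor:

    import org.apache.hadoop.mapred.JobConf;

    public class PluginWiringSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Comma-separated list of ServicePlugin implementations; each gets
        // p.start(jobTracker) during construction and p.stop() in close().
        conf.set("mapreduce.jobtracker.plugins",
                 "com.example.MyJobTrackerPlugin");
      }
    }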
private static SimpleDateFormat getDateFormat() {
@@ -2568,6 +2138,17 @@ public class JobTracker implements MRCon
* Run forever
*/
public void offerService() throws InterruptedException, IOException {
+ // start the inter-tracker server
+ this.interTrackerServer.start();
+
+ // Initialize the JobTracker FileSystem within safemode
+ setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
+ initializeFilesystem();
+ setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
+
+ // Initialize JobTracker
+ initialize();
+
// Prepare for recovery. This is done irrespective of the status of restart
// flag.
while (true) {
@@ -2607,19 +2188,53 @@ public class JobTracker implements MRCon
completedJobsStoreThread.start();
}
- // start the inter-tracker server once the jt is ready
- this.interTrackerServer.start();
-
+ // Just for unit-tests
+ waitForInit();
synchronized (this) {
state = State.RUNNING;
}
+
LOG.info("Starting RUNNING");
this.interTrackerServer.join();
LOG.info("Stopped interTrackerServer");
}
+ AtomicBoolean initDone = new AtomicBoolean(true);
+ Object initDoneLock = new Object();
+
+ private void waitForInit() {
+ synchronized (initDoneLock) {
+ while (!initDone.get()) {
+ try {
+ LOG.debug("About to wait since initDone = false");
+ initDoneLock.wait();
+ } catch (InterruptedException ie) {
+ LOG.debug("Ignoring ", ie);
+ }
+ }
+ }
+ }
+
+ void setInitDone(boolean done) {
+ synchronized (initDoneLock) {
+ initDone.set(done);
+ initDoneLock.notify();
+ }
+ }
+
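waitForInit()/setInitDone() above are a standard guarded wait: a flag plus a lock object, with waiters re-checking the flag in a loop. A self-contained sketch of the idiom; it uses notifyAll() where the JobTracker code uses notify(), which is equivalent here because offerService() is the only waiter:

    import java.util.concurrent.atomic.AtomicBoolean;

    public class GuardedInitSketch {
      private final Object initDoneLock = new Object();
      private final AtomicBoolean initDone = new AtomicBoolean(false);

      void waitForInit() {
        synchronized (initDoneLock) {
          while (!initDone.get()) {       // guard against spurious wakeups
            try {
              initDoneLock.wait();
            } catch (InterruptedException ie) {
              // ignore and re-check the flag, as the JobTracker code does
            }
          }
        }
      }

      void setInitDone(boolean done) {
        synchronized (initDoneLock) {
          initDone.set(done);
          initDoneLock.notifyAll();
        }
      }
    }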
void close() throws IOException {
+ if (plugins != null) {
+ for (ServicePlugin p : plugins) {
+ try {
+ p.stop();
+ LOG.info("Stopped plug-in " + p + " of type " + p.getClass());
+ } catch (Throwable t) {
+ LOG.warn("ServicePlugin " + p + " of type " + p.getClass()
+ + " could not be stopped", t);
+ }
+ }
+ }
if (this.infoServer != null) {
LOG.info("Stopping infoServer");
try {
@@ -3267,7 +2882,9 @@ public class JobTracker implements MRCon
public int getNumberOfUniqueHosts() {
return uniqueHostsMap.size();
}
-
+ public boolean isNodeGroupAware() {
+ return isNodeGroupAware;
+ }
public void addJobInProgressListener(JobInProgressListener listener) {
jobInProgressListeners.add(listener);
}
@@ -3294,8 +2911,13 @@ public class JobTracker implements MRCon
////////////////////////////////////////////////////
// InterTrackerProtocol
////////////////////////////////////////////////////
-
- public String getBuildVersion() throws IOException{
+
+ // Just returns the VersionInfo version (unlike MXBean#getVersion)
+ public String getVIVersion() throws IOException {
+ return VersionInfo.getVersion();
+ }
+
+ public String getBuildVersion() throws IOException {
return VersionInfo.getBuildVersion();
}
@@ -3412,7 +3034,7 @@ public class JobTracker implements MRCon
}
}
}
-
+
// Check for tasks to be killed
List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
if (killTasksList != null) {
@@ -3461,7 +3083,7 @@ public class JobTracker implements MRCon
int clusterSize = getClusterStatus().getTaskTrackers();
int heartbeatInterval = Math.max(
(int)(1000 * HEARTBEATS_SCALING_FACTOR *
- Math.ceil((double)clusterSize /
+ ((double)clusterSize /
NUM_HEARTBEATS_IN_SECOND)),
HEARTBEAT_INTERVAL_MIN) ;
return heartbeatInterval;
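Dropping Math.ceil() lets the interval scale smoothly with cluster size instead of jumping in whole-second steps. A worked example, assuming NUM_HEARTBEATS_IN_SECOND = 100, the default scaling factor of 1.0, and a 300 ms floor (the floor value is a stand-in for HEARTBEAT_INTERVAL_MIN):

    public class HeartbeatIntervalSketch {
      public static void main(String[] args) {
        int clusterSize = 150;
        // Old: 1000 * 1.0 * Math.ceil(150/100.0) = 2000 ms
        int oldInterval = (int) Math.max(
            1000 * 1.0f * Math.ceil((double) clusterSize / 100), 300);
        // New: 1000 * 1.0 * (150/100.0) = 1500 ms
        int newInterval = (int) Math.max(
            1000 * 1.0f * ((double) clusterSize / 100), 300);
        System.out.println(oldInterval + " -> " + newInterval); // 2000 -> 1500
      }
    }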
@@ -3807,6 +3429,12 @@ public class JobTracker implements MRCon
// returns cleanup tasks first, then setup tasks.
synchronized List<Task> getSetupAndCleanupTasks(
TaskTrackerStatus taskTracker) throws IOException {
+
+ // Don't assign *any* new task in safemode
+ if (isInSafeMode()) {
+ return null;
+ }
+
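Taken together with the safe-mode machinery defined near the end of this diff, callers interact with it much like the HDFS dfsadmin safemode command. A hypothetical sketch (jt stands for a JobTracker reference; SAFEMODE_ENTER requires MR-admin rights):

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobTracker;

    public class SafeModeAdminSketch {
      static void toggle(JobTracker jt) throws IOException {
        boolean inSafeMode =
            jt.setSafeMode(JobTracker.SafeModeAction.SAFEMODE_GET);
        if (!inSafeMode) {
          jt.setSafeMode(JobTracker.SafeModeAction.SAFEMODE_ENTER); // admin only
          // ... no new tasks are handed out while in safe mode ...
          jt.setSafeMode(JobTracker.SafeModeAction.SAFEMODE_LEAVE);
        }
      }
    }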
int maxMapTasks = taskTracker.getMaxMapSlots();
int maxReduceTasks = taskTracker.getMaxReduceSlots();
int numMaps = taskTracker.countOccupiedMapSlots();
@@ -3914,9 +3542,12 @@ public class JobTracker implements MRCon
* Allocates a new JobId string.
*/
public synchronized JobID getNewJobId() throws IOException {
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
return new JobID(getTrackerIdentifier(), nextJobId++);
}
-
+
/**
* JobTracker.submitJob() kicks off a new job.
*
@@ -3927,8 +3558,32 @@ public class JobTracker implements MRCon
*/
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException {
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
+ return submitJob(jobId, jobSubmitDir, null, ts, false);
+ }
+
+ /**
+ * JobTracker.submitJob() kicks off a new job.
+ *
+ * Create a 'JobInProgress' object, which contains both JobProfile and
+ * JobStatus. Those two sub-objects are sometimes shipped outside of the
+ * JobTracker. But JobInProgress adds info that's useful for the JobTracker
+ * alone.
+ * @return null if the job is being recovered but mapred.job.restart.recover
+ * is false.
+ */
+ JobStatus submitJob(JobID jobId, String jobSubmitDir,
+ UserGroupInformation ugi, Credentials ts, boolean recovered)
+ throws IOException {
+ // Check for safe-mode
+ checkSafeMode();
+
JobInfo jobInfo = null;
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ if (ugi == null) {
+ ugi = UserGroupInformation.getCurrentUser();
+ }
synchronized (this) {
if (jobs.containsKey(jobId)) {
// job already running, don't start twice
@@ -3938,15 +3593,40 @@ public class JobTracker implements MRCon
new Path(jobSubmitDir));
}
+ // Store the job-info in a file so that the job can be recovered
+ // later (if at all)
+ // Note: jobDir & jobInfo are owned by JT user since we are using
+ // his fs object
+ if (!recovered) {
+ Path jobDir = getSystemDirectoryForJob(jobId);
+ FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
+ FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
+ jobInfo.write(out);
+ out.close();
+ }
+
// Create the JobInProgress, do not lock the JobTracker since
- // we are about to copy job.xml from HDFS
+ // we are about to copy job.xml from HDFS and write jobToken file to HDFS
JobInProgress job = null;
try {
+ if (ts == null) {
+ ts = new Credentials();
+ }
+ generateAndStoreJobTokens(jobId, ts);
job = new JobInProgress(this, this.conf, jobInfo, 0, ts);
} catch (Exception e) {
throw new IOException(e);
}
+ if (recovered &&
+ !job.getJobConf().getBoolean(
+ JobConf.MAPREDUCE_RECOVER_JOB,
+ JobConf.DEFAULT_MAPREDUCE_RECOVER_JOB)) {
+ LOG.info("Job "+ jobId.toString() + " is not enable for recovery, cleaning up job files");
+ job.cleanupJob();
+ return null;
+ }
+
synchronized (this) {
// check if queue is RUNNING
String queue = job.getProfile().getQueueName();
@@ -3969,19 +3649,6 @@ public class JobTracker implements MRCon
} catch (IOException ioe) {
throw ioe;
}
- boolean recovered = true; // TODO: Once the Job recovery code is there,
- // (MAPREDUCE-873) we
- // must pass the "recovered" flag accurately.
- // This is handled in the trunk/0.22
- if (!recovered) {
- // Store the information in a file so that the job can be recovered
- // later (if at all)
- Path jobDir = getSystemDirectoryForJob(jobId);
- FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
- FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
- jobInfo.write(out);
- out.close();
- }
try {
this.taskScheduler.checkJobSubmission(job);
@@ -4001,7 +3668,6 @@ public class JobTracker implements MRCon
failJob(job);
throw ioe;
}
-
return status;
}
}
@@ -4010,6 +3676,9 @@ public class JobTracker implements MRCon
* @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
*/
public String getStagingAreaDir() throws IOException {
+ // Check for safe-mode
+ checkSafeMode();
+
try{
final String user =
UserGroupInformation.getCurrentUser().getShortUserName();
@@ -4126,6 +3795,12 @@ public class JobTracker implements MRCon
return;
}
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
+ // No 'killJob' in safe-mode
+ checkSafeMode();
+
JobInProgress job = jobs.get(jobid);
if (null == job) {
@@ -4269,6 +3944,9 @@ public class JobTracker implements MRCon
public synchronized void setJobPriority(JobID jobid,
String priority)
throws IOException {
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
JobInProgress job = jobs.get(jobid);
if (null == job) {
LOG.info("setJobPriority(): JobId " + jobid.toString()
@@ -4296,7 +3974,10 @@ public class JobTracker implements MRCon
return job.inited();
}
- public JobProfile getJobProfile(JobID jobid) {
+ public JobProfile getJobProfile(JobID jobid) throws IOException {
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
synchronized (this) {
JobInProgress job = jobs.get(jobid);
if (job != null) {
@@ -4313,7 +3994,10 @@ public class JobTracker implements MRCon
return completedJobStatusStore.readJobProfile(jobid);
}
- public JobStatus getJobStatus(JobID jobid) {
+ public JobStatus getJobStatus(JobID jobid) throws IOException {
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
if (null == jobid) {
LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
return null;
@@ -4336,6 +4020,9 @@ public class JobTracker implements MRCon
private static final Counters EMPTY_COUNTERS = new Counters();
public Counters getJobCounters(JobID jobid) throws IOException {
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
synchronized (this) {
JobInProgress job = jobs.get(jobid);
@@ -4370,6 +4057,9 @@ public class JobTracker implements MRCon
public synchronized TaskReport[] getMapTaskReports(JobID jobid)
throws IOException {
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
JobInProgress job = jobs.get(jobid);
if (job != null) {
// Check authorization
@@ -4398,6 +4088,9 @@ public class JobTracker implements MRCon
public synchronized TaskReport[] getReduceTaskReports(JobID jobid)
throws IOException {
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
JobInProgress job = jobs.get(jobid);
if (job != null) {
// Check authorization
@@ -4424,6 +4117,9 @@ public class JobTracker implements MRCon
public synchronized TaskReport[] getCleanupTaskReports(JobID jobid)
throws IOException {
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
JobInProgress job = jobs.get(jobid);
if (job != null) {
// Check authorization
@@ -4453,6 +4149,9 @@ public class JobTracker implements MRCon
public synchronized TaskReport[] getSetupTaskReports(JobID jobid)
throws IOException {
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
JobInProgress job = jobs.get(jobid);
if (job != null) {
// Check authorization
@@ -4496,6 +4195,9 @@ public class JobTracker implements MRCon
*/
public TaskCompletionEvent[] getTaskCompletionEvents(
JobID jobid, int fromEventId, int maxEvents) throws IOException{
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
JobInProgress job = this.jobs.get(jobid);
if (null != job) {
@@ -4517,6 +4219,9 @@ public class JobTracker implements MRCon
*/
public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId)
throws IOException {
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
List<String> taskDiagnosticInfo = null;
JobID jobId = taskId.getJobID();
TaskID tipId = taskId.getTaskID();
@@ -4581,6 +4286,12 @@ public class JobTracker implements MRCon
*/
public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail)
throws IOException {
+ // Check for JobTracker operational state
+ checkJobTrackerState();
+
+ // No 'killTask' in safe-mode
+ checkSafeMode();
+
TaskInProgress tip = taskidToTIPMap.get(taskid);
if(tip != null) {
// check both queue-level and job-level access
@@ -4622,6 +4333,11 @@ public class JobTracker implements MRCon
* @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
*/
public String getSystemDir() {
+ // Might not be initialized yet, TT handles this
+ if (isInSafeMode()) {
+ return null;
+ }
+
Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));
return fs.makeQualified(sysDir).toString();
}
@@ -4652,9 +4368,15 @@ public class JobTracker implements MRCon
//Get the job info file in system directory
Path getSystemFileForJob(JobID id) {
- return new Path(getSystemDirectoryForJob(id)+"/" + JOB_INFO_FILE);
+ return new Path(getSystemDirectoryForJob(id), JOB_INFO_FILE);
}
+ //Get the job token file in system directory
+ Path getTokenFileForJob(JobID id) {
+ return new Path(
+ getSystemDirectoryForJob(id), TokenCache.JOB_TOKEN_HDFS_FILE);
+ }
+
/**
* Change the run-time priority of the given job.
*
@@ -4702,8 +4424,10 @@ public class JobTracker implements MRCon
report.setTaskTracker(trackerName);
TaskAttemptID taskId = report.getTaskID();
- // expire it
- expireLaunchingTasks.removeTask(taskId);
+ // stop tracking the task for launch-expiry only once it has left
+ // the UNASSIGNED state
+ if (report.getRunState() != TaskStatus.State.UNASSIGNED) {
+ expireLaunchingTasks.removeTask(taskId);
+ }
JobInProgress job = getJob(taskId.getJobID());
if (job == null) {
@@ -5385,4 +5109,145 @@ public class JobTracker implements MRCon
return map;
}
// End MXBean implementation
+
+ /**
+ * JobTracker SafeMode
+ */
+ // SafeMode actions
+ public enum SafeModeAction { SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET }
+
+ private AtomicBoolean safeMode = new AtomicBoolean(false);
+ private AtomicBoolean adminSafeMode = new AtomicBoolean(false);
+ private String adminSafeModeUser = "";
+
+ public boolean setSafeMode(JobTracker.SafeModeAction safeModeAction)
+ throws IOException {
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+
+ // Anyone can check JT safe-mode
+ if (safeModeAction == SafeModeAction.SAFEMODE_GET) {
+ boolean safeMode = this.safeMode.get();
+ LOG.info("Getting safemode information: safemode=" + safeMode + ". " +
+ "Requested by : " +
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ AuditLogger.logSuccess(user, Constants.GET_SAFEMODE,
+ Constants.JOBTRACKER);
+ return safeMode;
+ }
+
+ // Check access for modifications to safe-mode
+ if (!aclsManager.isMRAdmin(UserGroupInformation.getCurrentUser())) {
+ AuditLogger.logFailure(user, Constants.SET_SAFEMODE,
+ aclsManager.getAdminsAcl().toString(), Constants.JOBTRACKER,
+ Constants.UNAUTHORIZED_USER);
+ throw new AccessControlException(user +
+ " is not authorized to set JobTracker safemode.");
+ }
+ AuditLogger.logSuccess(user, Constants.SET_SAFEMODE, Constants.JOBTRACKER);
+
+ boolean currSafeMode = setSafeModeInternal(safeModeAction);
+ adminSafeMode.set(currSafeMode);
+ adminSafeModeUser = user;
+ return currSafeMode;
+ }
+
+ boolean isInAdminSafeMode() {
+ return adminSafeMode.get();
+ }
+
+ boolean setSafeModeInternal(JobTracker.SafeModeAction safeModeAction)
+ throws IOException {
+ if (safeModeAction != SafeModeAction.SAFEMODE_GET) {
+ boolean safeMode = false;
+ if (safeModeAction == SafeModeAction.SAFEMODE_ENTER) {
+ safeMode = true;
+ } else if (safeModeAction == SafeModeAction.SAFEMODE_LEAVE) {
+ safeMode = false;
+ }
+ LOG.info("Setting safe mode to " + safeMode + ". Requested by : " +
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ this.safeMode.set(safeMode);
+ }
+ return this.safeMode.get();
+ }
+
+ public boolean isInSafeMode() {
+ return safeMode.get();
+ }
+
+ String getSafeModeText() {
+ if (!isInSafeMode()) {
+ return "OFF";
+ }
+ String safeModeInfo =
+ adminSafeMode.get() ?
+ "Set by admin <strong>" + adminSafeModeUser + "</strong>" :
+ "HDFS unavailable";
+ return "<em>ON - " + safeModeInfo + "</em>";
+ }
+
+ private void checkSafeMode() throws SafeModeException {
+ if (isInSafeMode()) {
+ SafeModeException sme =
+ new SafeModeException(
+ (isInAdminSafeMode()) ? adminSafeModeUser : null);
+ LOG.info("JobTracker in safe-mode, aborting operation: ", sme);
+ throw sme;
+ }
+ }
+
+ private void checkJobTrackerState()
+ throws JobTrackerNotYetInitializedException {
+ if (state != State.RUNNING) {
+ JobTrackerNotYetInitializedException jtnyie =
+ new JobTrackerNotYetInitializedException();
+ LOG.info("JobTracker not yet in RUNNING state, aborting operation: ",
+ jtnyie);
+ throw jtnyie;
+ }
+ }
+
+ /**
+ * Generate the job token and save it to a file in the job's system
+ * directory. An InterruptedException from the doAs block is rethrown
+ * wrapped in an IOException.
+ * @throws IOException
+ */
+ private void
+ generateAndStoreJobTokens(final JobID jobId, final Credentials tokenStorage)
+ throws IOException {
+
+ // Write out jobToken as JT user
+ try {
+ getMROwner().doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws IOException {
+
+ Path jobDir = getSystemDirectoryForJob(jobId);
+ Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
+ // create the JobToken file and write the token to it
+ JobTokenIdentifier identifier =
+ new JobTokenIdentifier(new Text(jobId.toString()));
+ Token<JobTokenIdentifier> token =
+ new Token<JobTokenIdentifier>(
+ identifier, getJobTokenSecretManager());
+ token.setService(identifier.getJobId());
+
+ TokenCache.setJobToken(token, tokenStorage);
+
+ // write TokenStorage out
+ tokenStorage.writeTokenStorageFile(keysFile, getConf());
+ LOG.info("jobToken generated and stored with users keys in "
+ + keysFile.toUri().getPath());
+
+ return null;
+
+ }
+ });
+ } catch (InterruptedException ie) {
+ // rethrow the interruption as the method's single checked exception
+ throw new IOException(ie);
+ }
+
+ }
+
}
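Taken together, the hunks above give every client-facing JobTracker RPC the same prologue: refuse calls until the tracker reaches RUNNING, and additionally refuse mutating calls such as killTask while safe-mode is on. A minimal, self-contained sketch of that pattern, assuming nothing beyond the JDK; every name below is illustrative rather than part of this change:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy reduction of the guard pattern used by the RPCs patched above.
class SafeModeGuardSketch {
  private final AtomicBoolean safeMode = new AtomicBoolean(false);
  private volatile boolean running = false; // flipped once startup completes

  // cf. checkJobTrackerState() / JobTrackerNotYetInitializedException
  private void checkState() throws IOException {
    if (!running) {
      throw new IOException("not yet RUNNING");
    }
  }

  // cf. checkSafeMode() / SafeModeException
  private void checkSafeModeOff() throws IOException {
    if (safeMode.get()) {
      throw new IOException("in safe-mode");
    }
  }

  // Read-only RPCs (getJobStatus, getJobCounters, the task reports)
  // check the state only ...
  Object readOnlyRpc() throws IOException {
    checkState();
    return null; // ... and then answer from in-memory structures
  }

  // ... while mutating RPCs also refuse to run until safe-mode is off.
  void mutatingRpc() throws IOException {
    checkState();
    checkSafeModeOff();
    // perform the mutation
  }
}

Keeping the two checks separate means read-only status calls stay available to clients while HDFS is down, which is what the getSystemDir() early return above relies on.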
Added: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java (added)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * This exception is thrown when the JobTracker is still initializing and
+ * not yet operational.
+ */
+public class JobTrackerNotYetInitializedException extends IOException {
+
+ private static final long serialVersionUID = 1984839357L;
+
+ public JobTrackerNotYetInitializedException() {
+ super("JobTracker is not yet RUNNING");
+ }
+
+}
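Because the new exception extends IOException, existing callers of the JobTracker protocol compile unchanged, and a client that knows the tracker is merely still starting can treat the failure as retriable. A hedged client-side sketch; the retry helper and its knobs are invented for illustration, and RPC RemoteException unwrapping is ignored for brevity:

import java.io.IOException;
import org.apache.hadoop.mapred.JobTrackerNotYetInitializedException;

// Illustrative retry loop, not part of this change.
class RetryWhileInitializingSketch {
  interface Call<T> { T run() throws IOException; }

  // Assumes attempts >= 1; rethrows the last failure once patience runs out.
  static <T> T retry(Call<T> call, int attempts, long sleepMs)
      throws IOException, InterruptedException {
    IOException last = null;
    for (int i = 0; i < attempts; i++) {
      try {
        return call.run();
      } catch (JobTrackerNotYetInitializedException e) {
        last = e; // the tracker is still starting: back off and try again
        Thread.sleep(sleepMs);
      }
    }
    throw last;
  }
}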
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Fri Jun 21 06:37:27 2013
@@ -101,7 +101,7 @@ public class KeyValueLineRecordReader im
return true;
}
- public float getProgress() {
+ public float getProgress() throws IOException {
return lineRecordReader.getProgress();
}
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueTextInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueTextInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueTextInputFormat.java Fri Jun 21 06:37:27 2013
@@ -23,7 +23,9 @@ import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
/**
* An {@link InputFormat} for plain text files. Files are broken into lines.
@@ -41,9 +43,13 @@ public class KeyValueTextInputFormat ext
}
protected boolean isSplitable(FileSystem fs, Path file) {
- return compressionCodecs.getCodec(file) == null;
+ final CompressionCodec codec = compressionCodecs.getCodec(file);
+ if (null == codec) {
+ return true;
+ }
+ return codec instanceof SplittableCompressionCodec;
}
-
+
public RecordReader<Text, Text> getRecordReader(InputSplit genericSplit,
JobConf job,
Reporter reporter)
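The reworked isSplitable() reduces to one rule: an uncompressed file is always splittable, and a compressed one only when its codec implements SplittableCompressionCodec (bzip2 in branch-1). A self-contained restatement of that decision; the class and method names are illustrative:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

class SplittabilitySketch {
  // null codec => plain text, always splittable; otherwise only codecs
  // implementing SplittableCompressionCodec may be split mid-file.
  static boolean isSplittable(CompressionCodecFactory factory, Path file) {
    CompressionCodec codec = factory.getCodec(file);
    return codec == null || codec instanceof SplittableCompressionCodec;
  }
}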
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java Fri Jun 21 06:37:27 2013
@@ -25,10 +25,15 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
@@ -45,6 +50,9 @@ public class LineRecordReader implements
private long end;
private LineReader in;
int maxLineLength;
+ private Seekable filePosition;
+ private CompressionCodec codec;
+ private Decompressor decompressor;
/**
* A class that provides a line reader from an input stream.
@@ -71,37 +79,69 @@ public class LineRecordReader implements
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
- final CompressionCodec codec = compressionCodecs.getCodec(file);
+ codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
- boolean skipFirstLine = false;
- if (codec != null) {
- in = new LineReader(codec.createInputStream(fileIn), job);
- end = Long.MAX_VALUE;
- } else {
- if (start != 0) {
- skipFirstLine = true;
- --start;
- fileIn.seek(start);
+
+ if (isCompressedInput()) {
+ decompressor = CodecPool.getDecompressor(codec);
+ if (codec instanceof SplittableCompressionCodec) {
+ final SplitCompressionInputStream cIn =
+ ((SplittableCompressionCodec)codec).createInputStream(
+ fileIn, decompressor, start, end,
+ SplittableCompressionCodec.READ_MODE.BYBLOCK);
+ in = new LineReader(cIn, job);
+ start = cIn.getAdjustedStart();
+ end = cIn.getAdjustedEnd();
+ filePosition = cIn; // take pos from compressed stream
+ } else {
+ in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
+ filePosition = fileIn;
}
+ } else {
+ fileIn.seek(start);
in = new LineReader(fileIn, job);
+ filePosition = fileIn;
}
- if (skipFirstLine) { // skip first line and re-establish "start".
- start += in.readLine(new Text(), 0,
- (int)Math.min((long)Integer.MAX_VALUE, end - start));
+ // If this is not the first split, we always throw away the first record
+ // because every split except the last reads one extra line past its
+ // boundary in the next() method.
+ if (start != 0) {
+ start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
-
+
+ private boolean isCompressedInput() {
+ return (codec != null);
+ }
+
+ private int maxBytesToConsume(long pos) {
+ return isCompressedInput()
+ ? Integer.MAX_VALUE
+ : (int) Math.min(Integer.MAX_VALUE, end - pos);
+ }
+
+ private long getFilePosition() throws IOException {
+ long retVal;
+ if (isCompressedInput() && null != filePosition) {
+ retVal = filePosition.getPos();
+ } else {
+ retVal = pos;
+ }
+ return retVal;
+ }
+
public LineRecordReader(InputStream in, long offset, long endOffset,
int maxLineLength) {
this.maxLineLength = maxLineLength;
this.in = new LineReader(in);
this.start = offset;
this.pos = offset;
- this.end = endOffset;
+ this.end = endOffset;
+ this.filePosition = null;
}
public LineRecordReader(InputStream in, long offset, long endOffset,
@@ -113,6 +153,7 @@ public class LineRecordReader implements
this.start = offset;
this.pos = offset;
this.end = endOffset;
+ this.filePosition = null;
}
public LongWritable createKey() {
@@ -127,12 +168,13 @@ public class LineRecordReader implements
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
- while (pos < end) {
+ // We always read one extra line, which lies beyond the upper
+ // split limit, i.e. past (end - 1)
+ while (getFilePosition() <= end) {
key.set(pos);
int newSize = in.readLine(value, maxLineLength,
- Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
- maxLineLength));
+ Math.max(maxBytesToConsume(pos), maxLineLength));
if (newSize == 0) {
return false;
}
@@ -151,21 +193,28 @@ public class LineRecordReader implements
/**
* Get the progress within the split
*/
- public float getProgress() {
+ public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
} else {
- return Math.min(1.0f, (pos - start) / (float)(end - start));
+ return Math.min(1.0f,
+ (getFilePosition() - start) / (float)(end - start));
}
}
- public synchronized long getPos() throws IOException {
+ public synchronized long getPos() throws IOException {
return pos;
}
public synchronized void close() throws IOException {
- if (in != null) {
- in.close();
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ }
}
}
}
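The reworked reader keeps records exactly-once across splits through a single convention, visible in the constructor and in next(): a reader whose start is non-zero discards its first, possibly partial, line, and every reader keeps returning lines while its position is still at or before end, so a line straddling a boundary always belongs to the earlier split. A pure-logic sketch of that convention with no Hadoop APIs; all names are illustrative:

import java.util.ArrayList;
import java.util.List;

class BoundaryConventionSketch {
  // Returns the records a reader for split [start, end) would produce.
  static List<String> readSplit(String data, int start, int end) {
    int pos = start;
    if (start != 0) {
      // skip the (possibly partial) first line; the previous split owns it
      int firstNl = data.indexOf('\n', start);
      pos = (firstNl < 0) ? data.length() : firstNl + 1;
    }
    List<String> records = new ArrayList<String>();
    // keep reading while the position is at or before `end`:
    // this deliberately reads one line past the nominal split limit
    while (pos <= end && pos < data.length()) {
      int nl = data.indexOf('\n', pos);
      if (nl < 0) {
        nl = data.length(); // final line without a trailing newline
      }
      records.add(data.substring(pos, nl));
      pos = nl + 1;
    }
    return records;
  }

  public static void main(String[] args) {
    String data = "aaaa\nbbbb\ncccc\n";
    // Splits [0,7) and [7,15): "bbbb" straddles byte 7 and is read exactly
    // once, by the earlier split.
    System.out.println(readSplit(data, 0, 7));  // [aaaa, bbbb]
    System.out.println(readSplit(data, 7, 15)); // [cccc]
  }
}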
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Jun 21 06:37:27 2013
@@ -296,32 +296,51 @@ class LinuxTaskController extends TaskCo
@Override
public void deleteAsUser(String user, String subDir) throws IOException {
- String[] command =
- new String[]{taskControllerExe,
- user,
- localStorage.getDirsString(),
- Integer.toString(Commands.DELETE_AS_USER.getValue()),
- subDir};
- ShellCommandExecutor shExec = new ShellCommandExecutor(command);
- if (LOG.isDebugEnabled()) {
- LOG.debug("deleteAsUser: " + Arrays.toString(command));
+ String[] command = new String[] { taskControllerExe, user,
+ localStorage.getDirsString(),
+ Integer.toString(Commands.DELETE_AS_USER.getValue()), subDir };
+ ShellCommandExecutor shExec = null;
+ try {
+ shExec = new ShellCommandExecutor(command);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("deleteAsUser: " + Arrays.toString(command));
+ }
+ shExec.execute();
+ } catch (IOException e) {
+ if (shExec != null) {
+ int exitCode = shExec.getExitCode();
+ LOG.info("deleteAsUser: " + Arrays.toString(command));
+ LOG.warn("Exit code is : " + exitCode);
+ LOG.info("Output from deleteAsUser LinuxTaskController:");
+ logOutput(shExec.getOutput());
+ }
+ throw e;
}
- shExec.execute();
}
@Override
public void deleteLogAsUser(String user, String subDir) throws IOException {
- String[] command =
- new String[]{taskControllerExe,
- user,
- localStorage.getDirsString(),
- Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()),
- subDir};
- ShellCommandExecutor shExec = new ShellCommandExecutor(command);
- if (LOG.isDebugEnabled()) {
- LOG.debug("deleteLogAsUser: " + Arrays.toString(command));
+ String[] command = new String[] { taskControllerExe, user,
+ localStorage.getDirsString(),
+ Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()), subDir };
+ ShellCommandExecutor shExec = null;
+ try {
+ shExec = new ShellCommandExecutor(command);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("deleteLogAsUser: " + Arrays.toString(command));
+ }
+ shExec.execute();
+ } catch (IOException e) {
+ if (shExec != null) {
+ int exitCode = shExec.getExitCode();
+ LOG.info("deleteLogAsUser: " + Arrays.toString(command));
+ LOG.warn("Exit code is : " + exitCode);
+ LOG.info("Output from deleteLogAsUser LinuxTaskController:");
+ logOutput(shExec.getOutput());
+ }
+ throw e;
}
- shExec.execute();
}
@Override
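Both delete methods now share one shape: run the setuid task-controller binary and, on failure, log the exact command, its exit code, and its captured output before rethrowing. A hedged sketch of that shape, assuming only the Shell utility class; the helper name and the use of System.err in place of the commons-logging Log are illustrative:

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;

// Illustrative reduction of the logging pattern; runLogged is a made-up name.
class ShellDiagnosticsSketch {
  static void runLogged(String[] command) throws IOException {
    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
    try {
      shExec.execute();
    } catch (IOException e) {
      // Surface the failing command, its exit code, and captured output
      // before rethrowing, so failures are diagnosable from the logs.
      System.err.println("command: " + Arrays.toString(command));
      System.err.println("exit code: " + shExec.getExitCode());
      System.err.println("output: " + shExec.getOutput());
      throw e;
    }
  }
}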