You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC
svn commit: r1495297 [25/46] - in /hadoop/common/branches/branch-1-win: ./
bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/
src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Jun 21 06:37:27 2013
@@ -44,17 +44,13 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.Counters.CountersExceededException;
-import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobHistory.Values;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
@@ -71,7 +67,7 @@ import org.apache.hadoop.util.Shell;
/*************************************************************
* JobInProgress maintains all the info for keeping
- * a Job on the straight and narrow. It keeps its JobProfile
+ * a Job on the straight and narrow. It keeps its JobProfile
* and its latest JobStatus, plus a set of tables for
* doing bookkeeping of its Tasks.
* ***********************************************************
@@ -256,7 +252,6 @@ public class JobInProgress {
private String submitHostAddress;
private String user;
private String historyFile = "";
- private boolean historyFileCopied;
// Per-job counters
public static enum Counter {
@@ -266,6 +261,7 @@ public class JobInProgress {
TOTAL_LAUNCHED_REDUCES,
OTHER_LOCAL_MAPS,
DATA_LOCAL_MAPS,
+ NODEGROUP_LOCAL_MAPS,
RACK_LOCAL_MAPS,
SLOTS_MILLIS_MAPS,
SLOTS_MILLIS_REDUCES,
@@ -520,7 +516,7 @@ public class JobInProgress {
public void cleanUpMetrics() {
// per job metrics is disabled for now.
}
-
+
private void printCache (Map<Node, List<TaskInProgress>> cache) {
LOG.info("The taskcache info:");
for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
@@ -656,12 +652,44 @@ public class JobInProgress {
this.numSlotsPerReduce = numSlotsPerReduce;
}
+ static final String JOB_INIT_EXCEPTION =
+ "mapreduce.job.init.throw.exception";
+ static final String JT_JOB_INIT_EXCEPTION_OVERRIDE =
+ "mapreduce.jt.job.init.throw.exception.override";
+
+ Object jobInitWaitLockForTests = new Object();
+
+ void signalInitWaitLockForTests() {
+ synchronized (jobInitWaitLockForTests) {
+ jobInitWaitLockForTests.notify();
+ }
+ }
+
+ void waitForInitWaitLockForTests() {
+ synchronized (jobInitWaitLockForTests) {
+ try {
+ LOG.info("About to wait for jobInitWaitLockForTests");
+ jobInitWaitLockForTests.wait();
+ LOG.info("Done waiting for jobInitWaitLockForTests");
+ } catch (InterruptedException ie) {
+ // Should never occur
+ }
+ }
+ }
+
/**
* Construct the splits, etc. This is invoked from an async
* thread so that split-computation doesn't block anyone.
*/
public synchronized void initTasks()
throws IOException, KillInterruptedException, UnknownHostException {
+ // Only for tests
+ if (!jobtracker.getConf().getBoolean(JT_JOB_INIT_EXCEPTION_OVERRIDE, false)
+ &&
+ getJobConf().getBoolean(JOB_INIT_EXCEPTION, false)) {
+ waitForInitWaitLockForTests();
+ }
+
if (tasksInited || isComplete()) {
return;
}
@@ -692,18 +720,13 @@ public class JobInProgress {
setPriority(this.priority);
//
- // generate security keys needed by Tasks
- //
- generateAndStoreTokens();
-
- //
// read input splits and create a map per a split
//
TaskSplitMetaInfo[] splits = createSplits(jobId);
if (numMapTasks != splits.length) {
throw new IOException("Number of maps in JobConf doesn't match number of " +
- "recieved splits for job " + jobId + "! " +
- "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
+ "recieved splits for job " + jobId + "! " +
+ "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
}
numMapTasks = splits.length;
@@ -1311,27 +1334,49 @@ public class JobInProgress {
int clusterSize,
int numUniqueHosts
) throws IOException {
- if (status.getRunState() != JobStatus.RUNNING) {
+ return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts,
+ anyCacheLevel);
+ }
+
+ /**
+ * Return a MapTask with locality level that smaller or equal than a given
+ * locality level to tasktracker.
+ *
+ * @param tts The task tracker that is asking for a task
+ * @param clusterSize The number of task trackers in the cluster
+ * @param numUniqueHosts The number of hosts that run task trackers
+ * @param avgProgress The average progress of this kind of task in this job
+ * @param maxCacheLevel The maximum topology level until which to schedule
+ * maps.
+ * @return the index in tasks of the selected task (or -1 for no task)
+ * @throws IOException
+ */
+ public synchronized Task obtainNewMapTaskCommon(
+ TaskTrackerStatus tts, int clusterSize, int numUniqueHosts,
+ int maxCacheLevel) throws IOException {
+ if (!tasksInited) {
LOG.info("Cannot create task split for " + profile.getJobID());
try { throw new IOException("state = " + status.getRunState()); }
catch (IOException ioe) {ioe.printStackTrace();}
return null;
}
-
- int target = findNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel,
+
+ int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxCacheLevel,
status.mapProgress());
if (target == -1) {
return null;
}
-
+
Task result = maps[target].getTaskToRun(tts.getTrackerName());
if (result != null) {
addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
- resetSchedulingOpportunities();
+ // DO NOT reset for off-switch!
+ if (maxCacheLevel != NON_LOCAL_CACHE_LEVEL) {
+ resetSchedulingOpportunities();
+ }
}
-
return result;
- }
+ }
/*
* Return task cleanup attempt if any, to run on a given tracker
@@ -1374,78 +1419,22 @@ public class JobInProgress {
public synchronized Task obtainNewNodeLocalMapTask(TaskTrackerStatus tts,
int clusterSize,
int numUniqueHosts)
- throws IOException {
- if (!tasksInited) {
- LOG.info("Cannot create task split for " + profile.getJobID());
- try { throw new IOException("state = " + status.getRunState()); }
- catch (IOException ioe) {ioe.printStackTrace();}
- return null;
- }
-
- int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 1,
- status.mapProgress());
- if (target == -1) {
- return null;
- }
-
- Task result = maps[target].getTaskToRun(tts.getTrackerName());
- if (result != null) {
- addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
- resetSchedulingOpportunities();
- }
-
- return result;
+ throws IOException {
+ return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, 1);
}
public synchronized Task obtainNewNodeOrRackLocalMapTask(
TaskTrackerStatus tts, int clusterSize, int numUniqueHosts)
throws IOException {
- if (!tasksInited) {
- LOG.info("Cannot create task split for " + profile.getJobID());
- try { throw new IOException("state = " + status.getRunState()); }
- catch (IOException ioe) {ioe.printStackTrace();}
- return null;
- }
-
- int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel,
- status.mapProgress());
- if (target == -1) {
- return null;
- }
-
- Task result = maps[target].getTaskToRun(tts.getTrackerName());
- if (result != null) {
- addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
- resetSchedulingOpportunities();
- }
-
- return result;
+ return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, maxLevel);
}
public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
int clusterSize,
int numUniqueHosts)
- throws IOException {
- if (!tasksInited) {
- LOG.info("Cannot create task split for " + profile.getJobID());
- try { throw new IOException("state = " + status.getRunState()); }
- catch (IOException ioe) {ioe.printStackTrace();}
- return null;
- }
-
- int target = findNewMapTask(tts, clusterSize, numUniqueHosts,
- NON_LOCAL_CACHE_LEVEL, status.mapProgress());
- if (target == -1) {
- return null;
- }
-
- Task result = maps[target].getTaskToRun(tts.getTrackerName());
- if (result != null) {
- addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
- // DO NOT reset for off-switch!
- }
-
- return result;
+ throws IOException {
+ return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts,
+ NON_LOCAL_CACHE_LEVEL);
}
public void schedulingOpportunity() {
@@ -1620,7 +1609,7 @@ public class JobInProgress {
}
public synchronized boolean scheduleReduces() {
- return finishedMapTasks >= completedMapsForReduceSlowstart;
+ return finishedMapTasks + failedMapTIPs >= completedMapsForReduceSlowstart;
}
/**
@@ -1693,16 +1682,37 @@ public class JobInProgress {
// returns the (cache)level at which the nodes matches
private int getMatchingLevelForNodes(Node n1, Node n2) {
+ return getMatchingLevelForNodes(n1, n2, this.maxLevel);
+ }
+
+ static int getMatchingLevelForNodes(Node n1, Node n2, int maxLevel) {
int count = 0;
+
+ // In the case that the two nodes are at different levels in the
+ // node hierarchy, walk upwards on the deeper one until the
+ // levels are equal. Each of these counts as "distance" since it
+ // presumably is going through another rack.
+ int level1 = n1.getLevel(), level2 = n2.getLevel();
+ while (n1 != null && level1 > level2) {
+ n1 = n1.getParent();
+ level1--;
+ count++;
+ }
+ while (n2 != null && level2 > level1) {
+ n2 = n2.getParent();
+ level2--;
+ count++;
+ }
+
do {
- if (n1.equals(n2)) {
- return count;
+ if (n1.equals(n2) || count >= maxLevel) {
+ return Math.min(count, maxLevel);
}
++count;
n1 = n1.getParent();
n2 = n2.getParent();
} while (n1 != null);
- return this.maxLevel;
+ return maxLevel;
}
/**
@@ -1729,7 +1739,7 @@ public class JobInProgress {
// keeping the earlier ordering intact
String name;
String splits = "";
- Enum counter = null;
+ Enum<Counter> counter = null;
if (tip.isJobSetupTask()) {
launchedSetup = true;
name = Values.SETUP.name();
@@ -1776,6 +1786,7 @@ public class JobInProgress {
//
// So to simplify, increment the data locality counter whenever there is
// data locality.
+ Locality locality = Locality.OFF_SWITCH;
if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
// increment the data locality counter for maps
Node tracker = jobtracker.getNode(tts.getHost());
@@ -1795,26 +1806,71 @@ public class JobInProgress {
}
}
}
- switch (level) {
- case 0 :
- LOG.info("Choosing data-local task " + tip.getTIPId());
- jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
- break;
+ locality = logAndIncreJobCounters(tip, level, jobtracker.isNodeGroupAware());
+ }
+ // Set locality
+ tip.setTaskAttemptLocality(id, locality);
+
+ // Set avataar
+ Avataar avataar = (tip.getActiveTasks().size() > 1) ? Avataar.SPECULATIVE :
+ Avataar.VIRGIN;
+ tip.setTaskAttemptAvataar(id, avataar);
+ }
+
+ private Locality logAndIncreJobCounters(TaskInProgress tip, int level,
+ boolean isNodeGroupAware) {
+ switch (level) {
+ case 0:
+ // level 0 means data-local
+ logAndIncrDataLocalMaps(tip);
+ return Locality.NODE_LOCAL;
case 1:
- LOG.info("Choosing rack-local task " + tip.getTIPId());
- jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
- break;
- default :
+ if (isNodeGroupAware) {
+ // level 1 in case of with-NodeGroup means nodegroup-local
+ logAndIncrNodeGroupLocalMaps(tip);
+ return Locality.GROUP_LOCAL;
+ } else {
+ // level 1 in case of without-NodeGroup means rack-local
+ logAndIncrRackLocalMaps(tip);
+ return Locality.RACK_LOCAL;
+ }
+ case 2:
+ if (isNodeGroupAware) {
+ // level 2 in case of with-NodeGroup means rack-local
+ logAndIncrRackLocalMaps(tip);
+ return Locality.RACK_LOCAL;
+ }
+ // in case of without-NodeGroup, level 2 falls through to other-local
+ // handled by default
+ default:
// check if there is any locality
if (level != this.maxLevel) {
- LOG.info("Choosing cached task at level " + level + tip.getTIPId());
- jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
+ logAndIncrOtherLocalMaps(tip, level);
}
- break;
- }
+ return Locality.OFF_SWITCH;
}
}
+ private void logAndIncrOtherLocalMaps(TaskInProgress tip, int level) {
+ LOG.info("Choosing cached task at level " + level + tip.getTIPId());
+ jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
+ }
+
+ private void logAndIncrNodeGroupLocalMaps(TaskInProgress tip) {
+ LOG.info("Choosing nodeGroup-local task " + tip.getTIPId());
+ jobCounters.incrCounter(Counter.NODEGROUP_LOCAL_MAPS, 1);
+ }
+
+ private void logAndIncrRackLocalMaps(TaskInProgress tip) {
+ LOG.info("Choosing rack-local task " + tip.getTIPId());
+ jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+ }
+
+ private void logAndIncrDataLocalMaps(TaskInProgress tip) {
+ LOG.info("Choosing data-local task " + tip.getTIPId());
+ jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+ }
+
void setFirstTaskLaunchTime(TaskInProgress tip) {
TaskType key = tip.getFirstTaskType();
@@ -2194,21 +2250,19 @@ public class JobInProgress {
continue;
}
- if (!tip.hasRunOnMachine(ttStatus.getHost(),
+ if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
+ // Check if this tip can be removed from the list.
+ // If the list is shared then we should not remove.
+ if(shouldRemove){
+ iter.remove();
+ }
+ if (!tip.hasRunOnMachine(ttStatus.getHost(),
ttStatus.getTrackerName())) {
- if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
- // In case of shared list we don't remove it. Since the TIP failed
- // on this tracker can be scheduled on some other tracker.
- if (shouldRemove) {
- iter.remove(); //this tracker is never going to run it again
- }
return tip;
- }
+ }
} else {
- // Check if this tip can be removed from the list.
- // If the list is shared then we should not remove.
- if (shouldRemove) {
- // This tracker will never speculate this tip
+ if (shouldRemove && tip.hasRunOnMachine(ttStatus.getHost(),
+ ttStatus.getTrackerName())) {
iter.remove();
}
}
@@ -2572,22 +2626,27 @@ public class JobInProgress {
this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
String taskType = getTaskType(tip);
+ TaskAttemptID taskAttemptId = status.getTaskID();
+ Locality locality = checkLocality(tip, taskAttemptId);
+ Avataar avataar = checkAvataar(tip, taskAttemptId);
if (status.getIsMap()){
- JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(),
+ JobHistory.MapAttempt.logStarted(taskAttemptId, status.getStartTime(),
status.getTaskTracker(),
- ttStatus.getHttpPort(),
- taskType);
- JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(),
+ ttStatus.getHttpPort(),
+ taskType,locality, avataar);
+ JobHistory.MapAttempt.logFinished(taskAttemptId, status.getFinishTime(),
trackerHostname, taskType,
status.getStateString(),
status.getCounters());
}else{
- JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(),
+ JobHistory.ReduceAttempt.logStarted(taskAttemptId, status.getStartTime(),
status.getTaskTracker(),
ttStatus.getHttpPort(),
- taskType);
- JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
- status.getSortFinishTime(), status.getFinishTime(),
+ taskType, locality, avataar);
+ JobHistory.ReduceAttempt.logFinished(taskAttemptId,
+ status.getShuffleFinishTime(),
+ status.getSortFinishTime(),
+ status.getFinishTime(),
trackerHostname,
taskType,
status.getStateString(),
@@ -3032,9 +3091,12 @@ public class JobInProgress {
String diagInfo = taskDiagnosticInfo == null ? "" :
StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
String taskType = getTaskType(tip);
+ TaskAttemptID taskAttemptId = status.getTaskID();
+ Locality locality = checkLocality(tip, taskAttemptId);
+ Avataar avataar = checkAvataar(tip, taskAttemptId);
if (taskStatus.getIsMap()) {
JobHistory.MapAttempt.logStarted(taskid, startTime,
- taskTrackerName, taskTrackerPort, taskType);
+ taskTrackerName, taskTrackerPort, taskType, locality, avataar);
if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
JobHistory.MapAttempt.logFailed(taskid, finishTime,
taskTrackerHostName, diagInfo, taskType);
@@ -3044,7 +3106,7 @@ public class JobInProgress {
}
} else {
JobHistory.ReduceAttempt.logStarted(taskid, startTime,
- taskTrackerName, taskTrackerPort, taskType);
+ taskTrackerName, taskTrackerPort, taskType, locality, avataar);
if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
JobHistory.ReduceAttempt.logFailed(taskid, finishTime,
taskTrackerHostName, diagInfo, taskType);
@@ -3140,6 +3202,22 @@ public class JobInProgress {
}
}
+ private Locality checkLocality(TaskInProgress tip, TaskAttemptID taskAttemptId) {
+ Locality locality = tip.getTaskAttemptLocality(taskAttemptId);
+ if (locality == null) {
+ locality = Locality.OFF_SWITCH;
+ }
+ return locality;
+ }
+
+ private Avataar checkAvataar(TaskInProgress tip, TaskAttemptID taskAttemptId) {
+ Avataar avataar = tip.getTaskAttemptAvataar(taskAttemptId);
+ if (avataar == null) {
+ avataar = Avataar.VIRGIN;
+ }
+ return avataar;
+ }
+
void killSetupTip(boolean isMap) {
if (isMap) {
setup[0].kill();
@@ -3221,6 +3299,16 @@ public class JobInProgress {
jobtracker.storeCompletedJob(this);
jobtracker.finalizeJob(this);
+ }
+ cleanupJob();
+ }
+
+ /**
+ * The job is dead. We're now cleaning it, getting rid of job directories and
+ * removing all delegation token etc.
+ */
+ void cleanupJob() {
+ synchronized (this) {
try {
// Definitely remove the local-disk copy of the job file
if (localJobFile != null) {
@@ -3230,7 +3318,17 @@ public class JobInProgress {
Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
CleanupQueue.getInstance().addToQueue(
- new PathDeletionContext(tempDir, conf));
+ new PathDeletionContext(tempDir, conf));
+
+ // delete the staging area for the job and cancel delegation token
+ String jobTempDir = conf.get("mapreduce.job.dir");
+ if (jobTempDir != null && conf.getKeepTaskFilesPattern() == null &&
+ !conf.getKeepFailedTaskFiles()) {
+ Path jobTempDirPath = new Path(jobTempDir);
+ CleanupQueue.getInstance().addToQueue(
+ new PathDeletionContext(jobTempDirPath, conf, userUGI, jobId));
+ }
+
} catch (IOException e) {
LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
}
@@ -3244,11 +3342,6 @@ public class JobInProgress {
this.runningReduces = null;
}
- // remove jobs delegation tokens
- if(conf.getBoolean(JobContext.JOB_CANCEL_DELEGATION_TOKEN, true)) {
- DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jobId);
- } // else don't remove it.May be used by spawned tasks
-
//close the user's FS
try {
fs.close();
@@ -3516,31 +3609,6 @@ public class JobInProgress {
}
/**
- * generate job token and save it into the file
- * @throws IOException
- */
- private void generateAndStoreTokens() throws IOException {
- Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
- Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
- if (tokenStorage == null) {
- tokenStorage = new Credentials();
- }
- //create JobToken file and write token to it
- JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
- .toString()));
- Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(identifier,
- jobtracker.getJobTokenSecretManager());
- token.setService(identifier.getJobId());
-
- TokenCache.setJobToken(token, tokenStorage);
-
- // write TokenStorage out
- tokenStorage.writeTokenStorageFile(keysFile, jobtracker.getConf());
- LOG.info("jobToken generated and stored with users keys in "
- + keysFile.toUri().getPath());
- }
-
- /**
* Get the level of locality that a given task would have if launched on
* a particular TaskTracker. Returns 0 if the task has data on that machine,
* 1 if it has data on the same rack, etc (depending on number of levels in
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties Fri Jun 21 06:37:27 2013
@@ -1,3 +1,16 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
# ResourceBundle properties file for job-level counters
CounterGroupName= Job Counters
@@ -9,6 +22,7 @@ TOTAL_LAUNCHED_REDUCES.name= Launc
OTHER_LOCAL_MAPS.name= Other local map tasks
DATA_LOCAL_MAPS.name= Data-local map tasks
RACK_LOCAL_MAPS.name= Rack-local map tasks
+NODEGROUP_LOCAL_MAPS.name= NodeGroup-local map tasks
FALLOW_SLOTS_MILLIS_MAPS.name= Total time spent by all maps waiting after reserving slots (ms)
FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms)
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java Fri Jun 21 06:37:27 2013
@@ -82,7 +82,7 @@ public class JobLocalizer {
private final FileSystem lfs;
private final List<Path> localDirs;
private final LocalDirAllocator lDirAlloc;
- private final JobConf ttConf;
+ protected final JobConf ttConf;
private final String JOBDIR;
private final String DISTDIR;
@@ -90,7 +90,7 @@ public class JobLocalizer {
private final String JARDST;
private final String JOBCONF;
private final String JOBTOKEN;
- private static final String JOB_LOCAL_CTXT = "mapred.job.local.dir";
+ protected static final String JOB_LOCAL_CTXT = "mapred.job.local.dir";
public JobLocalizer(JobConf ttConf, String user, String jobid)
throws IOException {
@@ -108,10 +108,10 @@ public class JobLocalizer {
throw new IOException("Cannot initialize for null jobid");
}
this.jobid = jobid;
- this.ttConf = ttConf;
- lfs = FileSystem.getLocal(ttConf).getRaw();
+ this.ttConf = new JobConf(ttConf);
+ lfs = FileSystem.getLocal(this.ttConf).getRaw();
this.localDirs = createPaths(user, localDirs);
- ttConf.setStrings(JOB_LOCAL_CTXT, localDirs);
+ this.ttConf.setStrings(JOB_LOCAL_CTXT, localDirs);
Collections.shuffle(this.localDirs);
lDirAlloc = new LocalDirAllocator(JOB_LOCAL_CTXT);
JOBDIR = TaskTracker.JOBCACHE + Path.SEPARATOR + jobid;
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Fri Jun 21 06:37:27 2013
@@ -80,6 +80,12 @@ class JobQueueTaskScheduler extends Task
@Override
public synchronized List<Task> assignTasks(TaskTracker taskTracker)
throws IOException {
+ // Check for JT safe-mode
+ if (taskTrackerManager.isInSafeMode()) {
+ LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
+ return null;
+ }
+
TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
final int numTaskTrackers = clusterStatus.getTaskTrackers();
@@ -166,7 +172,8 @@ class JobQueueTaskScheduler extends Task
Task t = null;
- // Try to schedule a node-local or rack-local Map task
+ // Try to schedule a Map task with locality between node-local
+ // and rack-local
t =
job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus,
numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());