You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2012/10/19 04:28:42 UTC
svn commit: r1399950 [2/11] - in
/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ bin/ conf/
dev-support/ hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduc...
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Fri Oct 19 02:25:55 2012
@@ -99,8 +99,8 @@ public class JobHistoryEventHandler exte
protected static final Map<JobId, MetaInfo> fileMap =
Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
- // Has a signal (SIGTERM etc) been issued?
- protected volatile boolean isSignalled = false;
+ // should job completion be forced when the AM shuts down?
+ protected volatile boolean forceJobCompletion = false;
public JobHistoryEventHandler(AppContext context, int startCount) {
super("JobHistoryEventHandler");
@@ -322,7 +322,7 @@ public class JobHistoryEventHandler exte
// Process JobUnsuccessfulCompletionEvent for jobIds which still haven't
// closed their event writers
Iterator<JobId> jobIt = fileMap.keySet().iterator();
- if(isSignalled) {
+ if(forceJobCompletion) {
while (jobIt.hasNext()) {
JobId toClose = jobIt.next();
MetaInfo mi = fileMap.get(toClose);
@@ -661,6 +661,8 @@ public class JobHistoryEventHandler exte
summaryFileOut = doneDirFS.create(qualifiedSummaryDoneFile, true);
summaryFileOut.writeUTF(mi.getJobSummary().getJobSummaryString());
summaryFileOut.close();
+ doneDirFS.setPermission(qualifiedSummaryDoneFile, new FsPermission(
+ JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
} catch (IOException e) {
LOG.info("Unable to write out JobSummaryInfo to ["
+ qualifiedSummaryDoneFile + "]", e);
@@ -894,7 +896,7 @@ public class JobHistoryEventHandler exte
stagingDirFS.delete(fromPath, false);
}
- }
+ }
boolean pathExists(FileSystem fileSys, Path path) throws IOException {
return fileSys.exists(path);
@@ -909,9 +911,9 @@ public class JobHistoryEventHandler exte
return tmpFileName.substring(0, tmpFileName.length()-4);
}
- public void setSignalled(boolean isSignalled) {
- this.isSignalled = isSignalled;
- LOG.info("JobHistoryEventHandler notified that isSignalled was "
- + isSignalled);
+ public void setForcejobCompletion(boolean forceJobCompletion) {
+ this.forceJobCompletion = forceJobCompletion;
+ LOG.info("JobHistoryEventHandler notified that forceJobCompletion is "
+ + forceJobCompletion);
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Oct 19 02:25:55 2012
@@ -87,8 +87,6 @@ import org.apache.hadoop.mapreduce.v2.ut
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.Clock;
@@ -172,6 +170,8 @@ public class MRAppMaster extends Composi
private Credentials fsTokens = new Credentials(); // Filled during init
private UserGroupInformation currentUser; // Will be setup during init
+ private volatile boolean isLastAMRetry = false;
+
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime) {
@@ -197,11 +197,21 @@ public class MRAppMaster extends Composi
@Override
public void init(final Configuration conf) {
-
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
downloadTokensAndSetupUGI(conf);
-
+
+ //TODO this is a hack, we really need the RM to inform us when we
+ // are the last one. This would allow us to configure retries on
+ // a per application basis.
+ int numAMRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
+ isLastAMRetry = appAttemptID.getAttemptId() >= numAMRetries;
+ LOG.info("AM Retries: " + numAMRetries +
+ " attempt num: " + appAttemptID.getAttemptId() +
+ " is last retry: " + isLastAMRetry);
+
+
context = new RunningAppContext(conf);
// Job name is the same as the app name util we support DAG of jobs
@@ -419,6 +429,8 @@ public class MRAppMaster extends Composi
}
try {
+ //We are finishing cleanly so this is the last retry
+ isLastAMRetry = true;
// Stop all services
// This will also send the final report to the ResourceManager
LOG.info("Calling stop for all the services");
@@ -478,27 +490,17 @@ public class MRAppMaster extends Composi
try {
this.currentUser = UserGroupInformation.getCurrentUser();
- if (UserGroupInformation.isSecurityEnabled()) {
- // Read the file-system tokens from the localized tokens-file.
- Path jobSubmitDir =
- FileContext.getLocalFSFileContext().makeQualified(
- new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
- .getAbsolutePath()));
- Path jobTokenFile =
- new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
- fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
- LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
- + jobTokenFile);
-
- for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Token of kind " + tk.getKind()
- + "in current ugi in the AppMaster for service "
- + tk.getService());
- }
- currentUser.addToken(tk); // For use by AppMaster itself.
- }
- }
+ // Read the file-system tokens from the localized tokens-file.
+ Path jobSubmitDir =
+ FileContext.getLocalFSFileContext().makeQualified(
+ new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
+ .getAbsolutePath()));
+ Path jobTokenFile =
+ new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
+ fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
+ LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+ + jobTokenFile);
+ currentUser.addCredentials(fsTokens); // For use by AppMaster itself.
} catch (IOException e) {
throw new YarnException(e);
}
@@ -676,7 +678,11 @@ public class MRAppMaster extends Composi
}
public void setSignalled(boolean isSignalled) {
- ((RMCommunicator) containerAllocator).setSignalled(true);
+ ((RMCommunicator) containerAllocator).setSignalled(isSignalled);
+ }
+
+ public void setShouldUnregister(boolean shouldUnregister) {
+ ((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister);
}
}
@@ -727,7 +733,12 @@ public class MRAppMaster extends Composi
@Override
public synchronized void stop() {
try {
- cleanupStagingDir();
+ if(isLastAMRetry) {
+ cleanupStagingDir();
+ } else {
+ LOG.info("Skipping cleaning up the staging dir. "
+ + "assuming AM will be retried.");
+ }
} catch (IOException io) {
LOG.error("Failed to cleanup staging dir: ", io);
}
@@ -1026,14 +1037,19 @@ public class MRAppMaster extends Composi
public void run() {
LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
+ "JobHistoryEventHandler.");
+
// Notify the JHEH and RMCommunicator that a SIGTERM has been received so
// that they don't take too long in shutting down
if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
((ContainerAllocatorRouter) appMaster.containerAllocator)
.setSignalled(true);
+ ((ContainerAllocatorRouter) appMaster.containerAllocator)
+ .setShouldUnregister(appMaster.isLastAMRetry);
}
+
if(appMaster.jobHistoryEventHandler != null) {
- appMaster.jobHistoryEventHandler.setSignalled(true);
+ appMaster.jobHistoryEventHandler
+ .setForcejobCompletion(appMaster.isLastAMRetry);
}
appMaster.stop();
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Fri Oct 19 02:25:55 2012
@@ -82,8 +82,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
-import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
@@ -115,16 +114,15 @@ public class MRClientService extends Abs
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress address = new InetSocketAddress(0);
- ClientToAMSecretManager secretManager = null;
+ ClientToAMTokenSecretManager secretManager = null;
if (UserGroupInformation.isSecurityEnabled()) {
- secretManager = new ClientToAMSecretManager();
String secretKeyStr =
System
.getenv(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME);
byte[] bytes = Base64.decodeBase64(secretKeyStr);
- ClientTokenIdentifier identifier = new ClientTokenIdentifier(
- this.appContext.getApplicationID());
- secretManager.setMasterKey(identifier, bytes);
+ secretManager =
+ new ClientToAMTokenSecretManager(this.appContext.getApplicationID(),
+ bytes);
}
server =
rpc.getServer(MRClientProtocol.class, protocolHandler, address,
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Oct 19 02:25:55 2012
@@ -582,17 +582,23 @@ public class JobImpl implements org.apac
String jobFile =
remoteJobConfFile == null ? "" : remoteJobConfFile.toString();
+ StringBuilder diagsb = new StringBuilder();
+ for (String s : getDiagnostics()) {
+ diagsb.append(s).append("\n");
+ }
+
if (getState() == JobState.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
- cleanupProgress, jobFile, amInfos, isUber);
+ cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
}
computeProgress();
- return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
- appSubmitTime, startTime, finishTime, setupProgress,
+ JobReport report = MRBuilderUtils.newJobReport(jobId, jobName, username,
+ state, appSubmitTime, startTime, finishTime, setupProgress,
this.mapProgress, this.reduceProgress,
- cleanupProgress, jobFile, amInfos, isUber);
+ cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
+ return report;
} finally {
readLock.unlock();
}
@@ -759,7 +765,8 @@ public class JobImpl implements org.apac
job.getCommitter().commitJob(job.getJobContext());
} catch (IOException e) {
LOG.error("Could not do commit for Job", e);
- job.logJobHistoryFinishedEvent();
+ job.addDiagnostic("Job commit failed: " + e.getMessage());
+ job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
return job.finished(JobState.FAILED);
}
job.logJobHistoryFinishedEvent();
@@ -1199,7 +1206,7 @@ public class JobImpl implements org.apac
}
}
- private void abortJob(
+ protected void abortJob(
org.apache.hadoop.mapreduce.JobStatus.State finalState) {
try {
committer.abortJob(jobContext, finalState);
@@ -1370,7 +1377,8 @@ public class JobImpl implements org.apac
}
}
- float failureRate = (float) fetchFailures / runningReduceTasks;
+ float failureRate = runningReduceTasks == 0 ? 1.0f :
+ (float) fetchFailures / runningReduceTasks;
// declare faulty if fetch-failures >= max-allowed-failures
boolean isMapFaulty =
(failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
@@ -1500,7 +1508,7 @@ public class JobImpl implements org.apac
}
}
- private void addDiagnostic(String diag) {
+ protected void addDiagnostic(String diag) {
diagnostics.add(diag);
}
@@ -1561,7 +1569,7 @@ public class JobImpl implements org.apac
Path confPath = getConfFile();
FileContext fc = FileContext.getFileContext(confPath.toUri(), conf);
Configuration jobConf = new Configuration(false);
- jobConf.addResource(fc.open(confPath));
+ jobConf.addResource(fc.open(confPath), confPath.toString());
return jobConf;
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Oct 19 02:25:55 2012
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.MapReduceChildJVM;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapred.Task;
@@ -71,6 +72,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -86,6 +88,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
@@ -120,6 +123,7 @@ import org.apache.hadoop.yarn.event.Even
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -128,6 +132,8 @@ import org.apache.hadoop.yarn.util.Build
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver;
+import com.google.common.base.Preconditions;
+
/**
* Implementation of TaskAttempt interface.
*/
@@ -404,10 +410,10 @@ public abstract class TaskAttemptImpl im
TaskAttemptState.FAILED,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
new TooManyFetchFailureTransition())
- .addTransition(
- TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED,
- TaskAttemptEventType.TA_KILL,
- new KilledAfterSuccessTransition())
+ .addTransition(TaskAttemptState.SUCCEEDED,
+ EnumSet.of(TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED),
+ TaskAttemptEventType.TA_KILL,
+ new KilledAfterSuccessTransition())
.addTransition(
TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
@@ -435,7 +441,8 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
- TaskAttemptEventType.TA_FAILMSG))
+ TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE))
// Transitions from KILLED state
.addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED,
@@ -604,10 +611,12 @@ public abstract class TaskAttemptImpl im
if (jobJar != null) {
Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
.getUri(), remoteFS.getWorkingDirectory());
- localResources.put(
- MRJobConfig.JOB_JAR,
- createLocalResource(remoteFS, remoteJobJar,
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+ LocalResource rc = createLocalResource(remoteFS, remoteJobJar,
+ LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION);
+ String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
+ JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
+ rc.setPattern(pattern);
+ localResources.put(MRJobConfig.JOB_JAR, rc);
LOG.info("The job-jar file on the remote FS is "
+ remoteJobJar.toUri().toASCIIString());
} else {
@@ -638,14 +647,10 @@ public abstract class TaskAttemptImpl im
MRApps.setupDistributedCache(conf, localResources);
// Setup up task credentials buffer
- Credentials taskCredentials = new Credentials();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- LOG.info("Adding #" + credentials.numberOfTokens()
- + " tokens and #" + credentials.numberOfSecretKeys()
- + " secret keys for NM use for launching container");
- taskCredentials.addAll(credentials);
- }
+ LOG.info("Adding #" + credentials.numberOfTokens()
+ + " tokens and #" + credentials.numberOfSecretKeys()
+ + " secret keys for NM use for launching container");
+ Credentials taskCredentials = new Credentials(credentials);
// LocalStorageToken is needed irrespective of whether security is enabled
// or not.
@@ -1482,6 +1487,9 @@ public abstract class TaskAttemptImpl im
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+ // too many fetch failure can only happen for map tasks
+ Preconditions
+ .checkArgument(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP);
//add to diagnostic
taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
//set the finish time
@@ -1505,15 +1513,30 @@ public abstract class TaskAttemptImpl im
}
private static class KilledAfterSuccessTransition implements
- SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptState> {
@SuppressWarnings("unchecked")
@Override
- public void transition(TaskAttemptImpl taskAttempt,
+ public TaskAttemptState transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
- TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
- //add to diagnostic
- taskAttempt.addDiagnosticInfo(msgEvent.getMessage());
+ if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) {
+ // after a reduce task has succeeded, its outputs are safe in HDFS.
+ // logically such a task should not be killed. we only come here when
+ // there is a race condition in the event queue. E.g. some logic sends
+ // a kill request to this attempt when the successful completion event
+ // for this task is already in the event queue. so the kill event will
+ // get executed immediately after the attempt is marked successful and
+ // result in this transition being exercised.
+ // ignore this for reduce tasks
+ LOG.info("Ignoring killed event for successful reduce task attempt " +
+ taskAttempt.getID().toString());
+ return TaskAttemptState.SUCCEEDED;
+ }
+ if(event instanceof TaskAttemptKillEvent) {
+ TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
+ //add to diagnostic
+ taskAttempt.addDiagnosticInfo(msgEvent.getMessage());
+ }
// not setting a finish time since it was set on success
assert (taskAttempt.getFinishTime() != 0);
@@ -1527,6 +1550,7 @@ public abstract class TaskAttemptImpl im
.getTaskId().getJobId(), tauce));
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED));
+ return TaskAttemptState.KILLED;
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Oct 19 02:25:55 2012
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TypeConverter;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -108,7 +110,8 @@ public abstract class TaskImpl implement
private long scheduledTime;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
+
+ protected boolean encryptedShuffle;
protected Credentials credentials;
protected Token<JobTokenIdentifier> jobToken;
@@ -188,12 +191,12 @@ public abstract class TaskImpl implement
TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from SUCCEEDED state
- .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
+ .addTransition(TaskState.SUCCEEDED,
EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED),
- TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
- .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
+ TaskEventType.T_ATTEMPT_FAILED, new RetroactiveFailureTransition())
+ .addTransition(TaskState.SUCCEEDED,
EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED),
- TaskEventType.T_ATTEMPT_KILLED, new MapRetroactiveKilledTransition())
+ TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition())
// Ignore-able transitions.
.addTransition(
TaskState.SUCCEEDED, TaskState.SUCCEEDED,
@@ -274,6 +277,8 @@ public abstract class TaskImpl implement
this.jobToken = jobToken;
this.metrics = metrics;
this.appContext = appContext;
+ this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+ MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
// See if this is from a previous generation.
if (completedTasksFromPreviousRun != null
@@ -637,9 +642,10 @@ public abstract class TaskImpl implement
TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(-1);
- tce.setMapOutputServerAddress("http://"
- + attempt.getNodeHttpAddress().split(":")[0] + ":"
- + attempt.getShufflePort());
+ String scheme = (encryptedShuffle) ? "https://" : "http://";
+ tce.setMapOutputServerAddress(scheme
+ + attempt.getNodeHttpAddress().split(":")[0] + ":"
+ + attempt.getShufflePort());
tce.setStatus(status);
tce.setAttemptId(attempt.getID());
int runTime = 0;
@@ -891,7 +897,7 @@ public abstract class TaskImpl implement
}
}
- private static class MapRetroactiveFailureTransition
+ private static class RetroactiveFailureTransition
extends AttemptFailedTransition {
@Override
@@ -905,8 +911,8 @@ public abstract class TaskImpl implement
return TaskState.SUCCEEDED;
}
}
-
- //verify that this occurs only for map task
+
+ // a successful REDUCE task should not be overridden
//TODO: consider moving it to MapTaskImpl
if (!TaskType.MAP.equals(task.getType())) {
LOG.error("Unexpected event for REDUCE task " + event.getType());
@@ -932,42 +938,46 @@ public abstract class TaskImpl implement
}
}
- private static class MapRetroactiveKilledTransition implements
+ private static class RetroactiveKilledTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
@Override
public TaskState transition(TaskImpl task, TaskEvent event) {
- // verify that this occurs only for map task
+ TaskAttemptId attemptId = null;
+ if (event instanceof TaskTAttemptEvent) {
+ TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+ attemptId = castEvent.getTaskAttemptID();
+ if (task.getState() == TaskState.SUCCEEDED &&
+ !attemptId.equals(task.successfulAttempt)) {
+ // don't allow a different task attempt to override a previous
+ // succeeded state
+ return TaskState.SUCCEEDED;
+ }
+ }
+
+ // a successful REDUCE task should not be overridden
// TODO: consider moving it to MapTaskImpl
if (!TaskType.MAP.equals(task.getType())) {
LOG.error("Unexpected event for REDUCE task " + event.getType());
task.internalError(event.getType());
}
- TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
- TaskAttemptId attemptId = attemptEvent.getTaskAttemptID();
- if(task.successfulAttempt == attemptId) {
- // successful attempt is now killed. reschedule
- // tell the job about the rescheduling
- unSucceed(task);
- task.handleTaskAttemptCompletion(
- attemptId,
- TaskAttemptCompletionEventStatus.KILLED);
- task.eventHandler.handle(new JobMapTaskRescheduledEvent(task.taskId));
- // typically we are here because this map task was run on a bad node and
- // we want to reschedule it on a different node.
- // Depending on whether there are previous failed attempts or not this
- // can SCHEDULE or RESCHEDULE the container allocate request. If this
- // SCHEDULE's then the dataLocal hosts of this taskAttempt will be used
- // from the map splitInfo. So the bad node might be sent as a location
- // to the RM. But the RM would ignore that just like it would ignore
- // currently pending container requests affinitized to bad nodes.
- task.addAndScheduleAttempt();
- return TaskState.SCHEDULED;
- } else {
- // nothing to do
- return TaskState.SUCCEEDED;
- }
+ // successful attempt is now killed. reschedule
+ // tell the job about the rescheduling
+ unSucceed(task);
+ task.handleTaskAttemptCompletion(attemptId,
+ TaskAttemptCompletionEventStatus.KILLED);
+ task.eventHandler.handle(new JobMapTaskRescheduledEvent(task.taskId));
+ // typically we are here because this map task was run on a bad node and
+ // we want to reschedule it on a different node.
+ // Depending on whether there are previous failed attempts or not this
+ // can SCHEDULE or RESCHEDULE the container allocate request. If this
+ // SCHEDULE's then the dataLocal hosts of this taskAttempt will be used
+ // from the map splitInfo. So the bad node might be sent as a location
+ // to the RM. But the RM would ignore that just like it would ignore
+ // currently pending container requests affinitized to bad nodes.
+ task.addAndScheduleAttempt();
+ return TaskState.SCHEDULED;
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Oct 19 02:25:55 2012
@@ -84,6 +84,7 @@ public abstract class RMCommunicator ext
private Job job;
// Has a signal (SIGTERM etc) been issued?
protected volatile boolean isSignalled = false;
+ private volatile boolean shouldUnregister = true;
public RMCommunicator(ClientService clientService, AppContext context) {
super("RMCommunicator");
@@ -213,7 +214,9 @@ public abstract class RMCommunicator ext
} catch (InterruptedException ie) {
LOG.warn("InterruptedException while stopping", ie);
}
- unregister();
+ if(shouldUnregister) {
+ unregister();
+ }
super.stop();
}
@@ -288,8 +291,15 @@ public abstract class RMCommunicator ext
protected abstract void heartbeat() throws Exception;
+ public void setShouldUnregister(boolean shouldUnregister) {
+ this.shouldUnregister = shouldUnregister;
+ LOG.info("RMCommunicator notified that shouldUnregistered is: "
+ + shouldUnregister);
+ }
+
public void setSignalled(boolean isSignalled) {
this.isSignalled = isSignalled;
- LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
+ LOG.info("RMCommunicator notified that iSignalled is: "
+ + isSignalled);
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Oct 19 02:25:55 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -47,9 +46,6 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -131,6 +127,7 @@ public class RMContainerAllocator extend
private int containersReleased = 0;
private int hostLocalAssigned = 0;
private int rackLocalAssigned = 0;
+ private int lastCompletedTasks = 0;
private boolean recalculateReduceSchedule = false;
private int mapResourceReqt;//memory
@@ -214,11 +211,18 @@ public class RMContainerAllocator extend
scheduledRequests.assign(allocatedContainers);
LOG.info("After Assign: " + getStat());
}
-
+
+ int completedMaps = getJob().getCompletedMaps();
+ int completedTasks = completedMaps + getJob().getCompletedReduces();
+ if (lastCompletedTasks != completedTasks) {
+ lastCompletedTasks = completedTasks;
+ recalculateReduceSchedule = true;
+ }
+
if (recalculateReduceSchedule) {
preemptReducesIfNeeded();
scheduleReduces(
- getJob().getTotalMaps(), getJob().getCompletedMaps(),
+ getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceReqt, reduceResourceReqt,
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java Fri Oct 19 02:25:55 2012
@@ -78,14 +78,29 @@ public class ConfBlock extends HtmlBlock
tr().
th(_TH, "key").
th(_TH, "value").
+ th(_TH, "source chain").
_().
_().
tbody();
for (ConfEntryInfo entry : info.getProperties()) {
+ StringBuffer buffer = new StringBuffer();
+ String[] sources = entry.getSource();
+ //Skip the last entry, because it is always the same HDFS file, and
+ // output them in reverse order so most recent is output first
+ boolean first = true;
+ for(int i = (sources.length - 2); i >= 0; i--) {
+ if(!first) {
+ // \u2B05 is an arrow <--
+ buffer.append(" \u2B05 ");
+ }
+ first = false;
+ buffer.append(sources[i]);
+ }
tbody.
tr().
td(entry.getName()).
td(entry.getValue()).
+ td(buffer.toString()).
_();
}
tbody._().
@@ -93,6 +108,7 @@ public class ConfBlock extends HtmlBlock
tr().
th().input("search_init").$type(InputType.text).$name("key").$value("key")._()._().
th().input("search_init").$type(InputType.text).$name("value").$value("value")._()._().
+ th().input("search_init").$type(InputType.text).$name("source chain").$value("source chain")._()._().
_().
_().
_();
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java Fri Oct 19 02:25:55 2012
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.v2.a
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
import static org.apache.hadoop.yarn.util.StringHelper.join;
-import static org.apache.hadoop.yarn.util.StringHelper.ujoin;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._EVEN;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._ODD;
@@ -31,6 +30,7 @@ import static org.apache.hadoop.yarn.web
import java.util.Date;
import java.util.List;
+import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -40,8 +40,6 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
@@ -106,7 +104,8 @@ public class JobBlock extends HtmlBlock
table.tr().
td(String.valueOf(attempt.getAttemptId())).
td(new Date(attempt.getStartTime()).toString()).
- td().a(".nodelink", url("http://", attempt.getNodeHttpAddress()),
+ td().a(".nodelink", url(HttpConfig.getSchemePrefix(),
+ attempt.getNodeHttpAddress()),
attempt.getNodeHttpAddress())._().
td().a(".logslink", url(attempt.getLogsLink()),
"logs")._().
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java Fri Oct 19 02:25:55 2012
@@ -24,6 +24,7 @@ import com.google.inject.Inject;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
+import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
@@ -62,7 +63,8 @@ public class NavBlock extends HtmlBlock
li().a(url("conf", jobid), "Configuration")._().
li().a(url("tasks", jobid, "m"), "Map tasks")._().
li().a(url("tasks", jobid, "r"), "Reduce tasks")._().
- li().a(".logslink", url("http://", nodeHttpAddress, "node",
+ li().a(".logslink", url(HttpConfig.getSchemePrefix(),
+ nodeHttpAddress, "node",
"containerlogs", thisAmInfo.getContainerId().toString(),
app.getJob().getUserName()),
"AM Logs")._()._();
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Fri Oct 19 02:25:55 2012
@@ -27,6 +27,7 @@ import static org.apache.hadoop.yarn.web
import java.util.Collection;
+import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.util.StringUtils;
@@ -93,13 +94,15 @@ public class TaskPage extends AppView {
nodeTd._("N/A");
} else {
nodeTd.
- a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr);
+ a(".nodelink", url(HttpConfig.getSchemePrefix(),
+ nodeHttpAddr), nodeHttpAddr);
}
nodeTd._();
if (containerId != null) {
String containerIdStr = ta.getAssignedContainerIdStr();
row.td().
- a(".logslink", url("http://", nodeHttpAddr, "node", "containerlogs",
+ a(".logslink", url(HttpConfig.getSchemePrefix(),
+ nodeHttpAddr, "node", "containerlogs",
containerIdStr, app.getJob().getUserName()), "logs")._();
} else {
row.td()._("N/A")._();
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java Fri Oct 19 02:25:55 2012
@@ -24,6 +24,7 @@ import javax.xml.bind.annotation.XmlAcce
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -63,8 +64,8 @@ public class AMAttemptInfo {
ContainerId containerId = amInfo.getContainerId();
if (containerId != null) {
this.containerId = containerId.toString();
- this.logsLink = join("http://" + nodeHttpAddress,
- ujoin("node", "containerlogs", this.containerId));
+ this.logsLink = join(HttpConfig.getSchemePrefix() + nodeHttpAddress,
+ ujoin("node", "containerlogs", this.containerId, user));
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java Fri Oct 19 02:25:55 2012
@@ -27,13 +27,19 @@ public class ConfEntryInfo {
protected String name;
protected String value;
+ protected String[] source;
public ConfEntryInfo() {
}
public ConfEntryInfo(String key, String value) {
+ this(key, value, null);
+ }
+
+ public ConfEntryInfo(String key, String value, String[] source) {
this.name = key;
this.value = value;
+ this.source = source;
}
public String getName() {
@@ -43,4 +49,8 @@ public class ConfEntryInfo {
public String getValue() {
return this.value;
}
+
+ public String[] getSource() {
+ return source;
+ }
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java Fri Oct 19 02:25:55 2012
@@ -46,7 +46,8 @@ public class ConfInfo {
Configuration jobConf = job.loadConfFile();
this.path = job.getConfFile().toString();
for (Map.Entry<String, String> entry : jobConf) {
- this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue()));
+ this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue(),
+ jobConf.getPropertySources(entry.getKey())));
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Fri Oct 19 02:25:55 2012
@@ -330,7 +330,7 @@ public class TestJobHistoryEventHandler
Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
jheh.addToFileMap(jobId);
- jheh.setSignalled(true);
+ jheh.setForcejobCompletion(true);
for(int i=0; i < numEvents; ++i) {
events[i] = getEventToEnqueue(jobId);
jheh.handle(events[i]);
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Fri Oct 19 02:25:55 2012
@@ -603,7 +603,7 @@ public class MockJobs extends MockApps {
public Configuration loadConfFile() throws IOException {
FileContext fc = FileContext.getFileContext(configFile.toUri(), conf);
Configuration jobConf = new Configuration(false);
- jobConf.addResource(fc.open(configFile));
+ jobConf.addResource(fc.open(configFile), configFile.toString());
return jobConf;
}
};
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Fri Oct 19 02:25:55 2012
@@ -180,7 +180,7 @@ public class TestMRApp {
@Test
public void testUpdatedNodes() throws Exception {
int runCount = 0;
- MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
+ MRApp app = new MRAppWithHistory(2, 2, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
// after half of the map completion, reduce will start
@@ -189,7 +189,7 @@ public class TestMRApp {
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
- Assert.assertEquals("Num tasks not correct", 3, job.getTasks().size());
+ Assert.assertEquals("Num tasks not correct", 4, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
@@ -272,18 +272,19 @@ public class TestMRApp {
// rerun
// in rerun the 1st map will be recovered from previous run
- app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+ app = new MRAppWithHistory(2, 2, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
- Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+ Assert.assertEquals("No of tasks not correct", 4, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
- Task reduceTask = it.next();
+ Task reduceTask1 = it.next();
+ Task reduceTask2 = it.next();
// map 1 will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
@@ -306,19 +307,36 @@ public class TestMRApp {
Assert.assertEquals("Expecting 1 more completion events for success", 3,
events.length);
- app.waitForState(reduceTask, TaskState.RUNNING);
- TaskAttempt task3Attempt = reduceTask.getAttempts().values().iterator()
+ app.waitForState(reduceTask1, TaskState.RUNNING);
+ app.waitForState(reduceTask2, TaskState.RUNNING);
+
+ TaskAttempt task3Attempt = reduceTask1.getAttempts().values().iterator()
.next();
app.getContext()
.getEventHandler()
.handle(
new TaskAttemptEvent(task3Attempt.getID(),
TaskAttemptEventType.TA_DONE));
- app.waitForState(reduceTask, TaskState.SUCCEEDED);
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+ app.getContext()
+ .getEventHandler()
+ .handle(
+ new TaskAttemptEvent(task3Attempt.getID(),
+ TaskAttemptEventType.TA_KILL));
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+ TaskAttempt task4Attempt = reduceTask2.getAttempts().values().iterator()
+ .next();
+ app.getContext()
+ .getEventHandler()
+ .handle(
+ new TaskAttemptEvent(task4Attempt.getID(),
+ TaskAttemptEventType.TA_DONE));
+ app.waitForState(reduceTask2, TaskState.SUCCEEDED);
events = job.getTaskAttemptCompletionEvents(0, 100);
- Assert.assertEquals("Expecting 1 more completion events for success", 4,
- events.length);
+ Assert.assertEquals("Expecting 2 more completion events for reduce success",
+ 5, events.length);
// job succeeds
app.waitForState(job, JobState.SUCCEEDED);
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Oct 19 02:25:55 2012
@@ -93,7 +93,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Test;
@@ -139,7 +138,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
- 0, 0, 0, 0, 0, 0, "jobfile", null, false));
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -216,7 +215,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
- 0, 0, 0, 0, 0, 0, "jobfile", null, false));
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -282,7 +281,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
- 0, 0, 0, 0, 0, 0, "jobfile", null, false));
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -724,7 +723,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
- 0, 0, 0, 0, 0, 0, "jobfile", null, false));
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -828,7 +827,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
- 0, 0, 0, 0, 0, 0, "jobfile", null, false));
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator =
new MyContainerAllocator(rm, conf, appAttemptId, mockJob);
@@ -994,7 +993,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
- 0, 0, 0, 0, 0, 0, "jobfile", null, false));
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -1099,8 +1098,7 @@ public class TestRMContainerAllocator {
super();
try {
Configuration conf = new Configuration();
- reinitialize(conf, new ContainerTokenSecretManager(conf),
- rmContext);
+ reinitialize(conf, rmContext);
} catch (IOException ie) {
LOG.info("add application failed with ", ie);
assert (false);
@@ -1409,7 +1407,63 @@ public class TestRMContainerAllocator {
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampDownReduces(anyInt());
}
+
+ private static class RecalculateContainerAllocator extends MyContainerAllocator {
+ public boolean recalculatedReduceSchedule = false;
+
+ public RecalculateContainerAllocator(MyResourceManager rm,
+ Configuration conf, ApplicationAttemptId appAttemptId, Job job) {
+ super(rm, conf, appAttemptId, job);
+ }
+
+ @Override
+ public void scheduleReduces(int totalMaps, int completedMaps,
+ int scheduledMaps, int scheduledReduces, int assignedMaps,
+ int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+ int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
+ recalculatedReduceSchedule = true;
+ }
+ }
+ @Test
+ public void testCompletedTasksRecalculateSchedule() throws Exception {
+ LOG.info("Running testCompletedTasksRecalculateSchedule");
+
+ Configuration conf = new Configuration();
+ final MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job job = mock(Job.class);
+ when(job.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ doReturn(10).when(job).getTotalMaps();
+ doReturn(10).when(job).getTotalReduces();
+ doReturn(0).when(job).getCompletedMaps();
+ RecalculateContainerAllocator allocator =
+ new RecalculateContainerAllocator(rm, conf, appAttemptId, job);
+ allocator.schedule();
+
+ allocator.recalculatedReduceSchedule = false;
+ allocator.schedule();
+ Assert.assertFalse("Unexpected recalculate of reduce schedule",
+ allocator.recalculatedReduceSchedule);
+
+ doReturn(1).when(job).getCompletedMaps();
+ allocator.schedule();
+ Assert.assertTrue("Expected recalculate of reduce schedule",
+ allocator.recalculatedReduceSchedule);
+ }
+
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();
@@ -1418,6 +1472,7 @@ public class TestRMContainerAllocator {
t.testReportedAppProgress();
t.testReportedAppProgressWithOnlyMaps();
t.testBlackListedNodes();
+ t.testCompletedTasksRecalculateSchedule();
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Fri Oct 19 02:25:55 2012
@@ -23,6 +23,7 @@ import static org.mockito.Matchers.anyBo
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
import java.io.IOException;
@@ -47,6 +48,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -89,28 +91,98 @@ import org.junit.Test;
handler.handle(new JobFinishEvent(jobid));
verify(fs).delete(stagingJobPath, true);
}
+
+ @Test
+ public void testDeletionofStagingOnKill() throws IOException {
+ conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+ conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
+ fs = mock(FileSystem.class);
+ when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+ ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+ ApplicationAttemptId.class);
+ attemptId.setAttemptId(0);
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(System.currentTimeMillis());
+ appId.setId(0);
+ attemptId.setApplicationId(appId);
+ JobId jobid = recordFactory.newRecordInstance(JobId.class);
+ jobid.setAppId(appId);
+ ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+ appMaster.init(conf);
+ //simulate the process being killed
+ MRAppMaster.MRAppMasterShutdownHook hook =
+ new MRAppMaster.MRAppMasterShutdownHook(appMaster);
+ hook.run();
+ verify(fs, times(0)).delete(stagingJobPath, true);
+ }
+
+ @Test
+ public void testDeletionofStagingOnKillLastTry() throws IOException {
+ conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+ conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
+ fs = mock(FileSystem.class);
+ when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+ ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+ ApplicationAttemptId.class);
+ attemptId.setAttemptId(1);
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(System.currentTimeMillis());
+ appId.setId(0);
+ attemptId.setApplicationId(appId);
+ JobId jobid = recordFactory.newRecordInstance(JobId.class);
+ jobid.setAppId(appId);
+ ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+ appMaster.init(conf);
+ //simulate the process being killed
+ MRAppMaster.MRAppMasterShutdownHook hook =
+ new MRAppMaster.MRAppMasterShutdownHook(appMaster);
+ hook.run();
+ verify(fs).delete(stagingJobPath, true);
+ }
private class TestMRApp extends MRAppMaster {
+ ContainerAllocator allocator;
- public TestMRApp(ApplicationAttemptId applicationAttemptId) {
- super(applicationAttemptId, BuilderUtils.newContainerId(
- applicationAttemptId, 1), "testhost", 2222, 3333, System
- .currentTimeMillis());
- }
-
- @Override
- protected FileSystem getFileSystem(Configuration conf) {
- return fs;
- }
-
- @Override
- protected void sysexit() {
- }
-
- @Override
- public Configuration getConfig() {
- return conf;
- }
+ public TestMRApp(ApplicationAttemptId applicationAttemptId,
+ ContainerAllocator allocator) {
+ super(applicationAttemptId, BuilderUtils.newContainerId(
+ applicationAttemptId, 1), "testhost", 2222, 3333, System
+ .currentTimeMillis());
+ this.allocator = allocator;
+ }
+
+ public TestMRApp(ApplicationAttemptId applicationAttemptId) {
+ this(applicationAttemptId, null);
+ }
+
+ @Override
+ protected FileSystem getFileSystem(Configuration conf) {
+ return fs;
+ }
+
+ @Override
+ protected ContainerAllocator createContainerAllocator(
+ final ClientService clientService, final AppContext context) {
+ if(allocator == null) {
+ return super.createContainerAllocator(clientService, context);
+ }
+ return allocator;
+ }
+
+ @Override
+ protected void sysexit() {
+ }
+
+ @Override
+ public Configuration getConfig() {
+ return conf;
+ }
+
+ @Override
+ protected void downloadTokensAndSetupUGI(Configuration conf) {
+ }
}
private final class MRAppTestCleanup extends MRApp {
@@ -198,4 +270,4 @@ import org.junit.Test;
Assert.assertTrue("Staging directory not cleaned before notifying RM",
app.cleanedBeforeContainerAllocatorStopped);
}
- }
\ No newline at end of file
+ }
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Oct 19 02:25:55 2012
@@ -19,9 +19,11 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
@@ -43,11 +45,14 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Records;
@@ -91,8 +96,6 @@ public class TestJobImpl {
when(mockJob.getCommitter()).thenReturn(mockCommitter);
when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
when(mockJob.getJobContext()).thenReturn(mockJobContext);
- doNothing().when(mockJob).setFinishTime();
- doNothing().when(mockJob).logJobHistoryFinishedEvent();
when(mockJob.finished(JobState.KILLED)).thenReturn(JobState.KILLED);
when(mockJob.finished(JobState.FAILED)).thenReturn(JobState.FAILED);
when(mockJob.finished(JobState.SUCCEEDED)).thenReturn(JobState.SUCCEEDED);
@@ -103,11 +106,13 @@ public class TestJobImpl {
// commitJob stubbed out, so this can't happen
}
doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
+ JobState jobState = JobImpl.checkJobCompleteSuccess(mockJob);
Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
- "for successful job",
- JobImpl.checkJobCompleteSuccess(mockJob));
+ "for successful job", jobState);
Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
- JobState.FAILED, JobImpl.checkJobCompleteSuccess(mockJob));
+ JobState.FAILED, jobState);
+ verify(mockJob).abortJob(
+ eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
}
@Test
@@ -170,6 +175,8 @@ public class TestJobImpl {
t.testCheckJobCompleteSuccess();
t.testCheckJobCompleteSuccessFailed();
t.testCheckAccess();
+ t.testReportDiagnostics();
+ t.testUberDecision();
}
@Test
@@ -239,6 +246,41 @@ public class TestJobImpl {
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}
+
+ @Test
+ public void testReportDiagnostics() throws Exception {
+ JobID jobID = JobID.forName("job_1234567890000_0001");
+ JobId jobId = TypeConverter.toYarn(jobID);
+ final String diagMsg = "some diagnostic message";
+ final JobDiagnosticsUpdateEvent diagUpdateEvent =
+ new JobDiagnosticsUpdateEvent(jobId, diagMsg);
+ MRAppMetrics mrAppMetrics = MRAppMetrics.create();
+ JobImpl job = new JobImpl(jobId, Records
+ .newRecord(ApplicationAttemptId.class), new Configuration(),
+ mock(EventHandler.class),
+ null, mock(JobTokenSecretManager.class), null,
+ new SystemClock(), null,
+ mrAppMetrics, mock(OutputCommitter.class),
+ true, null, 0, null, null);
+ job.handle(diagUpdateEvent);
+ String diagnostics = job.getReport().getDiagnostics();
+ Assert.assertNotNull(diagnostics);
+ Assert.assertTrue(diagnostics.contains(diagMsg));
+
+ job = new JobImpl(jobId, Records
+ .newRecord(ApplicationAttemptId.class), new Configuration(),
+ mock(EventHandler.class),
+ null, mock(JobTokenSecretManager.class), null,
+ new SystemClock(), null,
+ mrAppMetrics, mock(OutputCommitter.class),
+ true, null, 0, null, null);
+ job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+ job.handle(diagUpdateEvent);
+ diagnostics = job.getReport().getDiagnostics();
+ Assert.assertNotNull(diagnostics);
+ Assert.assertTrue(diagnostics.contains(diagMsg));
+ }
+
@Test
public void testUberDecision() throws Exception {