You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2014/01/03 08:27:11 UTC
svn commit: r1555021 [2/2] - in
/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Fri Jan 3 07:26:52 2014
@@ -300,6 +300,8 @@ public class TypeConverter {
.getCleanupProgress(), fromYarn(jobreport.getJobState()),
jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport
.getJobFile(), trackingUrl, jobreport.isUber());
+ jobStatus.setStartTime(jobreport.getStartTime());
+ jobStatus.setFinishTime(jobreport.getFinishTime());
jobStatus.setFailureInfo(jobreport.getDiagnostics());
return jobStatus;
}
@@ -441,6 +443,7 @@ public class TypeConverter {
);
jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
jobStatus.setStartTime(application.getStartTime());
+ jobStatus.setFinishTime(application.getFinishTime());
jobStatus.setFailureInfo(application.getDiagnostics());
jobStatus.setNeededMem(application.getApplicationResourceUsageReport().getNeededResources().getMemory());
jobStatus.setNumReservedSlots(application.getApplicationResourceUsageReport().getNumReservedContainers());
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskReport.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskReport.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskReport.java Fri Jan 3 07:26:52 2014
@@ -24,10 +24,10 @@ public interface TaskReport {
public abstract TaskId getTaskId();
public abstract TaskState getTaskState();
public abstract float getProgress();
+ public abstract String getStatus();
public abstract long getStartTime();
public abstract long getFinishTime();
public abstract Counters getCounters();
-
public abstract List<TaskAttemptId> getRunningAttemptsList();
public abstract TaskAttemptId getRunningAttempt(int index);
public abstract int getRunningAttemptsCount();
@@ -42,6 +42,7 @@ public interface TaskReport {
public abstract void setTaskId(TaskId taskId);
public abstract void setTaskState(TaskState taskState);
public abstract void setProgress(float progress);
+ public abstract void setStatus(String status);
public abstract void setStartTime(long startTime);
public abstract void setFinishTime(long finishTime);
public abstract void setCounters(Counters counters);
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskReportPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskReportPBImpl.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskReportPBImpl.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskReportPBImpl.java Fri Jan 3 07:26:52 2014
@@ -49,6 +49,7 @@ public class TaskReportPBImpl extends Pr
private List<TaskAttemptId> runningAttempts = null;
private TaskAttemptId successfulAttemptId = null;
private List<String> diagnostics = null;
+ private String status;
public TaskReportPBImpl() {
@@ -172,10 +173,21 @@ public class TaskReportPBImpl extends Pr
}
@Override
+ public String getStatus() {
+ return status;
+ }
+
+ @Override
public void setProgress(float progress) {
maybeInitBuilder();
builder.setProgress((progress));
}
+
+ @Override
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
@Override
public TaskState getTaskState() {
TaskReportProtoOrBuilder p = viaProto ? proto : builder;
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Fri Jan 3 07:26:52 2014
@@ -44,6 +44,8 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -82,12 +84,11 @@ public class TestMRWithDistributedCache
private static final Log LOG =
LogFactory.getLog(TestMRWithDistributedCache.class);
+
+ private static class DistributedCacheChecker {
- public static class DistributedCacheChecker extends
- Mapper<LongWritable, Text, NullWritable, NullWritable> {
-
- @Override
- public void setup(Context context) throws IOException {
+ public void setup(TaskInputOutputContext<?, ?, ?, ?> context)
+ throws IOException {
Configuration conf = context.getConfiguration();
Path[] localFiles = context.getLocalCacheFiles();
URI[] files = context.getCacheFiles();
@@ -101,6 +102,10 @@ public class TestMRWithDistributedCache
TestCase.assertEquals(2, files.length);
TestCase.assertEquals(2, archives.length);
+ // Check the file name
+ TestCase.assertTrue(files[0].getPath().endsWith("distributed.first"));
+ TestCase.assertTrue(files[1].getPath().endsWith("distributed.second.jar"));
+
// Check lengths of the files
TestCase.assertEquals(1, fs.getFileStatus(localFiles[0]).getLen());
TestCase.assertTrue(fs.getFileStatus(localFiles[1]).getLen() > 1);
@@ -130,8 +135,28 @@ public class TestMRWithDistributedCache
TestCase.assertTrue("second file should be symlinked too",
expectedAbsentSymlinkFile.exists());
}
+
}
-
+
+ public static class DistributedCacheCheckerMapper extends
+ Mapper<LongWritable, Text, NullWritable, NullWritable> {
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ new DistributedCacheChecker().setup(context);
+ }
+ }
+
+ public static class DistributedCacheCheckerReducer extends
+ Reducer<LongWritable, Text, NullWritable, NullWritable> {
+
+ @Override
+ public void setup(Context context) throws IOException {
+ new DistributedCacheChecker().setup(context);
+ }
+ }
+
private void testWithConf(Configuration conf) throws IOException,
InterruptedException, ClassNotFoundException, URISyntaxException {
// Create a temporary file of length 1.
@@ -146,7 +171,8 @@ public class TestMRWithDistributedCache
Job job = Job.getInstance(conf);
- job.setMapperClass(DistributedCacheChecker.class);
+ job.setMapperClass(DistributedCacheCheckerMapper.class);
+ job.setReducerClass(DistributedCacheCheckerReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
FileInputFormat.setInputPaths(job, first);
// Creates the Job Configuration
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java Fri Jan 3 07:26:52 2014
@@ -27,6 +27,8 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -71,6 +73,7 @@ public class TestTypeConverter {
@Test
public void testFromYarn() throws Exception {
int appStartTime = 612354;
+ int appFinishTime = 612355;
YarnApplicationState state = YarnApplicationState.RUNNING;
ApplicationId applicationId = ApplicationId.newInstance(0, 0);
ApplicationReport applicationReport = Records
@@ -78,6 +81,7 @@ public class TestTypeConverter {
applicationReport.setApplicationId(applicationId);
applicationReport.setYarnApplicationState(state);
applicationReport.setStartTime(appStartTime);
+ applicationReport.setFinishTime(appFinishTime);
applicationReport.setUser("TestTypeConverter-user");
ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class);
@@ -91,6 +95,7 @@ public class TestTypeConverter {
applicationReport.setApplicationResourceUsageReport(appUsageRpt);
JobStatus jobStatus = TypeConverter.fromYarn(applicationReport, "dummy-jobfile");
Assert.assertEquals(appStartTime, jobStatus.getStartTime());
+ Assert.assertEquals(appFinishTime, jobStatus.getFinishTime());
Assert.assertEquals(state.toString(), jobStatus.getState().toString());
}
@@ -172,4 +177,25 @@ public class TestTypeConverter {
Assert.assertEquals("QueueInfo children weren't properly converted",
returned.getQueueChildren().size(), 1);
}
+
+ @Test
+ public void testFromYarnJobReport() throws Exception {
+ int jobStartTime = 612354;
+ int jobFinishTime = 612355;
+ JobState state = JobState.RUNNING;
+ JobId jobId = Records.newRecord(JobId.class);
+ JobReport jobReport = Records.newRecord(JobReport.class);
+ ApplicationId applicationId = ApplicationId.newInstance(0, 0);
+ jobId.setAppId(applicationId);
+ jobId.setId(0);
+ jobReport.setJobId(jobId);
+ jobReport.setJobState(state);
+ jobReport.setStartTime(jobStartTime);
+ jobReport.setFinishTime(jobFinishTime);
+ jobReport.setUser("TestTypeConverter-user");
+ JobStatus jobStatus = TypeConverter.fromYarn(jobReport, "dummy-jobfile");
+ Assert.assertEquals(jobStartTime, jobStatus.getStartTime());
+ Assert.assertEquals(jobFinishTime, jobStatus.getFinishTime());
+ Assert.assertEquals(state.toString(), jobStatus.getState().toString());
+ }
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Fri Jan 3 07:26:52 2014
@@ -187,6 +187,7 @@ abstract public class Task implements Wr
protected SecretKey tokenSecret;
protected SecretKey shuffleSecret;
protected GcTimeUpdater gcUpdater;
+ final AtomicBoolean mustPreempt = new AtomicBoolean(false);
////////////////////////////////////////////
// Constructors
@@ -711,6 +712,7 @@ abstract public class Task implements Wr
}
try {
boolean taskFound = true; // whether TT knows about this task
+ AMFeedback amFeedback = null;
// sleep for a bit
synchronized(lock) {
if (taskDone.get()) {
@@ -728,12 +730,14 @@ abstract public class Task implements Wr
taskStatus.statusUpdate(taskProgress.get(),
taskProgress.toString(),
counters);
- taskFound = umbilical.statusUpdate(taskId, taskStatus);
+ amFeedback = umbilical.statusUpdate(taskId, taskStatus);
+ taskFound = amFeedback.getTaskFound();
taskStatus.clearStatus();
}
else {
// send ping
- taskFound = umbilical.ping(taskId);
+ amFeedback = umbilical.statusUpdate(taskId, null);
+ taskFound = amFeedback.getTaskFound();
}
// if Task Tracker is not aware of our task ID (probably because it died and
@@ -744,6 +748,17 @@ abstract public class Task implements Wr
System.exit(66);
}
+ // Set a flag that says we should preempt; this is read by
+ // ReduceTasks in places of the execution where it is
+ // safe/easy to preempt
+ boolean lastPreempt = mustPreempt.get();
+ mustPreempt.set(mustPreempt.get() || amFeedback.getPreemption());
+
+ if (lastPreempt ^ mustPreempt.get()) {
+ LOG.info("PREEMPTION TASK: setting mustPreempt to " +
+ mustPreempt.get() + " given " + amFeedback.getPreemption() +
+ " for "+ taskId + " task status: " +taskStatus.getPhase());
+ }
sendProgress = resetProgressFlag();
remainingRetries = MAX_RETRIES;
}
@@ -992,10 +1007,17 @@ abstract public class Task implements Wr
public void done(TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, InterruptedException {
- LOG.info("Task:" + taskId + " is done."
- + " And is in the process of committing");
updateCounters();
-
+ if (taskStatus.getRunState() == TaskStatus.State.PREEMPTED ) {
+ // If we are preempted, do no output promotion; signal done and exit
+ committer.commitTask(taskContext);
+ umbilical.preempted(taskId, taskStatus);
+ taskDone.set(true);
+ reporter.stopCommunicationThread();
+ return;
+ }
+ LOG.info("Task:" + taskId + " is done."
+ + " And is in the process of committing");
boolean commitRequired = isCommitRequired();
if (commitRequired) {
int retries = MAX_RETRIES;
@@ -1054,7 +1076,7 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+ if (!umbilical.statusUpdate(getTaskID(), taskStatus).getTaskFound()) {
LOG.warn("Parent died. Exiting "+taskId);
System.exit(66);
}
@@ -1098,8 +1120,8 @@ abstract public class Task implements Wr
if (isMapTask() && conf.getNumReduceTasks() > 0) {
try {
Path mapOutput = mapOutputFile.getOutputFile();
- FileSystem localFS = FileSystem.getLocal(conf);
- return localFS.getFileStatus(mapOutput).getLen();
+ FileSystem fs = mapOutput.getFileSystem(conf);
+ return fs.getFileStatus(mapOutput).getLen();
} catch (IOException e) {
LOG.warn ("Could not find output size " , e);
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java Fri Jan 3 07:26:52 2014
@@ -51,7 +51,7 @@ public abstract class TaskStatus impleme
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
- COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
+ COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN, PREEMPTED}
private final TaskAttemptID taskid;
private float progress;
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Jan 3 07:26:52 2014
@@ -24,6 +24,9 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSelector;
import org.apache.hadoop.security.token.TokenInfo;
@@ -64,9 +67,10 @@ public interface TaskUmbilicalProtocol e
* Version 17 Modified TaskID to be aware of the new TaskTypes
* Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
* Version 19 Added fatalError for child to communicate fatal errors to TT
+ * Version 20 Added methods to manage checkpoints
* */
- public static final long versionID = 19L;
+ public static final long versionID = 20L;
/**
* Called when a child task process starts, to get its task.
@@ -78,7 +82,8 @@ public interface TaskUmbilicalProtocol e
JvmTask getTask(JvmContext context) throws IOException;
/**
- * Report child's progress to parent.
+ * Report child's progress to parent. Also invoked to report still alive (used
+ * to be in ping). It reports an AMFeedback used to propagate preemption requests.
*
* @param taskId task-id of the child
* @param taskStatus status of the child
@@ -86,7 +91,7 @@ public interface TaskUmbilicalProtocol e
* @throws InterruptedException
* @return True if the task is known
*/
- boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException;
/** Report error messages back to parent. Calls should be sparing, since all
@@ -105,11 +110,6 @@ public interface TaskUmbilicalProtocol e
void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range)
throws IOException;
- /** Periodically called by child to check if parent is still alive.
- * @return True if the task is known
- */
- boolean ping(TaskAttemptID taskid) throws IOException;
-
/** Report that the task is successfully completed. Failure is assumed if
* the task process exits without calling this.
* @param taskid task's id
@@ -161,4 +161,33 @@ public interface TaskUmbilicalProtocol e
TaskAttemptID id)
throws IOException;
+ /**
+ * Report to the AM that the task has been successfully preempted.
+ *
+ * @param taskId task's id
+ * @param taskStatus status of the child
+ * @throws IOException
+ */
+ void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException;
+
+ /**
+ * Return the latest CheckpointID for the given TaskID. This provides
+ * the task with a way to locate the checkpointed data and restart from
+ * that point in the computation.
+ *
+ * @param taskID task's id
+ * @return the most recent checkpoint (if any) for this task
+ * @throws IOException
+ */
+ TaskCheckpointID getCheckpointID(TaskID taskID);
+
+ /**
+ * Send a CheckpointID for a given TaskID to be stored in the AM,
+ * to later restart a task from this checkpoint.
+ * @param tid
+ * @param cid
+ */
+ void setCheckpointID(TaskID tid, TaskCheckpointID cid);
+
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java Fri Jan 3 07:26:52 2014
@@ -45,5 +45,9 @@ public enum JobCounter {
TOTAL_LAUNCHED_UBERTASKS,
NUM_UBER_SUBMAPS,
NUM_UBER_SUBREDUCES,
- NUM_FAILED_UBERTASKS
+ NUM_FAILED_UBERTASKS,
+ TASKS_REQ_PREEMPT,
+ CHECKPOINTS,
+ CHECKPOINT_BYTES,
+ CHECKPOINT_TIME
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Jan 3 07:26:52 2014
@@ -459,7 +459,13 @@ public interface MRJobConfig {
public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT =
MR_AM_PREFIX + "job.reduce.preemption.limit";
public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f;
-
+
+ /**
+ * Policy class encoding responses to preemption requests.
+ */
+ public static final String MR_AM_PREEMPTION_POLICY =
+ MR_AM_PREFIX + "preemption.policy";
+
/** AM ACL disabled. **/
public static final String JOB_AM_ACCESS_DISABLED =
"mapreduce.job.am-access-disabled";
@@ -708,4 +714,7 @@ public interface MRJobConfig {
public static final String MR_APPLICATION_TYPE = "MAPREDUCE";
+ public static final String TASK_PREEMPTION =
+ "mapreduce.job.preemption";
+
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Fri Jan 3 07:26:52 2014
@@ -137,7 +137,7 @@ public class WrappedReducer<KEYIN, VALUE
@Override
public URI[] getCacheFiles() throws IOException {
- return reduceContext.getCacheArchives();
+ return reduceContext.getCacheFiles();
}
@Override
Propchange: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1513717-1550362
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1550130-1555020
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties Fri Jan 3 07:26:52 2014
@@ -27,3 +27,7 @@ SLOTS_MILLIS_MAPS.name= Total
SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces in occupied slots (ms)
FALLOW_SLOTS_MILLIS_MAPS.name= Total time spent by all maps waiting after reserving slots (ms)
FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms)
+TASKS_REQ_PREEMPT.name= Tasks that have been asked to preempt
+CHECKPOINTS.name= Number of checkpoints reported
+CHECKPOINT_BYTES.name= Total amount of bytes in checkpoints
+CHECKPOINT_TIME.name= Total time spent checkpointing (ms)
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Fri Jan 3 07:26:52 2014
@@ -88,6 +88,8 @@ import org.apache.hadoop.yarn.util.Recor
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* This module is responsible for talking to the
* JobClient (user facing).
@@ -142,7 +144,8 @@ public class HistoryClientService extend
super.serviceStart();
}
- private void initializeWebApp(Configuration conf) {
+ @VisibleForTesting
+ protected void initializeWebApp(Configuration conf) {
webApp = new HsWebApp(history);
InetSocketAddress bindAddress = MRWebAppUtil.getJHSWebBindAddress(conf);
// NOTE: there should be a .at(InetSocketAddress)
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Fri Jan 3 07:26:52 2014
@@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
+import com.google.common.annotations.VisibleForTesting;
+
/******************************************************************
* {@link JobHistoryServer} is responsible for servicing all job history
* related requests from client.
@@ -60,10 +62,10 @@ public class JobHistoryServer extends Co
public static final long historyServerTimeStamp = System.currentTimeMillis();
private static final Log LOG = LogFactory.getLog(JobHistoryServer.class);
- private HistoryContext historyContext;
+ protected HistoryContext historyContext;
private HistoryClientService clientService;
private JobHistory jobHistoryService;
- private JHSDelegationTokenSecretManager jhsDTSecretManager;
+ protected JHSDelegationTokenSecretManager jhsDTSecretManager;
private AggregatedLogDeletionService aggLogDelService;
private HSAdminServer hsAdminServer;
private HistoryServerStateStoreService stateStore;
@@ -129,8 +131,7 @@ public class JobHistoryServer extends Co
historyContext = (HistoryContext)jobHistoryService;
stateStore = createStateStore(conf);
this.jhsDTSecretManager = createJHSSecretManager(conf, stateStore);
- clientService = new HistoryClientService(historyContext,
- this.jhsDTSecretManager);
+ clientService = createHistoryClientService();
aggLogDelService = new AggregatedLogDeletionService();
hsAdminServer = new HSAdminServer(aggLogDelService, jobHistoryService);
addService(stateStore);
@@ -142,6 +143,12 @@ public class JobHistoryServer extends Co
super.serviceInit(config);
}
+ @VisibleForTesting
+ protected HistoryClientService createHistoryClientService() {
+ return new HistoryClientService(historyContext,
+ this.jhsDTSecretManager);
+ }
+
protected JHSDelegationTokenSecretManager createJHSSecretManager(
Configuration conf, HistoryServerStateStoreService store) {
long secretKeyInterval =
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java Fri Jan 3 07:26:52 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.App;
+import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
import org.apache.hadoop.yarn.util.Times;
@@ -89,6 +90,7 @@ public class HsTaskPage extends HsView {
headRow.
th(".id", "Attempt").
th(".state", "State").
+ th(".status", "Status").
th(".node", "Node").
th(".logs", "Logs").
th(".tsh", "Start Time");
@@ -113,15 +115,16 @@ public class HsTaskPage extends HsView {
// DataTables to display
StringBuilder attemptsTableData = new StringBuilder("[\n");
- for (TaskAttempt ta : getTaskAttempts()) {
- String taid = MRApps.toString(ta.getID());
+ for (TaskAttempt attempt : getTaskAttempts()) {
+ final TaskAttemptInfo ta = new TaskAttemptInfo(attempt, false);
+ String taid = ta.getId();
+
+ String nodeHttpAddr = ta.getNode();
+ String containerIdString = ta.getAssignedContainerIdStr();
+ String nodeIdString = attempt.getAssignedContainerMgrAddress();
+ String nodeRackName = ta.getRack();
- String nodeHttpAddr = ta.getNodeHttpAddress();
- String containerIdString = ta.getAssignedContainerID().toString();
- String nodeIdString = ta.getAssignedContainerMgrAddress();
- String nodeRackName = ta.getNodeRackName();
-
- long attemptStartTime = ta.getLaunchTime();
+ long attemptStartTime = ta.getStartTime();
long shuffleFinishTime = -1;
long sortFinishTime = -1;
long attemptFinishTime = ta.getFinishTime();
@@ -129,8 +132,8 @@ public class HsTaskPage extends HsView {
long elapsedSortTime = -1;
long elapsedReduceTime = -1;
if(type == TaskType.REDUCE) {
- shuffleFinishTime = ta.getShuffleFinishTime();
- sortFinishTime = ta.getSortFinishTime();
+ shuffleFinishTime = attempt.getShuffleFinishTime();
+ sortFinishTime = attempt.getSortFinishTime();
elapsedShuffleTime =
Times.elapsed(attemptStartTime, shuffleFinishTime, false);
elapsedSortTime =
@@ -140,11 +143,13 @@ public class HsTaskPage extends HsView {
}
long attemptElapsed =
Times.elapsed(attemptStartTime, attemptFinishTime, false);
- int sortId = ta.getID().getId() + (ta.getID().getTaskId().getId() * 10000);
+ int sortId = attempt.getID().getId()
+ + (attempt.getID().getTaskId().getId() * 10000);
attemptsTableData.append("[\"")
.append(sortId + " ").append(taid).append("\",\"")
- .append(ta.getState().toString()).append("\",\"")
+ .append(ta.getState()).append("\",\"")
+ .append(ta.getStatus()).append("\",\"")
.append("<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>")
.append(nodeRackName + "/" + nodeHttpAddr + "</a>\",\"")
@@ -167,8 +172,9 @@ public class HsTaskPage extends HsView {
.append(elapsedReduceTime).append("\",\"");
}
attemptsTableData.append(attemptElapsed).append("\",\"")
- .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
- Joiner.on('\n').join(ta.getDiagnostics())))).append("\"],\n");
+ .append(StringEscapeUtils.escapeJavaScript(
+ StringEscapeUtils.escapeHtml(ta.getNote())))
+ .append("\"],\n");
}
//Remove the last comma and close off the array of arrays
if(attemptsTableData.charAt(attemptsTableData.length() - 2) == ',') {
@@ -185,6 +191,8 @@ public class HsTaskPage extends HsView {
th().input("search_init").$type(InputType.text).
$name("attempt_state").$value("State")._()._().
th().input("search_init").$type(InputType.text).
+ $name("attempt_status").$value("Status")._()._().
+ th().input("search_init").$type(InputType.text).
$name("attempt_node").$value("Node")._()._().
th().input("search_init").$type(InputType.text).
$name("attempt_node").$value("Logs")._()._().
@@ -283,19 +291,19 @@ public class HsTaskPage extends HsView {
.append("\n,aoColumnDefs:[\n")
//logs column should not filterable (it includes container ID which may pollute searches)
- .append("\n{'aTargets': [ 3 ]")
+ .append("\n{'aTargets': [ 4 ]")
.append(", 'bSearchable': false }")
.append("\n, {'sType':'numeric', 'aTargets': [ 0 ]")
.append(", 'mRender': parseHadoopAttemptID }")
- .append("\n, {'sType':'numeric', 'aTargets': [ 4, 5")
+ .append("\n, {'sType':'numeric', 'aTargets': [ 5, 6")
//Column numbers are different for maps and reduces
- .append(type == TaskType.REDUCE ? ", 6, 7" : "")
+ .append(type == TaskType.REDUCE ? ", 7, 8" : "")
.append(" ], 'mRender': renderHadoopDate }")
.append("\n, {'sType':'numeric', 'aTargets': [")
- .append(type == TaskType.REDUCE ? "8, 9, 10, 11" : "6")
+ .append(type == TaskType.REDUCE ? "9, 10, 11, 12" : "7")
.append(" ], 'mRender': renderHadoopElapsedTime }]")
// Sort by id upon page load
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Fri Jan 3 07:26:52 2014
@@ -339,8 +339,11 @@ public class TestJobHistoryParsing {
PrintStream stdps = System.out;
try {
System.setOut(new PrintStream(outContent));
- HistoryViewer viewer = new HistoryViewer(fc.makeQualified(
- fileInfo.getHistoryFile()).toString(), conf, true);
+ HistoryViewer viewer;
+ synchronized (fileInfo) {
+ viewer = new HistoryViewer(fc.makeQualified(
+ fileInfo.getHistoryFile()).toString(), conf, true);
+ }
viewer.print();
for (TaskInfo taskInfo : allTasks.values()) {
@@ -397,29 +400,27 @@ public class TestJobHistoryParsing {
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
- String jobhistoryDir = JobHistoryUtils
- .getHistoryIntermediateDoneDirForUser(conf);
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
+ HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
+
+ JobHistoryParser parser;
+ JobInfo jobInfo;
+ synchronized (fileInfo) {
+ Path historyFilePath = fileInfo.getHistoryFile();
+ FSDataInputStream in = null;
+ FileContext fc = null;
+ try {
+ fc = FileContext.getFileContext(conf);
+ in = fc.open(fc.makeQualified(historyFilePath));
+ } catch (IOException ioe) {
+ LOG.info("Can not open history file: " + historyFilePath, ioe);
+ throw (new Exception("Can not open History File"));
+ }
- JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
- .getJobIndexInfo();
- String jobhistoryFileName = FileNameIndexUtils
- .getDoneFileName(jobIndexInfo);
-
- Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
- FSDataInputStream in = null;
- FileContext fc = null;
- try {
- fc = FileContext.getFileContext(conf);
- in = fc.open(fc.makeQualified(historyFilePath));
- } catch (IOException ioe) {
- LOG.info("Can not open history file: " + historyFilePath, ioe);
- throw (new Exception("Can not open History File"));
+ parser = new JobHistoryParser(in);
+ jobInfo = parser.parse();
}
-
- JobHistoryParser parser = new JobHistoryParser(in);
- JobInfo jobInfo = parser.parse();
Exception parseException = parser.getParseException();
Assert.assertNull("Caught an expected exception " + parseException,
parseException);
@@ -464,29 +465,28 @@ public class TestJobHistoryParsing {
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
- String jobhistoryDir = JobHistoryUtils
- .getHistoryIntermediateDoneDirForUser(conf);
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
- JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
- .getJobIndexInfo();
- String jobhistoryFileName = FileNameIndexUtils
- .getDoneFileName(jobIndexInfo);
-
- Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
- FSDataInputStream in = null;
- FileContext fc = null;
- try {
- fc = FileContext.getFileContext(conf);
- in = fc.open(fc.makeQualified(historyFilePath));
- } catch (IOException ioe) {
- LOG.info("Can not open history file: " + historyFilePath, ioe);
- throw (new Exception("Can not open History File"));
- }
+ HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
+
+ JobHistoryParser parser;
+ JobInfo jobInfo;
+ synchronized (fileInfo) {
+ Path historyFilePath = fileInfo.getHistoryFile();
+ FSDataInputStream in = null;
+ FileContext fc = null;
+ try {
+ fc = FileContext.getFileContext(conf);
+ in = fc.open(fc.makeQualified(historyFilePath));
+ } catch (IOException ioe) {
+ LOG.info("Can not open history file: " + historyFilePath, ioe);
+ throw (new Exception("Can not open History File"));
+ }
- JobHistoryParser parser = new JobHistoryParser(in);
- JobInfo jobInfo = parser.parse();
+ parser = new JobHistoryParser(in);
+ jobInfo = parser.parse();
+ }
Exception parseException = parser.getParseException();
Assert.assertNull("Caught an expected exception " + parseException,
parseException);
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestBlocks.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestBlocks.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestBlocks.java Fri Jan 3 07:26:52 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
@@ -138,11 +139,31 @@ public class TestBlocks {
when(attempt.getAssignedContainerMgrAddress()).thenReturn(
"assignedContainerMgrAddress");
when(attempt.getNodeRackName()).thenReturn("nodeRackName");
- when(attempt.getLaunchTime()).thenReturn(100002L);
- when(attempt.getFinishTime()).thenReturn(100012L);
- when(attempt.getShuffleFinishTime()).thenReturn(100010L);
- when(attempt.getSortFinishTime()).thenReturn(100011L);
- when(attempt.getState()).thenReturn(TaskAttemptState.SUCCEEDED);
+
+ final long taStartTime = 100002L;
+ final long taFinishTime = 100012L;
+ final long taShuffleFinishTime = 100010L;
+ final long taSortFinishTime = 100011L;
+ final TaskAttemptState taState = TaskAttemptState.SUCCEEDED;
+
+ when(attempt.getLaunchTime()).thenReturn(taStartTime);
+ when(attempt.getFinishTime()).thenReturn(taFinishTime);
+ when(attempt.getShuffleFinishTime()).thenReturn(taShuffleFinishTime);
+ when(attempt.getSortFinishTime()).thenReturn(taSortFinishTime);
+ when(attempt.getState()).thenReturn(taState);
+
+ TaskAttemptReport taReport = mock(TaskAttemptReport.class);
+ when(taReport.getStartTime()).thenReturn(taStartTime);
+ when(taReport.getFinishTime()).thenReturn(taFinishTime);
+ when(taReport.getShuffleFinishTime()).thenReturn(taShuffleFinishTime);
+ when(taReport.getSortFinishTime()).thenReturn(taSortFinishTime);
+ when(taReport.getContainerId()).thenReturn(containerId);
+ when(taReport.getProgress()).thenReturn(1.0f);
+ when(taReport.getStateString()).thenReturn("Processed 128/128 records");
+ when(taReport.getTaskAttemptState()).thenReturn(taState);
+ when(taReport.getDiagnosticInfo()).thenReturn("");
+
+ when(attempt.getReport()).thenReturn(taReport);
attempts.put(taId, attempt);
when(task.getAttempts()).thenReturn(attempts);
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java Fri Jan 3 07:26:52 2014
@@ -444,9 +444,9 @@ public class TestHsWebServicesAttempts e
public void verifyHsTaskAttempt(JSONObject info, TaskAttempt att,
TaskType ttype) throws JSONException {
if (ttype == TaskType.REDUCE) {
- assertEquals("incorrect number of elements", 16, info.length());
+ assertEquals("incorrect number of elements", 17, info.length());
} else {
- assertEquals("incorrect number of elements", 11, info.length());
+ assertEquals("incorrect number of elements", 12, info.length());
}
verifyTaskAttemptGeneric(att, ttype, info.getString("id"),
@@ -551,11 +551,11 @@ public class TestHsWebServicesAttempts e
assertEquals("mergeFinishTime wrong", ta.getSortFinishTime(),
mergeFinishTime);
assertEquals("elapsedShuffleTime wrong",
- ta.getLaunchTime() - ta.getShuffleFinishTime(), elapsedShuffleTime);
+ ta.getShuffleFinishTime() - ta.getLaunchTime(), elapsedShuffleTime);
assertEquals("elapsedMergeTime wrong",
- ta.getShuffleFinishTime() - ta.getSortFinishTime(), elapsedMergeTime);
+ ta.getSortFinishTime() - ta.getShuffleFinishTime(), elapsedMergeTime);
assertEquals("elapsedReduceTime wrong",
- ta.getSortFinishTime() - ta.getFinishTime(), elapsedReduceTime);
+ ta.getFinishTime() - ta.getSortFinishTime(), elapsedReduceTime);
}
@Test
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java Fri Jan 3 07:26:52 2014
@@ -538,7 +538,7 @@ public class TestHsWebServicesTasks exte
public void verifyHsSingleTask(JSONObject info, Task task)
throws JSONException {
- assertEquals("incorrect number of elements", 8, info.length());
+ assertEquals("incorrect number of elements", 9, info.length());
verifyTaskGeneric(task, info.getString("id"), info.getString("state"),
info.getString("type"), info.getString("successfulAttempt"),
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Fri Jan 3 07:26:52 2014
@@ -304,7 +304,7 @@ public class TestClientRedirect {
@Override
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws IOException {
- return recordFactory.newRecordInstance(KillApplicationResponse.class);
+ return KillApplicationResponse.newInstance(true);
}
@Override
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java Fri Jan 3 07:26:52 2014
@@ -195,8 +195,7 @@ public class TestJobCleanup {
RunningJob job = jobClient.submitJob(jc);
JobID id = job.getID();
job.waitForCompletion();
- Counters counters = job.getCounters();
- assertTrue("No. of failed maps should be 1",counters.getCounter(JobCounter.NUM_FAILED_MAPS) == 1);
+ assertEquals("Job did not fail", JobStatus.FAILED, job.getJobState());
if (fileName != null) {
Path testFile = new Path(outDir, fileName);
@@ -242,9 +241,7 @@ public class TestJobCleanup {
job.killJob(); // kill the job
job.waitForCompletion(); // wait for the job to complete
-
- counters = job.getCounters();
- assertTrue("No. of killed maps should be 1", counters.getCounter(JobCounter.NUM_KILLED_MAPS) == 1);
+ assertEquals("Job was not killed", JobStatus.KILLED, job.getJobState());
if (fileName != null) {
Path testFile = new Path(outDir, fileName);
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java Fri Jan 3 07:26:52 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
import junit.framework.TestCase;
@@ -29,20 +28,17 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.split.JobSplitWriter;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
-import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -110,11 +106,16 @@ public class TestMapProgress extends Tes
statusUpdate(taskId, taskStatus);
}
+ public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ statusUpdate(taskId, taskStatus);
+ }
+
public boolean canCommit(TaskAttemptID taskid) throws IOException {
return true;
}
- public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ public AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
StringBuffer buf = new StringBuffer("Task ");
buf.append(taskId);
@@ -128,7 +129,9 @@ public class TestMapProgress extends Tes
LOG.info(buf.toString());
// ignore phase
// ignore counters
- return true;
+ AMFeedback a = new AMFeedback();
+ a.setTaskFound(true);
+ return a;
}
public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
@@ -145,6 +148,17 @@ public class TestMapProgress extends Tes
SortedRanges.Range range) throws IOException {
LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
}
+
+ @Override
+ public TaskCheckpointID getCheckpointID(TaskID taskId) {
+ // do nothing
+ return null;
+ }
+
+ @Override
+ public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) {
+ // do nothing
+ }
}
private FileSystem fs = null;
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java Fri Jan 3 07:26:52 2014
@@ -27,6 +27,10 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
+
public class TestTaskCommit extends HadoopTestCase {
Path rootDir =
@@ -132,11 +136,6 @@ public class TestTaskCommit extends Hado
}
@Override
- public boolean ping(TaskAttemptID taskid) throws IOException {
- return true;
- }
-
- @Override
public void reportDiagnosticInfo(TaskAttemptID taskid, String trace)
throws IOException {
}
@@ -152,9 +151,11 @@ public class TestTaskCommit extends Hado
}
@Override
- public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ public AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
- return true;
+ AMFeedback a = new AMFeedback();
+ a.setTaskFound(true);
+ return a;
}
@Override
@@ -168,6 +169,22 @@ public class TestTaskCommit extends Hado
long clientVersion, int clientMethodsHash) throws IOException {
return null;
}
+
+ @Override
+ public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ fail("Task should not go to commit-pending");
+ }
+
+ @Override
+ public TaskCheckpointID getCheckpointID(TaskID taskId) {
+ return null;
+ }
+
+ @Override
+ public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) {
+ // ignore
+ }
}
/**
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java Fri Jan 3 07:26:52 2014
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -208,7 +209,7 @@ public class TestYARNRunner extends Test
};
/* make sure kill calls finish application master */
when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
- .thenReturn(null);
+ .thenReturn(KillApplicationResponse.newInstance(true));
delegate.killApplication(appId);
verify(clientRMProtocol).forceKillApplication(any(KillApplicationRequest.class));
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java Fri Jan 3 07:26:52 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryClientService;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService;
import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
@@ -94,6 +95,17 @@ public class TestJHSSecurity {
return new JHSDelegationTokenSecretManager(initialInterval,
maxLifetime, renewInterval, 3600000, store);
}
+
+ @Override
+ protected HistoryClientService createHistoryClientService() {
+ return new HistoryClientService(historyContext,
+ this.jhsDTSecretManager) {
+ @Override
+ protected void initializeWebApp(Configuration conf) {
+ // Don't need it, skip.
+ }
+ };
+ }
};
// final JobHistoryServer jobHistoryServer = jhServer;
jobHistoryServer.init(conf);
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java Fri Jan 3 07:26:52 2014
@@ -115,7 +115,7 @@ public class TestUmbilicalProtocolWithJo
proxy = (TaskUmbilicalProtocol) RPC.getProxy(
TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID,
addr, conf);
- proxy.ping(null);
+ proxy.statusUpdate(null, null);
} finally {
server.stop();
if (proxy != null) {
Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java Fri Jan 3 07:26:52 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2;
+import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
@@ -106,17 +107,21 @@ public class TestSpeculativeExecutionWit
int maxTimeWait = 10;
boolean successfullySpeculated = false;
+ TaskAttempt[] ta = null;
while (maxTimeWait > 0 && !successfullySpeculated) {
if (taskToBeSpeculated.getAttempts().size() != 2) {
Thread.sleep(1000);
clock.setTime(System.currentTimeMillis() + 20000);
} else {
successfullySpeculated = true;
+ // finish 1st TA, 2nd will be killed
+ ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
}
maxTimeWait--;
}
Assert
.assertTrue("Couldn't speculate successfully", successfullySpeculated);
+ verifySpeculationMessage(app, ta);
}
@Test(timeout = 60000)
@@ -197,16 +202,47 @@ public class TestSpeculativeExecutionWit
int maxTimeWait = 5;
boolean successfullySpeculated = false;
+ TaskAttempt[] ta = null;
while (maxTimeWait > 0 && !successfullySpeculated) {
if (speculatedTask.getAttempts().size() != 2) {
Thread.sleep(1000);
} else {
successfullySpeculated = true;
+ ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
}
maxTimeWait--;
}
Assert
.assertTrue("Couldn't speculate successfully", successfullySpeculated);
+ verifySpeculationMessage(app, ta);
+ }
+
+ private static TaskAttempt[] makeFirstAttemptWin(
+ EventHandler appEventHandler, Task speculatedTask) {
+
+ // Finish the 1st task attempt (TA_DONE, then TA_CONTAINER_CLEANED); the 2nd attempt is then expected to be killed. Returns all attempts of the task as an array.
+ Collection<TaskAttempt> attempts = speculatedTask.getAttempts().values();
+ TaskAttempt[] ta = new TaskAttempt[attempts.size()];
+ attempts.toArray(ta); // NOTE(review): "1st" is ta[0], i.e. first in the iteration order of getAttempts().values() - confirm that is the original attempt
+ appEventHandler.handle(
+ new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE));
+ appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(),
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ return ta;
+ }
+
+ private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta) // checks attempt end states and the diagnostics of ta[1]
+ throws Exception {
+ app.waitForState(ta[0], TaskAttemptState.SUCCEEDED); // ta[0] must reach SUCCEEDED
+ app.waitForState(ta[1], TaskAttemptState.KILLED); // ta[1] must reach KILLED
+ boolean foundSpecMsg = false;
+ for (String msg : ta[1].getDiagnostics()) { // scan ta[1]'s diagnostics for a message mentioning speculation
+ if (msg.contains("Speculation")) {
+ foundSpecMsg = true;
+ break; // one matching message is enough
+ }
+ }
+ Assert.assertTrue("No speculation diagnostics!", foundSpecMsg); // fail if no diagnostic contains "Speculation"
}
private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,