You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/19 14:09:26 UTC
svn commit: r697068 - in /hadoop/core/trunk: ./
src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/mapred/o...
Author: ddas
Date: Fri Sep 19 05:09:23 2008
New Revision: 697068
URL: http://svn.apache.org/viewvc?rev=697068&view=rev
Log:
HADOOP-3924. Adds a KILLED job status. Contributed by Subramaniam Krishnan.
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopJob.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
hadoop/core/trunk/src/webapps/job/jobdetails.jsp
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 05:09:23 2008
@@ -188,6 +188,8 @@
HADOOP-4070. Provide a mechanism in Hive for registering UDFs from the
query language. (tomwhite)
+ HADOOP-3924. Adds a KILLED job status. (Subramaniam Krishnan via ddas)
+
IMPROVEMENTS
HADOOP-4106. libhdfs: add time, permission and user attribute support (part 2).
Modified: hadoop/core/trunk/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopJob.java?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopJob.java (original)
+++ hadoop/core/trunk/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopJob.java Fri Sep 19 05:09:23 2008
@@ -41,7 +41,8 @@
*/
public enum JobState {
PREPARE(JobStatus.PREP), RUNNING(JobStatus.RUNNING), FAILED(
- JobStatus.FAILED), SUCCEEDED(JobStatus.SUCCEEDED);
+ JobStatus.FAILED), SUCCEEDED(JobStatus.SUCCEEDED), KILLED(
+ JobStatus.KILLED);
final int state;
@@ -59,6 +60,8 @@
return FAILED;
case JobStatus.SUCCEEDED:
return SUCCEEDED;
+ case JobStatus.KILLED:
+ return KILLED;
default:
return null;
}
@@ -200,6 +203,8 @@
if (this.completed) {
if (this.successful) {
return JobState.SUCCEEDED;
+ } else if (this.killed) {
+ return JobState.KILLED;
} else {
return JobState.FAILED;
}
@@ -229,6 +234,13 @@
public boolean isCompleted() {
return this.completed;
}
+
+ /**
+ * @return true if the job was killed, false otherwise
+ */
+ public boolean isKilled() {
+ return this.killed;
+ }
/**
* @return
@@ -281,6 +293,7 @@
this.counters = running.getCounters();
this.completed = running.isComplete();
this.successful = running.isSuccessful();
+ this.killed = running.isKilled();
this.mapProgress = running.mapProgress();
this.reduceProgress = running.reduceProgress();
// running.getTaskCompletionEvents(fromEvent);
Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Sep 19 05:09:23 2008
@@ -34,6 +34,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -328,7 +329,8 @@
List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
for (JobInProgress job: infos.keySet()) {
int runState = job.getStatus().getRunState();
- if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED) {
+ if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
+ || runState == JobStatus.KILLED) {
toRemove.add(job);
}
}
Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Sep 19 05:09:23 2008
@@ -31,6 +31,7 @@
import junit.framework.TestCase;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.FairScheduler.JobInfo;
public class TestFairScheduler extends TestCase {
@@ -334,6 +335,7 @@
submitJobs(1, JobStatus.PREP, 10, 10);
submitJobs(1, JobStatus.SUCCEEDED, 10, 10);
submitJobs(1, JobStatus.FAILED, 10, 10);
+ submitJobs(1, JobStatus.KILLED, 10, 10);
assertNull(scheduler.assignTasks(tracker("tt1")));
advanceTime(100); // Check that we still don't assign jobs after an update
assertNull(scheduler.assignTasks(tracker("tt1")));
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Fri Sep 19 05:09:23 2008
@@ -58,7 +58,7 @@
LOG.error("Job initialization failed:\n" +
StringUtils.stringifyException(t));
if (job != null) {
- job.kill();
+ job.fail();
}
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Sep 19 05:09:23 2008
@@ -62,7 +62,6 @@
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@@ -268,7 +267,8 @@
public synchronized boolean isComplete() throws IOException {
updateStatus();
return (status.getRunState() == JobStatus.SUCCEEDED ||
- status.getRunState() == JobStatus.FAILED);
+ status.getRunState() == JobStatus.FAILED ||
+ status.getRunState() == JobStatus.KILLED);
}
/**
@@ -292,6 +292,14 @@
}
/**
+ * Tells the service to get the state of the current job.
+ */
+ public synchronized int getJobState() throws IOException {
+ updateStatus();
+ return status.getRunState();
+ }
+
+ /**
* Tells the service to terminate the current job.
*/
public synchronized void killJob() throws IOException {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Sep 19 05:09:23 2008
@@ -920,6 +920,37 @@
}
}
/**
+ * Logs job killed event. Closes the job history log file.
+ *
+ * @param jobid
+ * job id
+ * @param timestamp
+ * time when the job kill was issued, in ms.
+ * @param finishedMaps
+ * no of finished map tasks.
+ * @param finishedReduces
+ * no of finished reduce tasks.
+ */
+ public static void logKilled(JobID jobid, long timestamp, int finishedMaps,
+ int finishedReduces) {
+ if (!disableHistory) {
+ String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
+ ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
+
+ if (null != writer) {
+ JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID,
+ Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS,
+ Keys.FINISHED_REDUCES }, new String[] { jobid.toString(),
+ String.valueOf(timestamp), Values.KILLED.name(),
+ String.valueOf(finishedMaps), String.valueOf(finishedReduces) });
+ for (PrintWriter out : writer) {
+ out.close();
+ }
+ openJobs.remove(logFileKey);
+ }
+ }
+ }
+ /**
* Log job's priority.
* @param jobid job id
* @param priority Jobs priority
@@ -936,7 +967,6 @@
}
}
}
-
/**
* Log job's submit-time/launch-time
* @param jobid job id
@@ -960,6 +990,7 @@
}
}
}
+
/**
* Helper class for logging or reading back events related to Task's start, finish or failure.
* All events logged by this class are logged in a separate file per job in
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Sep 19 05:09:23 2008
@@ -31,6 +31,7 @@
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -83,6 +84,7 @@
int failedReduceTIPs = 0;
private volatile boolean launchedCleanup = false;
private volatile boolean jobKilled = false;
+ private volatile boolean jobFailed = false;
JobPriority priority = JobPriority.NORMAL;
JobTracker jobtracker = null;
@@ -868,7 +870,7 @@
return false;
}
// check if job has failed or killed
- if (jobKilled) {
+ if (jobKilled || jobFailed) {
return true;
}
// Check if all maps and reducers have finished.
@@ -1697,10 +1699,13 @@
}
//
// The Job is done
- //
- // if the job is killed, then mark the job failed.
+ // if the job has failed, then mark the job failed.
+ if (jobFailed) {
+ terminateJob(JobStatus.FAILED);
+ }
+ // if the job is killed, then mark the job killed.
if (jobKilled) {
- killJob();
+ terminateJob(JobStatus.KILLED);
}
else {
jobComplete(metrics);
@@ -1742,24 +1747,31 @@
}
}
- private synchronized void killJob() {
- if ((status.getRunState() == JobStatus.RUNNING) ||
- (status.getRunState() == JobStatus.PREP)) {
- this.status = new JobStatus(status.getJobID(),
- 1.0f, 1.0f, 1.0f, JobStatus.FAILED);
+ private synchronized void terminateJob(int jobState) {
+ if ((status.getRunState() == JobStatus.RUNNING)
+ || (status.getRunState() == JobStatus.PREP)) {
this.finishTime = System.currentTimeMillis();
- JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime,
- this.finishedMapTasks, this.finishedReduceTasks);
+ if (jobState == JobStatus.FAILED) {
+ this.status = new JobStatus(status.getJobID(), 1.0f, 1.0f, 1.0f,
+ JobStatus.FAILED);
+ JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime,
+ this.finishedMapTasks, this.finishedReduceTasks);
+ } else if (jobState == JobStatus.KILLED) {
+ this.status = new JobStatus(status.getJobID(), 1.0f, 1.0f, 1.0f,
+ JobStatus.KILLED);
+ JobHistory.JobInfo.logKilled(this.status.getJobID(), finishTime,
+ this.finishedMapTasks, this.finishedReduceTasks);
+ }
garbageCollect();
}
}
-
+
/**
* Kill the job and all its component tasks.
*/
- public synchronized void kill() {
- if ((status.getRunState() == JobStatus.RUNNING) ||
- (status.getRunState() == JobStatus.PREP)) {
+ private synchronized void terminate(int jobState) {
+ if ((status.getRunState() == JobStatus.RUNNING)
+ || (status.getRunState() == JobStatus.PREP)) {
LOG.info("Killing job '" + this.status.getJobID() + "'");
this.runningMapTasks = 0;
this.runningReduceTasks = 0;
@@ -1772,11 +1784,29 @@
for (int i = 0; i < reduces.length; i++) {
reduces[i].kill();
}
- jobKilled = true;
+ if (jobState == JobStatus.FAILED) {
+ jobFailed = true;
+ } else if (jobState == JobStatus.KILLED) {
+ jobKilled = true;
+ }
}
}
/**
+ * Kill the job and all its component tasks.
+ */
+ public synchronized void kill() {
+ terminate(JobStatus.KILLED);
+ }
+
+ /**
+ * Fails the job and all its component tasks.
+ */
+ synchronized void fail() {
+ terminate(JobStatus.FAILED);
+ }
+
+ /**
* A task assigned to this JobInProgress has reported in as failed.
* Most of the time, we'll just reschedule execution. However, after
* many repeated failures we may instead decide to allow the entire
@@ -1924,9 +1954,9 @@
} else {
cleanup[0].kill();
}
- killJob();
+ terminateJob(JobStatus.FAILED);
} else {
- kill();
+ terminate(JobStatus.FAILED);
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java Fri Sep 19 05:09:23 2008
@@ -45,6 +45,7 @@
public static final int SUCCEEDED = 2;
public static final int FAILED = 3;
public static final int PREP = 4;
+ public static final int KILLED = 5;
private JobID jobid;
private float mapProgress;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Sep 19 05:09:23 2008
@@ -45,6 +45,7 @@
* cleanupProgress to JobStatus as part of HADOOP-3150
* Version 13: Added getJobQueueInfos and getJobQueueInfo(queue name)
* and getAllJobs(queue) as a part of HADOOP-3930
+ * Version 14: Added KILLED status to JobStatus as part of HADOOP-3924
*/
public static final long versionID = 13L;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Sep 19 05:09:23 2008
@@ -29,8 +29,8 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
-import java.util.HashSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -1541,7 +1541,8 @@
int rjobRunState =
rjob.getStatus().getRunState();
if (rjobRunState == JobStatus.SUCCEEDED ||
- rjobRunState == JobStatus.FAILED) {
+ rjobRunState == JobStatus.FAILED ||
+ rjobRunState == JobStatus.KILLED) {
// Ok, this call to removeTaskEntries
// is dangerous is some very very obscure
// cases; e.g. when rjob completed, hit
@@ -1628,7 +1629,8 @@
for (Iterator it = jobs.values().iterator(); it.hasNext();) {
JobInProgress jip = (JobInProgress) it.next();
JobStatus status = jip.getStatus();
- if (status.getRunState() == JobStatus.FAILED) {
+ if ((status.getRunState() == JobStatus.FAILED)
+ || (status.getRunState() == JobStatus.KILLED)) {
v.add(jip);
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java Fri Sep 19 05:09:23 2008
@@ -109,7 +109,7 @@
* @throws IOException
*/
public boolean isSuccessful() throws IOException;
-
+
/**
* Blocks until the job is complete.
*
@@ -118,6 +118,14 @@
public void waitForCompletion() throws IOException;
/**
+ * Returns the current state of the Job.
+ * {@link JobStatus}
+ *
+ * @throws IOException
+ */
+ public int getJobState() throws IOException;
+
+ /**
* Kill the running job. Blocks until all job tasks have been
* killed as well. If the job is no longer running, it simply returns.
*
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=697068&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java Fri Sep 19 05:09:23 2008
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A JUnit test to test Kill Job & Fail Job functionality with local file
+ * system.
+ */
+public class TestJobKillAndFail extends TestCase {
+
+ private static String TEST_ROOT_DIR = new File(System.getProperty(
+ "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+
+ private void runJobFail(JobConf conf) throws IOException {
+
+ conf.setJobName("testjobfail");
+ conf.setMapperClass(FailMapper.class);
+
+ RunningJob job = runJob(conf);
+ while (!job.isComplete()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ // Checking that the job failed
+ assertEquals(job.getJobState(), JobStatus.FAILED);
+ }
+
+ private void runJobKill(JobConf conf) throws IOException {
+
+ conf.setJobName("testjobkill");
+ conf.setMapperClass(KillMapper.class);
+
+ RunningJob job = runJob(conf);
+ while (job.getJobState() != JobStatus.RUNNING) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ job.killJob();
+ while (job.cleanupProgress() == 0.0f) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ie) {
+ break;
+ }
+ }
+ // Checking that the Job got killed
+ assertTrue(job.isComplete());
+ assertEquals(job.getJobState(), JobStatus.KILLED);
+ }
+
+ private RunningJob runJob(JobConf conf) throws IOException {
+
+ final Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
+ final Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
+
+ // run the dummy sleep map
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(outDir, true);
+ if (!fs.exists(inDir)) {
+ fs.mkdirs(inDir);
+ }
+ String input = "The quick brown fox\n" + "has many silly\n"
+ + "red fox sox\n";
+ DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+ file.writeBytes(input);
+ file.close();
+
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(IntWritable.class);
+
+ FileInputFormat.setInputPaths(conf, inDir);
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.setNumMapTasks(1);
+ conf.setNumReduceTasks(0);
+
+ JobClient jobClient = new JobClient(conf);
+ RunningJob job = jobClient.submitJob(conf);
+
+ return job;
+
+ }
+
+ public void testJobFailAndKill() throws IOException {
+ MiniMRCluster mr = null;
+ try {
+ mr = new MiniMRCluster(2, "file:///", 3);
+
+ // run the TCs
+ JobConf conf = mr.createJobConf();
+ runJobFail(conf);
+ runJobKill(conf);
+ } finally {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
+
+ static class FailMapper extends MapReduceBase implements
+ Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+ throws IOException {
+
+ throw new RuntimeException("failing map");
+ }
+ }
+
+ static class KillMapper extends MapReduceBase implements
+ Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+ throws IOException {
+
+ try {
+ Thread.sleep(100000);
+ } catch (InterruptedException e) {
+ // Do nothing
+ }
+ }
+ }
+}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Sep 19 05:09:23 2008
@@ -228,6 +228,7 @@
submitJobs(1, JobStatus.PREP);
submitJobs(1, JobStatus.SUCCEEDED);
submitJobs(1, JobStatus.FAILED);
+ submitJobs(1, JobStatus.KILLED);
assertNull(scheduler.assignTasks(tracker("tt1")));
}
Modified: hadoop/core/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobdetails.jsp?rev=697068&r1=697067&r2=697068&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/jobdetails.jsp Fri Sep 19 05:09:23 2008
@@ -226,6 +226,13 @@
"<br>\n");
out.print("<b>Failed in:</b> " + StringUtils.formatTimeDiff(
job.getFinishTime(), job.getStartTime()) + "<br>\n");
+ } else if (runState == JobStatus.KILLED) {
+ out.print("<b>Status:</b> Killed<br>\n");
+ out.print("<b>Started at:</b> " + new Date(job.getStartTime()) + "<br>\n");
+ out.print("<b>Killed at:</b> " + new Date(job.getFinishTime()) +
+ "<br>\n");
+ out.print("<b>Killed in:</b> " + StringUtils.formatTimeDiff(
+ job.getFinishTime(), job.getStartTime()) + "<br>\n");
}
}
out.print("<b>Job Cleanup:</b>");