You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/10/23 13:15:36 UTC
svn commit: r707341 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
Author: ddas
Date: Thu Oct 23 04:15:35 2008
New Revision: 707341
URL: http://svn.apache.org/viewvc?rev=707341&view=rev
Log:
HADOOP-4440. TestJobInProgressListener tests for jobs killed in queued state. Contributed by Amar Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=707341&r1=707340&r2=707341&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Oct 23 04:15:35 2008
@@ -65,6 +65,9 @@
HADOOP-4408. FsAction functions need not create new objects. (cdouglas)
+ HADOOP-4440. TestJobInProgressListener tests for jobs killed in queued
+ state (Amar Kamat via ddas)
+
Release 0.19.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=707341&r1=707340&r2=707341&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Thu Oct 23 04:15:35 2008
@@ -33,7 +33,7 @@
private static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
protected JobQueueJobInProgressListener jobQueueJobInProgressListener;
- private EagerTaskInitializationListener eagerTaskInitializationListener;
+ protected EagerTaskInitializationListener eagerTaskInitializationListener;
private float padFraction;
public JobQueueTaskScheduler() {
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java?rev=707341&r1=707340&r2=707341&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java Thu Oct 23 04:15:35 2008
@@ -242,9 +242,13 @@
// RUNNING->COMPLETE(SUCCESS/KILLED/FAILED)
JobInProgress jip = event.getJobInProgress();
String jobId = jip.getJobID().toString();
- if (statusEvent.getJobInProgress().isComplete()) {
+ if (jip.isComplete()) {
LOG.info("Job " + jobId + " deleted from the running queue");
- jobs.remove(jip);
+ if (statusEvent.getOldStatus().getRunState() == JobStatus.PREP) {
+ wjobs.remove(jip);
+ } else {
+ jobs.remove(jip);
+ }
} else {
// PREP->RUNNING
LOG.info("Job " + jobId + " deleted from the waiting queue");
@@ -329,4 +333,58 @@
assertFalse("Missing event notification for a successful job",
myListener.contains(rJob.getID(), false));
}
-}
\ No newline at end of file
+
+ /**
+ * This scheduler never schedules any task as it doesnt init any task. So all
+ * the jobs are queued forever.
+ */
+ public static class MyScheduler extends JobQueueTaskScheduler {
+
+ @Override
+ public synchronized void start() throws IOException {
+ super.start();
+ // Remove the eager task initializer
+ taskTrackerManager.removeJobInProgressListener(
+ eagerTaskInitializationListener);
+ // terminate it
+ eagerTaskInitializationListener.terminate();
+ }
+ }
+
+ public void testQueuedJobKill() throws Exception {
+ LOG.info("Testing queued-job-kill");
+
+ MyListener myListener = new MyListener();
+
+ JobConf job = new JobConf();
+ job.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
+ TaskScheduler.class);
+ MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, job);
+
+ job = mr.createJobConf();
+
+ mr.getJobTrackerRunner().getJobTracker()
+ .addJobInProgressListener(myListener);
+
+ RunningJob rJob = TestJobKillAndFail.runJob(job);
+ JobID id = rJob.getID();
+ LOG.info("Job : " + id.toString() + " submitted");
+
+ // check if the job is in the waiting queue
+ assertTrue("Missing event notification on submiting a job",
+ myListener.contains(id, true));
+
+ // kill the job
+ LOG.info("Killing job : " + id.toString());
+ rJob.killJob();
+
+ // check if the job is killed
+ assertEquals("Job status doesnt reflect the kill-job action",
+ JobStatus.KILLED, rJob.getJobState());
+
+ // check if the job is correctly moved
+ // from the waiting list
+ assertFalse("Missing event notification on killing a waiting job",
+ myListener.contains(id, true));
+ }
+}