You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/08/15 23:55:53 UTC

svn commit: r431715 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/jobcontrol/ src/test/org/apache/hadoop/mapred/jobcontrol/

Author: cutting
Date: Tue Aug 15 14:55:52 2006
New Revision: 431715

URL: http://svn.apache.org/viewvc?rev=431715&view=rev
Log:
HADOOP-322.  Add a job control utility.  Contributed by Runping.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=431715&r1=431714&r2=431715&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Aug 15 14:55:52 2006
@@ -46,6 +46,11 @@
     critical percentage of the datanodes are unavailable.
     (Konstantin Shvachko via cutting)
 
+11. HADOOP-322.  Add a job control utility.  This permits one to
+    specify job interdependencies.  Each job is submitted only after
+    the jobs it depends on have successfully completed.
+    (Runping Qi via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java?rev=431715&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java Tue Aug 15 14:55:52 2006
@@ -0,0 +1,331 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.jobcontrol;
+
+
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.ArrayList;
+import java.io.IOException;
+
+/** This class encapsulates a MapReduce job and its dependencies. It monitors 
+ *  the states of the depending jobs and updates the state of this job.
+ *  A job starts in the WAITING state. If it does not have any depending jobs, or
+ *  all of the depending jobs are in SUCCESS state, then the job state will become
+ *  READY. If any depending jobs fail, the job will fail too. 
+ *  When in READY state, the job can be submitted to Hadoop for execution, with
+ *  the state changing into RUNNING state. From RUNNING state, the job can get into 
+ *  SUCCESS or FAILED state, depending on the status of the job execution.
+ *  
+ */
+
+public class Job {
+
+	// A job will be in one of the following states
+	final public static int SUCCESS = 0;
+	final public static int WAITING = 1;
+	final public static int RUNNING = 2;
+	final public static int READY = 3;
+	final public static int FAILED = 4;
+	final public static int DEPENDENT_FAILED = 5;
+	
+	
+	private JobConf theJobConf;
+	private int state;
+	private String jobID; 		// assigned and used by JobControl class
+	private String mapredJobID; // the job ID assigned by map/reduce
+	private String jobName;		// external name, assigned/used by client app
+	private String message;		// some info for human consumption, 
+								// e.g. the reason why the job failed
+	private ArrayList dependingJobs;	// the jobs the current job depends on
+	
+	private JobClient jc = null;		// the map reduce job client
+	
+    /** 
+     * Construct a job.
+     * @param jobConf a mapred job configuration representing a job to be executed.
+     * @param dependingJobs an array of jobs the current job depends on
+     */
+    public Job(JobConf jobConf, ArrayList dependingJobs) throws IOException {
+       	this.theJobConf = jobConf;
+		this.dependingJobs = dependingJobs;
+		this.state = Job.WAITING;
+		this.jobID = "unassigned";
+		this.mapredJobID = "unassigned";
+		this.jobName = "unassigned";
+		this.message = "just initialized";
+		this.jc = new JobClient(jobConf);
+	}
+	
+	/** Renders this job's fields and the names of its depending jobs, one per line. */
+	public String toString() {
+		StringBuffer out = new StringBuffer();
+		out.append("job name:\t" + this.jobName + "\n");
+		out.append("job id:\t" + this.jobID + "\n");
+		out.append("job state:\t" + this.state + "\n");
+		out.append("job mapred id:\t" + this.mapredJobID + "\n");
+		out.append("job message:\t" + this.message + "\n");
+		if (this.dependingJobs == null) {
+			out.append("job has no depending job:\t\n");
+			return out.toString();
+		}
+		out.append("job has " + this.dependingJobs.size() + " dependeng jobs:\n");
+		for (int idx = 0; idx < this.dependingJobs.size(); idx++) {
+			Job dep = (Job) this.dependingJobs.get(idx);
+			out.append("\t depending job " + idx + ":\t" + dep.getJobName() + "\n");
+		}
+		return out.toString();
+	}
+	
+	/**
+	 * @return the job name of this job
+	 */
+	public String getJobName() {
+		return this.jobName;
+	}
+	
+	/**
+	 * Set the job name for  this job.
+	 * @param jobName the job name
+	 */
+	public void setJobName(String jobName) {
+		this.jobName = jobName;
+	}
+	
+	/**
+	 * @return the job ID of this job
+	 */
+	public String getJobID() {
+		return this.jobID;
+	}
+	
+	/**
+	 * Set the job ID for  this job.
+	 * @param id the job ID
+	 */
+	public void setJobID(String id) {
+		this.jobID = id;
+	}
+	
+	/**
+	 * @return the mapred ID of this job
+	 */
+	public String getMapredJobID() {
+		return this.mapredJobID;
+	}
+	
+	/**
+	 * Set the mapred ID for this job, i.e. the job ID assigned by map/reduce.
+	 * @param mapredJobID the mapred job ID for this job.
+	 */
+	public void setMapredJobID(String mapredJobID) {
+		this.mapredJobID = mapredJobID; // must not touch jobID, which JobControl assigns
+	}
+	
+	/**
+	 * @return the mapred job conf of this job
+	 */
+	public JobConf getJobConf() {
+		return this.theJobConf;
+	}
+	
+
+	/**
+	 * Set the mapred job conf for this job.
+	 * @param jobConf the mapred job conf for this job.
+	 */
+	public void setJobConf(JobConf jobConf) {
+		this.theJobConf = jobConf;
+	}
+	
+	/**
+	 * @return the state of this job
+	 */
+	public int getState() {
+		return this.state;
+	}
+	
+	/**
+	 * Set the state for this job.
+	 * @param state the new state for this job.
+	 */
+	public void setState(int state) {
+		this.state = state;
+	}
+	
+	/**
+	 * @return the message of this job
+	 */
+	public String getMessage() {
+		return this.message;
+	}
+	
+	/**
+	 * Set the message for this job.
+	 * @param message the message for this job.
+	 */
+	public void setMessage(String message) {
+		this.message = message;
+	}
+	
+	/**
+	 * @return the depending jobs of this job
+	 */
+	public ArrayList getDependingJobs() {
+		return this.dependingJobs;
+	}
+	
+	/**
+	 * @return true if this job is in a complete state
+	 */
+	public boolean isCompleted() {
+		return this.state == Job.FAILED || 
+		       this.state == Job.DEPENDENT_FAILED ||
+		       this.state == Job.SUCCESS;
+	}
+	
+	/**
+	 * @return true if this job is in READY state
+	 */
+	public boolean isReady() {
+		return this.state == Job.READY;
+	}
+	
+	/**
+	 * Check the state of this running job by querying map/reduce. The state
+	 * remains unchanged while the job is incomplete, becomes SUCCESS if it
+	 * completed successfully, and becomes FAILED if it completed unsuccessfully
+	 * or the query threw an IOException (message is then set from the exception).
+	 * On FAILED the job is killed and the job client closed, both best-effort.
+	 */
+	private void checkRunningState() {
+		RunningJob running = null;
+		try {
+			running = jc.getJob(this.mapredJobID);
+			if (!running.isComplete()) {
+				return; // still running; leave the state untouched
+			}
+			if (running.isSuccessful()) {
+				this.state = Job.SUCCESS;
+				return;
+			}
+			this.state = Job.FAILED;
+			this.message = "Job failed!";
+		} catch (IOException ioe) {
+			this.state = Job.FAILED;
+			this.message = StringUtils.stringifyException(ioe);
+		}
+		cleanUpAfterFailure(running);
+	}
+
+	/** Kill the failed job (if a handle exists) and close the client, ignoring errors. */
+	private void cleanUpAfterFailure(RunningJob running) {
+		try {
+			if (running != null) { // null when jc.getJob() itself threw
+				running.killJob();
+			}
+		} catch (IOException e) {
+			// ignored: the state is already FAILED and the message already set
+		}
+		try {
+			this.jc.close();
+		} catch (IOException e) {
+			// ignored: the state is already FAILED and the message already set
+		}
+	}
+	
+	/**
+	 * Check and update the state of this job. A RUNNING job is polled first; only a
+	 * WAITING job is processed further. Its depending jobs are checked in order: one that
+	 * is still WAITING, READY or RUNNING keeps this job WAITING, one that failed makes it
+	 * DEPENDENT_FAILED, and if all succeeded (or there are none) it becomes READY.
+	 * @return the state of this job after the update
+	 */
+	public int checkState() {
+		if (this.state == Job.RUNNING) {
+			checkRunningState();
+		}
+		if (this.state != Job.WAITING) {
+			return this.state;
+		}
+		// a missing or empty list means there is nothing to wait for
+		int total = (this.dependingJobs == null) ? 0 : this.dependingJobs.size();
+		if (total == 0) {
+			this.state = Job.READY;
+			return this.state;
+		}
+		for (int idx = 0; idx < total; idx++) {
+			Job dep = (Job) this.dependingJobs.get(idx);
+			int depState = dep.checkState();
+			if (depState == Job.FAILED || depState == Job.DEPENDENT_FAILED) {
+				this.state = Job.DEPENDENT_FAILED;
+				this.message = "depending job " + idx + " with jobID "
+						+ dep.getJobID() + " failed. " + dep.getMessage();
+				return this.state;
+			}
+			if (depState == Job.WAITING || depState == Job.READY || depState == Job.RUNNING) {
+				// this depending job has not completed yet: remain in WAITING
+				return this.state;
+			}
+		}
+		// no depending job was pending or failed, so this job can be submitted
+		this.state = Job.READY;
+		return this.state;
+	}
+	
+    /**
+     * Submit this job to mapred. On success the mapred job ID is recorded and the state
+     * becomes RUNNING; on an IOException the state becomes FAILED and message is set from it.
+     */
+    public void submit() {
+        try {
+            if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
+                FileSystem fs = FileSystem.get(theJobConf);
+                Path inputPaths[] = theJobConf.getInputPaths();
+                for (int i = 0; i < inputPaths.length; i++) {
+                    if (!fs.exists(inputPaths[i])) {
+                        try {
+                            fs.mkdirs(inputPaths[i]);
+                        } catch (IOException e) {
+                            // best effort: a failed mkdirs is ignored here
+                        }
+                    }
+                }
+            }
+            RunningJob running = jc.submitJob(theJobConf);
+            this.mapredJobID = running.getJobID();
+            this.state = Job.RUNNING;
+        } catch (IOException ioe) {
+            this.state = Job.FAILED;
+            this.message = StringUtils.stringifyException(ioe);
+        }
+    }
+	
+	/**
+	 * @param args
+	 */
+	public static void main(String[] args) {
+		// TODO Auto-generated method stub
+
+	}
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java?rev=431715&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java Tue Aug 15 14:55:52 2006
@@ -0,0 +1,299 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.jobcontrol;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+
+/** This class encapsulates a set of MapReduce jobs and their dependencies. It tracks 
+ *  the states of the jobs by placing them into different tables according to their 
+ *  states. 
+ *  
+ *  This class provides APIs for the client app to add a job to the group and to get 
+ *  the jobs in the group in different states. When a 
+ *  job is added, an ID unique to the group is assigned to the job. 
+ *  
+ *  This class is run in a thread that submits jobs when they become ready, monitors the
+ *  states of the running jobs, and updates the states of jobs based on the state changes 
+ *  of their depending jobs. The class provides APIs for suspending/resuming
+ *  the thread, and for stopping the thread.
+ *  
+ */
+public class JobControl implements Runnable{
+
+	// The thread can be in one of the following states
+	private static final int RUNNING = 0;
+	private static final int SUSPENDED = 1;
+	private static final int STOPPED = 2;
+	private static final int STOPPING = 3;
+	private static final int READY = 4;
+	// volatile: set by stop()/suspend()/resume() callers, polled by the run() loop thread
+	private volatile int runnerState;			// the thread state
+	
+	private Hashtable waitingJobs;
+	private Hashtable readyJobs;
+	private Hashtable runningJobs;
+	private Hashtable successfulJobs;
+	private Hashtable failedJobs;
+	
+	private long nextJobID;
+	private String groupName;
+	
+	/** 
+	 * Construct a job control for a group of jobs.
+	 * @param groupName a name identifying this group
+	 */
+	public JobControl(String groupName) {
+		this.waitingJobs = new Hashtable();
+		this.readyJobs = new Hashtable();
+		this.runningJobs = new Hashtable();
+		this.successfulJobs = new Hashtable();
+		this.failedJobs = new Hashtable();
+		this.nextJobID = -1;
+		this.groupName = groupName;
+		this.runnerState = JobControl.READY;
+		
+	}
+	
+	// Copy the jobs of one table into a new list. The copy is made while holding the
+	// table's lock, the same lock addToQueue() takes, so a put() from another thread
+	// cannot invalidate the iteration (ConcurrentModificationException).
+	private static ArrayList toArrayList(Hashtable jobs) {
+		synchronized (jobs) {
+			return new ArrayList(jobs.values());
+		}
+	}
+	
+	/**
+	 * @return the jobs in the waiting state
+	 */
+	public ArrayList getWaitingJobs() {
+		return JobControl.toArrayList(this.waitingJobs);
+	}
+	
+	/**
+	 * @return the jobs in the running state
+	 */
+	public ArrayList getRunningJobs() {
+		return JobControl.toArrayList(this.runningJobs);
+	}
+	
+	/**
+	 * @return the jobs in the ready state
+	 */
+	public ArrayList getReadyJobs() {
+		return JobControl.toArrayList(this.readyJobs);
+	}
+	
+	/**
+	 * @return the jobs in the success state
+	 */
+	public ArrayList getSuccessfulJobs() {
+		return JobControl.toArrayList(this.successfulJobs);
+	}
+	
+	public ArrayList getFailedJobs() {
+		return JobControl.toArrayList(this.failedJobs);
+	}
+	
+	private String getNextJobID() {
+		nextJobID += 1;
+		return this.groupName + this.nextJobID;
+	}
+	
+	private static void addToQueue(Job aJob, Hashtable queue) {
+		synchronized(queue) {
+			queue.put(aJob.getJobID(), aJob);
+		}		
+	}
+	
+	private void addToQueue(Job aJob) {
+		Hashtable queue = getQueue(aJob.getState());
+		addToQueue(aJob, queue);	
+	}
+	
+	// Table holding jobs in the given state (both failure states share failedJobs); null if unknown.
+	private Hashtable getQueue(int state) {
+		switch (state) {
+		case Job.WAITING:
+			return this.waitingJobs;
+		case Job.READY:
+			return this.readyJobs;
+		case Job.RUNNING:
+			return this.runningJobs;
+		case Job.SUCCESS:
+			return this.successfulJobs;
+		case Job.FAILED: case Job.DEPENDENT_FAILED:
+			return this.failedJobs;
+		}
+		return null;
+	}
+
+	/** Add a new job; it is assigned the next ID of this group and queued as WAITING.
+	 * @param aJob the new job
+	 * @return the ID assigned to the job
+	 */
+	synchronized public String addJob(Job aJob) {
+		String id = this.getNextJobID();
+		aJob.setJobID(id);
+		aJob.setState(Job.WAITING);
+		this.addToQueue(aJob);
+		return id;	
+	}
+	
+	/**
+	 * @param args
+	 */
+	public static void main(String[] args) {
+		// TODO Auto-generated method stub
+
+	}
+
+	/**
+	 * @return the thread state
+	 */
+	public int getState() {
+		return this.runnerState;
+	}
+	
+	/**
+	 * set the thread state to STOPPING so that the 
+	 * thread will stop when it wakes up.
+	 */
+	public void stop() {
+		this.runnerState = JobControl.STOPPING;
+	}
+	
+	/**
+	 * suspend the running thread
+	 */
+	public void suspend () {
+		if (this.runnerState == JobControl.RUNNING) {
+			this.runnerState = JobControl.SUSPENDED;
+		}
+	}
+	
+	/**
+	 * resume the suspended thread
+	 */
+	public void resume () {
+		if (this.runnerState == JobControl.SUSPENDED) {
+			this.runnerState = JobControl.RUNNING;
+		}
+	}
+	
+	synchronized private void checkRunningJobs() {
+		
+		Hashtable oldJobs = null;
+		oldJobs = this.runningJobs;
+		this.runningJobs = new Hashtable();
+		
+		Iterator jobs = oldJobs.values().iterator();
+		while (jobs.hasNext()) {
+			Job nextJob = (Job)jobs.next();
+			int state = nextJob.checkState();
+			/*
+			if (state != Job.RUNNING) {
+				System.out.println("The state of the running job " +
+					nextJob.getJobName() + " has changed to: " + nextJob.getState());
+			}
+			*/
+			this.addToQueue(nextJob);
+		}
+	}
+	
+	synchronized private void checkWaitingJobs() {
+		Hashtable oldJobs = null;
+		oldJobs = this.waitingJobs;
+		this.waitingJobs = new Hashtable();
+		
+		Iterator jobs = oldJobs.values().iterator();
+		while (jobs.hasNext()) {
+			Job nextJob = (Job)jobs.next();
+			int state = nextJob.checkState();
+			/*
+			if (state != Job.WAITING) {
+				System.out.println("The state of the waiting job " +
+					nextJob.getJobName() + " has changed to: " + nextJob.getState());
+			}
+			*/
+			this.addToQueue(nextJob);
+		}
+	}
+	
+	synchronized private void startReadyJobs() {
+		Hashtable oldJobs = null;
+		oldJobs = this.readyJobs;
+		this.readyJobs = new Hashtable();
+		
+		Iterator jobs = oldJobs.values().iterator();
+		while (jobs.hasNext()) {
+			Job nextJob = (Job)jobs.next();
+			//System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
+			nextJob.submit();
+			//System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
+			this.addToQueue(nextJob);
+		}	
+	}
+	
+	synchronized public boolean allFinished() {
+		return this.waitingJobs.size() == 0 &&
+			this.readyJobs.size() == 0 &&
+			this.runningJobs.size() == 0;
+	}
+	
+	/**
+	 *  The main loop for the thread; it sleeps while SUSPENDED and ends in STOPPED
+	 *  once stop() is seen. Each pass does the following, then sleeps 5 seconds:
+	 *  	Check the states of the running jobs
+	 *  	Update the states of waiting jobs
+	 *  	Submit the jobs in ready state
+	 */
+	public void run() {
+		this.runnerState = JobControl.RUNNING;
+		while (true) {
+			while (this.runnerState == JobControl.SUSPENDED) {
+				try {
+					Thread.sleep(5000);
+				}
+				catch (Exception e) {
+					// sleep cut short: loop around and re-check the state
+				}
+			}
+			checkRunningJobs();	
+			checkWaitingJobs();		
+			startReadyJobs();		
+			if (this.runnerState != JobControl.RUNNING && 
+					this.runnerState != JobControl.SUSPENDED) {
+				break;
+			}
+			try {
+				Thread.sleep(5000);
+			}
+			catch (Exception e) {
+				// sleep cut short: fall through to the state check below
+			}
+			if (this.runnerState != JobControl.RUNNING && 
+					this.runnerState != JobControl.SUSPENDED) {
+				break;
+			}
+		}
+		this.runnerState = JobControl.STOPPED;
+	}
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html?rev=431715&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html Tue Aug 15 14:55:52 2006
@@ -0,0 +1,7 @@
+<html>
+<body>
+
+<p>Utilities for managing dependent jobs.</p>
+
+</body>
+</html>

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java?rev=431715&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java Tue Aug 15 14:55:52 2006
@@ -0,0 +1,297 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.jobcontrol;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This class performs unit test for Job/JobControl classes.
+ *  
+ * @author runping
+ *
+ */
+public class TestJobControl extends junit.framework.TestCase {
+
+    private static NumberFormat idFormat = NumberFormat.getInstance();
+    static {
+        idFormat.setMinimumIntegerDigits(4);
+        idFormat.setGroupingUsed(false);
+    }
+
+    static private Random rand = new Random();
+
+    private static void cleanData(FileSystem fs, Path dirPath)
+            throws IOException {
+        fs.delete(dirPath);
+    }
+
+    private static String generateRandomWord() {
+        return idFormat.format(rand.nextLong());
+    }
+
+    private static String generateRandomLine() {
+        long r = rand.nextLong() % 7;
+        long n = r + 20;
+        StringBuffer sb = new StringBuffer();
+        for (int i = 0; i < n; i++) {
+            sb.append(generateRandomWord()).append(" ");
+        }
+        sb.append("\n");
+        return sb.toString();
+    }
+
+    private static void generateData(FileSystem fs, Path dirPath)
+            throws IOException {
+        FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt"));
+        for (int i = 0; i < 100000; i++) {
+            String line = TestJobControl.generateRandomLine();
+            out.write(line.getBytes("UTF-8"));
+        }
+        out.close();
+    }
+
+    public static class DataCopy extends MapReduceBase implements Mapper,
+            Reducer {
+        public void map(WritableComparable key, Writable value,
+                OutputCollector output, Reporter reporter) throws IOException {
+            output.collect(new Text(key.toString()), value);
+        }
+
+        public void reduce(WritableComparable key, Iterator values,
+                OutputCollector output, Reporter reporter) throws IOException {
+            Text dumbKey = new Text("");
+            while (values.hasNext()) {
+                Text data = (Text) values.next();
+                output.collect(dumbKey, data);
+            }
+        }
+    }
+
+    private static JobConf createCopyJob(ArrayList indirs, Path outdir)
+            throws Exception {
+
+        Configuration defaults = new Configuration();
+        JobConf theJob = new JobConf(defaults, TestJobControl.class);
+        theJob.setJobName("DataMoveJob");
+
+        theJob.setInputPath((Path) indirs.get(0));
+        if (indirs.size() > 1) {
+            for (int i = 1; i < indirs.size(); i++) {
+                theJob.addInputPath((Path) indirs.get(i));
+            }
+        }
+        theJob.setMapperClass(DataCopy.class);
+        theJob.setOutputPath(outdir);
+        theJob.setOutputKeyClass(Text.class);
+        theJob.setOutputValueClass(Text.class);
+        theJob.setReducerClass(DataCopy.class);
+        theJob.setNumMapTasks(12);
+        theJob.setNumReduceTasks(4);
+        return theJob;
+    }
+
+    /**
+     * This is a main function for testing JobControl class.
+     * It first cleans all the dirs it will use. Then it generates some random text
+     * data in TestJobControlData/indir. Then it creates 4 jobs: 
+     *      Job 1: copy data from indir to outdir_1
+     *      Job 2: copy data from indir to outdir_2
+     *      Job 3: copy data from outdir_1 and outdir_2 to outdir_3
+     *      Job 4: copy data from outdir_3 to outdir_4
+     * The jobs 1 and 2 have no dependency. The job 3 depends on jobs 1 and 2.
+     * The job 4 depends on job 3.
+     * 
+     * Then it creates a JobControl object and adds the 4 jobs to the JobControl object.
+     * Finally, it creates a thread to run the JobControl object and monitors/reports
+     * the job states.
+     * 
+     * @throws Exception if setup fails or the final job states are incomplete or inconsistent
+     */
+    public static void doJobControlTest() throws Exception {
+        
+        Configuration defaults = new Configuration();
+        FileSystem fs = FileSystem.get(defaults);
+        Path rootDataDir = new Path(System.getProperty("test.build.data", "."), "TestJobControlData");
+        Path indir = new Path(rootDataDir, "indir");
+        Path outdir_1 = new Path(rootDataDir, "outdir_1");
+        Path outdir_2 = new Path(rootDataDir, "outdir_2");
+        Path outdir_3 = new Path(rootDataDir, "outdir_3");
+        Path outdir_4 = new Path(rootDataDir, "outdir_4");
+
+        cleanData(fs, indir);
+        generateData(fs, indir);
+
+        cleanData(fs, outdir_1);
+        cleanData(fs, outdir_2);
+        cleanData(fs, outdir_3);
+        cleanData(fs, outdir_4);
+
+        ArrayList dependingJobs = null;
+
+        ArrayList inPaths_1 = new ArrayList();
+        inPaths_1.add(indir);
+        JobConf jobConf_1 = createCopyJob(inPaths_1, outdir_1);
+        Job job_1 = new Job(jobConf_1, dependingJobs);
+        ArrayList inPaths_2 = new ArrayList();
+        inPaths_2.add(indir);
+        JobConf jobConf_2 = createCopyJob(inPaths_2, outdir_2);
+        Job job_2 = new Job(jobConf_2, dependingJobs);
+
+        ArrayList inPaths_3 = new ArrayList();
+        inPaths_3.add(outdir_1);
+        inPaths_3.add(outdir_2);
+        JobConf jobConf_3 = createCopyJob(inPaths_3, outdir_3);
+        dependingJobs = new ArrayList();
+        dependingJobs.add(job_1);
+        dependingJobs.add(job_2);
+        Job job_3 = new Job(jobConf_3, dependingJobs);
+
+        ArrayList inPaths_4 = new ArrayList();
+        inPaths_4.add(outdir_3);
+        JobConf jobConf_4 = createCopyJob(inPaths_4, outdir_4);
+        dependingJobs = new ArrayList();
+        dependingJobs.add(job_3);
+        Job job_4 = new Job(jobConf_4, dependingJobs);
+
+        JobControl theControl = new JobControl("Test");
+        theControl.addJob(job_1);
+        theControl.addJob(job_2);
+        theControl.addJob(job_3);
+        theControl.addJob(job_4);
+
+        Thread theController = new Thread(theControl);
+        theController.start();
+        while (!theControl.allFinished()) {
+
+            System.out.println("Jobs in waiting state: "
+                    + theControl.getWaitingJobs().size());
+            System.out.println("Jobs in ready state: "
+                    + theControl.getReadyJobs().size());
+            System.out.println("Jobs in running state: "
+                    + theControl.getRunningJobs().size());
+            System.out.println("Jobs in success state: "
+                    + theControl.getSuccessfulJobs().size());
+            System.out.println("Jobs in failed state: "
+                    + theControl.getFailedJobs().size());
+            System.out.println("\n");
+
+            try {
+                Thread.sleep(5000);
+            } catch (Exception e) {
+
+            }
+        }
+        // all jobs completed: stop the controller now so a failing check below cannot leak its thread
+        theControl.stop();
+        System.out.println("Jobs are all done???");
+        System.out.println("Jobs in waiting state: "
+                + theControl.getWaitingJobs().size());
+        System.out.println("Jobs in ready state: "
+                + theControl.getReadyJobs().size());
+        System.out.println("Jobs in running state: "
+                + theControl.getRunningJobs().size());
+        System.out.println("Jobs in success state: "
+                + theControl.getSuccessfulJobs().size());
+        System.out.println("Jobs in failed state: "
+                + theControl.getFailedJobs().size());
+        System.out.println("\n");
+        
+        if (job_1.getState() != Job.FAILED && 
+                job_1.getState() != Job.DEPENDENT_FAILED && 
+                job_1.getState() != Job.SUCCESS) {
+           
+                String states = "job_1:  " + job_1.getState() + "\n";
+                throw new Exception("The state of job_1 is not in a complete state\n" + states);
+        }
+        
+        if (job_2.getState() != Job.FAILED &&
+                job_2.getState() != Job.DEPENDENT_FAILED && 
+                job_2.getState() != Job.SUCCESS) {
+           
+                String states = "job_2:  " + job_2.getState() + "\n";
+                throw new Exception("The state of job_2 is not in a complete state\n" + states);
+        }
+        
+        if (job_3.getState() != Job.FAILED && 
+                job_3.getState() != Job.DEPENDENT_FAILED && 
+                job_3.getState() != Job.SUCCESS) {
+           
+                String states = "job_3:  " + job_3.getState() + "\n";
+                throw new Exception("The state of job_3 is not in a complete state\n" + states);
+        }
+        if (job_4.getState() != Job.FAILED && 
+                job_4.getState() != Job.DEPENDENT_FAILED && 
+                job_4.getState() != Job.SUCCESS) {
+           
+                String states = "job_4:  " + job_4.getState() + "\n";
+                throw new Exception("The state of job_4 is not in a complete state\n" + states);
+        }
+        
+        if (job_1.getState() == Job.FAILED || 
+                job_2.getState() == Job.FAILED ||
+                job_1.getState() == Job.DEPENDENT_FAILED || 
+                job_2.getState() == Job.DEPENDENT_FAILED) {
+            if (job_3.getState() != Job.DEPENDENT_FAILED) {
+                String states = "job_1:  " + job_1.getState() + "\n";
+                states += "job_2:  " + job_2.getState() + "\n";
+                states += "job_3:  " + job_3.getState() + "\n";
+                states += "job_4:  " + job_4.getState() + "\n";
+                throw new Exception("The states of jobs 1, 2, 3, 4 are not consistent\n" + states);
+            }
+        }
+        if (job_3.getState() == Job.FAILED || 
+                job_3.getState() == Job.DEPENDENT_FAILED) {
+            if (job_4.getState() != Job.DEPENDENT_FAILED) {
+                String states = "job_3:  " + job_3.getState() + "\n";
+                states += "job_4:  " + job_4.getState() + "\n";
+                throw new Exception("The states of jobs 3, 4 are not consistent\n" + states);
+            }
+        }
+    }
+
+    public void testJobControl() throws Exception {
+        doJobControlTest();
+    }
+    
+    public static void main(String[] args) {
+        TestJobControl test = new TestJobControl();
+        try {
+            test.testJobControl();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}