You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/02/14 19:38:44 UTC
svn commit: r377798 - in
/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred: ClusterStatus.java
JobClient.java JobSubmissionProtocol.java JobTracker.java LocalJobRunner.java
Author: cutting
Date: Tue Feb 14 10:38:42 2006
New Revision: 377798
URL: http://svn.apache.org/viewcvs?rev=377798&view=rev
Log:
HADOOP-37: Add ClusterStatus. Contributed by Owen O'Malley.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java
Modified:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java?rev=377798&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java Tue Feb 14 10:38:42 2006
@@ -0,0 +1,82 @@
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.WritableFactories;
+
+/**
+ * Summarizes the size and current state of the cluster.
+ * @author Owen O'Malley
+ */
+public class ClusterStatus implements Writable {
+
+ static { // register a ctor
+ WritableFactories.setFactory
+ (ClusterStatus.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new ClusterStatus(); }
+ });
+ }
+
+ private int task_trackers;
+ private int map_tasks;
+ private int reduce_tasks;
+ private int max_tasks;
+
+ private ClusterStatus() {}
+
+ ClusterStatus(int trackers, int maps, int reduces, int max) {
+ task_trackers = trackers;
+ map_tasks = maps;
+ reduce_tasks = reduces;
+ max_tasks = max;
+ }
+
+
+ /**
+ * The number of task trackers in the cluster.
+ */
+ public int getTaskTrackers() {
+ return task_trackers;
+ }
+
+ /**
+ * The number of currently running map tasks.
+ */
+ public int getMapTasks() {
+ return map_tasks;
+ }
+
+ /**
+ * The number of current running reduce tasks.
+ */
+ public int getReduceTasks() {
+ return reduce_tasks;
+ }
+
+ /**
+ * The maximum capacity for running tasks in the cluster.
+ */
+ public int getMaxTasks() {
+ return max_tasks;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(task_trackers);
+ out.writeInt(map_tasks);
+ out.writeInt(reduce_tasks);
+ out.writeInt(max_tasks);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ task_trackers = in.readInt();
+ map_tasks = in.readInt();
+ reduce_tasks = in.readInt();
+ max_tasks = in.readInt();
+ }
+
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=377798&r1=377797&r2=377798&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Feb 14 10:38:42 2006
@@ -279,6 +279,10 @@
}
}
+ public ClusterStatus getClusterStatus() throws IOException {
+ return jobSubmitClient.getClusterStatus();
+ }
+
/** Utility that submits a job, then polls for progress until the job is
* complete. */
public static void runJob(JobConf job) throws IOException {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=377798&r1=377797&r2=377798&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Tue Feb 14 10:38:42 2006
@@ -32,6 +32,12 @@
public JobStatus submitJob(String jobFile) throws IOException;
/**
+ * Get the current status of the cluster
+ * @return summary of the state of the cluster
+ */
+ public ClusterStatus getClusterStatus();
+
+ /**
* Kill the indicated job
*/
public void killJob(String jobid);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=377798&r1=377797&r2=377798&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Feb 14 10:38:42 2006
@@ -651,6 +651,13 @@
return job.getStatus();
}
+ public synchronized ClusterStatus getClusterStatus() {
+ return new ClusterStatus(taskTrackers.size(),
+ totalMaps,
+ totalReduces,
+ maxCurrentTasks);
+ }
+
public synchronized void killJob(String jobid) {
JobInProgress job = (JobInProgress) jobs.get(jobid);
job.kill();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=377798&r1=377797&r2=377798&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Feb 14 10:38:42 2006
@@ -32,6 +32,8 @@
private FileSystem fs;
private HashMap jobs = new HashMap();
private Configuration conf;
+ private int map_tasks = 0;
+ private int reduce_tasks = 0;
private class Job extends Thread
implements TaskUmbilicalProtocol {
@@ -73,7 +75,9 @@
mapIds.add("map_" + newId());
MapTask map = new MapTask(file, (String)mapIds.get(i), splits[i]);
map.setConf(job);
+ map_tasks += 1;
map.run(job, this);
+ map_tasks -= 1;
}
// move map output to reduce input
@@ -98,7 +102,9 @@
mapDependencies,
0);
reduce.setConf(job);
+ reduce_tasks += 1;
reduce.run(job, this);
+ reduce_tasks -= 1;
this.mapoutputFile.removeAll(reduceId);
this.status.runState = JobStatus.SUCCEEDED;
@@ -183,5 +189,9 @@
public String getFilesystemName() throws IOException {
return fs.getName();
+ }
+
+ public ClusterStatus getClusterStatus() {
+ return new ClusterStatus(1, map_tasks, reduce_tasks, 1);
}
}