You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC

svn commit: r529410 [19/27] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/...

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Mon Apr 16 14:44:35 2007
@@ -32,154 +32,154 @@
  **************************************************/
 class TaskTrackerStatus implements Writable {
 
-    static {                                        // register a ctor
-      WritableFactories.setFactory
-        (TaskTrackerStatus.class,
-         new WritableFactory() {
-           public Writable newInstance() { return new TaskTrackerStatus(); }
-         });
-    }
-
-    String trackerName;
-    String host;
-    int httpPort;
-    int failures;
-    List<TaskStatus> taskReports;
+  static {                                        // register a ctor
+    WritableFactories.setFactory
+      (TaskTrackerStatus.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new TaskTrackerStatus(); }
+       });
+  }
+
+  String trackerName;
+  String host;
+  int httpPort;
+  int failures;
+  List<TaskStatus> taskReports;
     
-    volatile long lastSeen;
+  volatile long lastSeen;
     
-    /**
-     */
-    public TaskTrackerStatus() {
-      taskReports = new ArrayList();
-    }
-
-    /**
-     */
-    public TaskTrackerStatus(String trackerName, String host, 
-                             int httpPort, List<TaskStatus> taskReports, 
-                             int failures) {
-        this.trackerName = trackerName;
-        this.host = host;
-        this.httpPort = httpPort;
-
-        this.taskReports = new ArrayList(taskReports);
-        this.failures = failures;
-    }
-
-    /**
-     */
-    public String getTrackerName() {
-        return trackerName;
-    }
-    /**
-     */
-    public String getHost() {
-        return host;
-    }
-
-    /**
-     * Get the port that this task tracker is serving http requests on.
-     * @return the http port
-     */
-    public int getHttpPort() {
-      return httpPort;
-    }
+  /**
+   */
+  public TaskTrackerStatus() {
+    taskReports = new ArrayList();
+  }
+
+  /**
+   */
+  public TaskTrackerStatus(String trackerName, String host, 
+                           int httpPort, List<TaskStatus> taskReports, 
+                           int failures) {
+    this.trackerName = trackerName;
+    this.host = host;
+    this.httpPort = httpPort;
+
+    this.taskReports = new ArrayList(taskReports);
+    this.failures = failures;
+  }
+
+  /**
+   */
+  public String getTrackerName() {
+    return trackerName;
+  }
+  /**
+   */
+  public String getHost() {
+    return host;
+  }
+
+  /**
+   * Get the port that this task tracker is serving http requests on.
+   * @return the http port
+   */
+  public int getHttpPort() {
+    return httpPort;
+  }
     
-    /**
-     * Get the number of tasks that have failed on this tracker.
-     * @return The number of failed tasks
-     */
-    public int getFailures() {
-      return failures;
-    }
+  /**
+   * Get the number of tasks that have failed on this tracker.
+   * @return The number of failed tasks
+   */
+  public int getFailures() {
+    return failures;
+  }
     
-    /**
-     * Get the current tasks at the TaskTracker.
-     * Tasks are tracked by a {@link TaskStatus} object.
-     * 
-     * @return a list of {@link TaskStatus} representing 
-     *         the current tasks at the TaskTracker.
-     */
-    public List<TaskStatus> getTaskReports() {
-      return taskReports;
-    }
+  /**
+   * Get the current tasks at the TaskTracker.
+   * Tasks are tracked by a {@link TaskStatus} object.
+   * 
+   * @return a list of {@link TaskStatus} representing 
+   *         the current tasks at the TaskTracker.
+   */
+  public List<TaskStatus> getTaskReports() {
+    return taskReports;
+  }
     
-    /**
-     * Return the current MapTask count
-     */
-    public int countMapTasks() {
-      int mapCount = 0;
-      for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
-        TaskStatus ts = (TaskStatus) it.next();
-        TaskStatus.State state = ts.getRunState();
-        if (ts.getIsMap() &&
-            ((state == TaskStatus.State.RUNNING) ||
-                (state == TaskStatus.State.UNASSIGNED))) {
-          mapCount++;
-        }
+  /**
+   * Return the current MapTask count
+   */
+  public int countMapTasks() {
+    int mapCount = 0;
+    for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
+      TaskStatus ts = (TaskStatus) it.next();
+      TaskStatus.State state = ts.getRunState();
+      if (ts.getIsMap() &&
+          ((state == TaskStatus.State.RUNNING) ||
+           (state == TaskStatus.State.UNASSIGNED))) {
+        mapCount++;
       }
-      return mapCount;
     }
+    return mapCount;
+  }
 
-    /**
-     * Return the current ReduceTask count
-     */
-    public int countReduceTasks() {
-      int reduceCount = 0;
-      for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
-        TaskStatus ts = (TaskStatus) it.next();
-        TaskStatus.State state = ts.getRunState();
-        if ((!ts.getIsMap()) &&
-            ((state == TaskStatus.State.RUNNING) ||  
-                (state == TaskStatus.State.UNASSIGNED))) {
-          reduceCount++;
-        }
+  /**
+   * Return the current ReduceTask count
+   */
+  public int countReduceTasks() {
+    int reduceCount = 0;
+    for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
+      TaskStatus ts = (TaskStatus) it.next();
+      TaskStatus.State state = ts.getRunState();
+      if ((!ts.getIsMap()) &&
+          ((state == TaskStatus.State.RUNNING) ||  
+           (state == TaskStatus.State.UNASSIGNED))) {
+        reduceCount++;
       }
-      return reduceCount;
-    }
-
-    /**
-     */
-    public long getLastSeen() {
-        return lastSeen;
-    }
-    /**
-     */
-    public void setLastSeen(long lastSeen) {
-        this.lastSeen = lastSeen;
-    }
-
-    ///////////////////////////////////////////
-    // Writable
-    ///////////////////////////////////////////
-    public void write(DataOutput out) throws IOException {
-        UTF8.writeString(out, trackerName);
-        UTF8.writeString(out, host);
-        out.writeInt(httpPort);
-
-        out.writeInt(taskReports.size());
-        out.writeInt(failures);
-        for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
-            ((TaskStatus) it.next()).write(out);
-        }
     }
+    return reduceCount;
+  }
 
-    /**
-     */     
-    public void readFields(DataInput in) throws IOException {
-        this.trackerName = UTF8.readString(in);
-        this.host = UTF8.readString(in);
-        this.httpPort = in.readInt();
-
-        taskReports.clear();
-
-        int numTasks = in.readInt();
-        this.failures = in.readInt();
-        for (int i = 0; i < numTasks; i++) {
-            TaskStatus tmp = new TaskStatus();
-            tmp.readFields(in);
-            taskReports.add(tmp);
-        }
+  /**
+   */
+  public long getLastSeen() {
+    return lastSeen;
+  }
+  /**
+   */
+  public void setLastSeen(long lastSeen) {
+    this.lastSeen = lastSeen;
+  }
+
+  ///////////////////////////////////////////
+  // Writable
+  ///////////////////////////////////////////
+  public void write(DataOutput out) throws IOException {
+    UTF8.writeString(out, trackerName);
+    UTF8.writeString(out, host);
+    out.writeInt(httpPort);
+
+    out.writeInt(taskReports.size());
+    out.writeInt(failures);
+    for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
+      ((TaskStatus) it.next()).write(out);
+    }
+  }
+
+  /**
+   */     
+  public void readFields(DataInput in) throws IOException {
+    this.trackerName = UTF8.readString(in);
+    this.host = UTF8.readString(in);
+    this.httpPort = in.readInt();
+
+    taskReports.clear();
+
+    int numTasks = in.readInt();
+    this.failures = in.readInt();
+    for (int i = 0; i < numTasks; i++) {
+      TaskStatus tmp = new TaskStatus();
+      tmp.readFields(in);
+      taskReports.add(tmp);
     }
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java Mon Apr 16 14:44:35 2007
@@ -42,293 +42,293 @@
 
 public class Job {
 
-	// A job will be in one of the following states
-	final public static int SUCCESS = 0;
-	final public static int WAITING = 1;
-	final public static int RUNNING = 2;
-	final public static int READY = 3;
-	final public static int FAILED = 4;
-	final public static int DEPENDENT_FAILED = 5;
-	
-	
-	private JobConf theJobConf;
-	private int state;
-	private String jobID; 		// assigned and used by JobControl class
-	private String mapredJobID; // the job ID assigned by map/reduce
-	private String jobName;		// external name, assigned/used by client app
-	private String message;		// some info for human consumption, 
-								// e.g. the reason why the job failed
-	private ArrayList dependingJobs;	// the jobs the current job depends on
-	
-	private JobClient jc = null;		// the map reduce job client
-	
-    /** 
-     * Construct a job.
-     * @param jobConf a mapred job configuration representing a job to be executed.
-     * @param dependingJobs an array of jobs the current job depends on
-     */
-    public Job(JobConf jobConf, ArrayList dependingJobs) throws IOException {
-       	this.theJobConf = jobConf;
-		this.dependingJobs = dependingJobs;
-		this.state = Job.WAITING;
-		this.jobID = "unassigned";
-		this.mapredJobID = "unassigned";
-		this.jobName = "unassigned";
-		this.message = "just initialized";
-		this.jc = new JobClient(jobConf);
-	}
-	
-	public String toString() {
-		StringBuffer sb = new StringBuffer();
-		sb.append("job name:\t").append(this.jobName).append("\n");
-		sb.append("job id:\t").append(this.jobID).append("\n");
-		sb.append("job state:\t").append(this.state).append("\n");
-		sb.append("job mapred id:\t").append(this.mapredJobID).append("\n");
-		sb.append("job message:\t").append(this.message).append("\n");
+  // A job will be in one of the following states
+  final public static int SUCCESS = 0;
+  final public static int WAITING = 1;
+  final public static int RUNNING = 2;
+  final public static int READY = 3;
+  final public static int FAILED = 4;
+  final public static int DEPENDENT_FAILED = 5;
+	
+	
+  private JobConf theJobConf;
+  private int state;
+  private String jobID; 		// assigned and used by JobControl class
+  private String mapredJobID; // the job ID assigned by map/reduce
+  private String jobName;		// external name, assigned/used by client app
+  private String message;		// some info for human consumption, 
+  // e.g. the reason why the job failed
+  private ArrayList dependingJobs;	// the jobs the current job depends on
+	
+  private JobClient jc = null;		// the map reduce job client
+	
+  /** 
+   * Construct a job.
+   * @param jobConf a mapred job configuration representing a job to be executed.
+   * @param dependingJobs an array of jobs the current job depends on
+   */
+  public Job(JobConf jobConf, ArrayList dependingJobs) throws IOException {
+    this.theJobConf = jobConf;
+    this.dependingJobs = dependingJobs;
+    this.state = Job.WAITING;
+    this.jobID = "unassigned";
+    this.mapredJobID = "unassigned";
+    this.jobName = "unassigned";
+    this.message = "just initialized";
+    this.jc = new JobClient(jobConf);
+  }
+	
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("job name:\t").append(this.jobName).append("\n");
+    sb.append("job id:\t").append(this.jobID).append("\n");
+    sb.append("job state:\t").append(this.state).append("\n");
+    sb.append("job mapred id:\t").append(this.mapredJobID).append("\n");
+    sb.append("job message:\t").append(this.message).append("\n");
 		
-		if (this.dependingJobs == null) {
-			sb.append("job has no depending job:\t").append("\n");
-		} else {
-			sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
-			for (int i = 0; i < this.dependingJobs.size(); i++) {
-				sb.append("\t depending job ").append(i).append(":\t");
-				sb.append(((Job) this.dependingJobs.get(i)).getJobName()).append("\n");
-			}
-		}
-		return sb.toString();
-	}
-	
-	/**
-	 * @return the job name of this job
-	 */
-	public String getJobName() {
-		return this.jobName;
-	}
-	
-	/**
-	 * Set the job name for  this job.
-	 * @param jobName the job name
-	 */
-	public void setJobName(String jobName) {
-		this.jobName = jobName;
-	}
-	
-	/**
-	 * @return the job ID of this job
-	 */
-	public String getJobID() {
-		return this.jobID;
-	}
-	
-	/**
-	 * Set the job ID for  this job.
-	 * @param id the job ID
-	 */
-	public void setJobID(String id) {
-		this.jobID = id;
-	}
-	
-	/**
-	 * @return the mapred ID of this job
-	 */
-	public String getMapredJobID() {
-		return this.mapredJobID;
-	}
-	
-	/**
-	 * Set the mapred ID for this job.
-	 * @param mapredJobID the mapred job ID for this job.
-	 */
-	public void setMapredJobID(String mapredJobID) {
-		this.jobID = mapredJobID;
-	}
-	
-	/**
-	 * @return the mapred job conf of this job
-	 */
-	public JobConf getJobConf() {
-		return this.theJobConf;
-	}
+    if (this.dependingJobs == null) {
+      sb.append("job has no depending job:\t").append("\n");
+    } else {
+      sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
+      for (int i = 0; i < this.dependingJobs.size(); i++) {
+        sb.append("\t depending job ").append(i).append(":\t");
+        sb.append(((Job) this.dependingJobs.get(i)).getJobName()).append("\n");
+      }
+    }
+    return sb.toString();
+  }
+	
+  /**
+   * @return the job name of this job
+   */
+  public String getJobName() {
+    return this.jobName;
+  }
+	
+  /**
+   * Set the job name for  this job.
+   * @param jobName the job name
+   */
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+	
+  /**
+   * @return the job ID of this job
+   */
+  public String getJobID() {
+    return this.jobID;
+  }
+	
+  /**
+   * Set the job ID for  this job.
+   * @param id the job ID
+   */
+  public void setJobID(String id) {
+    this.jobID = id;
+  }
+	
+  /**
+   * @return the mapred ID of this job
+   */
+  public String getMapredJobID() {
+    return this.mapredJobID;
+  }
+	
+  /**
+   * Set the mapred ID for this job.
+   * @param mapredJobID the mapred job ID for this job.
+   */
+  public void setMapredJobID(String mapredJobID) {
+    this.jobID = mapredJobID;
+  }
+	
+  /**
+   * @return the mapred job conf of this job
+   */
+  public JobConf getJobConf() {
+    return this.theJobConf;
+  }
 	
 
-	/**
-	 * Set the mapred job conf for this job.
-	 * @param jobConf the mapred job conf for this job.
-	 */
-	public void setJobConf(JobConf jobConf) {
-		this.theJobConf = jobConf;
-	}
-	
-	/**
-	 * @return the state of this job
-	 */
-	public int getState() {
-		return this.state;
-	}
-	
-	/**
-	 * Set the state for this job.
-	 * @param state the new state for this job.
-	 */
-	public void setState(int state) {
-		this.state = state;
-	}
-	
-	/**
-	 * @return the message of this job
-	 */
-	public String getMessage() {
-		return this.message;
-	}
-	
-	/**
-	 * Set the message for this job.
-	 * @param message the message for this job.
-	 */
-	public void setMessage(String message) {
-		this.message = message;
-	}
-	
-	/**
-	 * @return the depending jobs of this job
-	 */
-	public ArrayList getDependingJobs() {
-		return this.dependingJobs;
-	}
-	
-	/**
-	 * @return true if this job is in a complete state
-	 */
-	public boolean isCompleted() {
-		return this.state == Job.FAILED || 
-		       this.state == Job.DEPENDENT_FAILED ||
-		       this.state == Job.SUCCESS;
-	}
-	
-	/**
-	 * @return true if this job is in READY state
-	 */
-	public boolean isReady() {
-		return this.state == Job.READY;
-	}
-	
-	/**
-	 * Check the state of this running job. The state may 
-	 * remain the same, become SUCCESS or FAILED.
-	 */
-	private void checkRunningState() {
-		RunningJob running = null;
-		try {
-			running = jc.getJob(this.mapredJobID);
-			if (running.isComplete()) {
-				if (running.isSuccessful()) {
-					this.state = Job.SUCCESS;
-				} else {
-					this.state = Job.FAILED;
-					this.message = "Job failed!";
-					try {
-						running.killJob();
-					} catch (IOException e1) {
+  /**
+   * Set the mapred job conf for this job.
+   * @param jobConf the mapred job conf for this job.
+   */
+  public void setJobConf(JobConf jobConf) {
+    this.theJobConf = jobConf;
+  }
+	
+  /**
+   * @return the state of this job
+   */
+  public int getState() {
+    return this.state;
+  }
+	
+  /**
+   * Set the state for this job.
+   * @param state the new state for this job.
+   */
+  public void setState(int state) {
+    this.state = state;
+  }
+	
+  /**
+   * @return the message of this job
+   */
+  public String getMessage() {
+    return this.message;
+  }
+	
+  /**
+   * Set the message for this job.
+   * @param message the message for this job.
+   */
+  public void setMessage(String message) {
+    this.message = message;
+  }
+	
+  /**
+   * @return the depending jobs of this job
+   */
+  public ArrayList getDependingJobs() {
+    return this.dependingJobs;
+  }
+	
+  /**
+   * @return true if this job is in a complete state
+   */
+  public boolean isCompleted() {
+    return this.state == Job.FAILED || 
+      this.state == Job.DEPENDENT_FAILED ||
+      this.state == Job.SUCCESS;
+  }
+	
+  /**
+   * @return true if this job is in READY state
+   */
+  public boolean isReady() {
+    return this.state == Job.READY;
+  }
+	
+  /**
+   * Check the state of this running job. The state may 
+   * remain the same, become SUCCESS or FAILED.
+   */
+  private void checkRunningState() {
+    RunningJob running = null;
+    try {
+      running = jc.getJob(this.mapredJobID);
+      if (running.isComplete()) {
+        if (running.isSuccessful()) {
+          this.state = Job.SUCCESS;
+        } else {
+          this.state = Job.FAILED;
+          this.message = "Job failed!";
+          try {
+            running.killJob();
+          } catch (IOException e1) {
 
-					}
-					try {
-						this.jc.close();
-					} catch (IOException e2) {
+          }
+          try {
+            this.jc.close();
+          } catch (IOException e2) {
 
-					}
-				}
-			}
+          }
+        }
+      }
 
-		} catch (IOException ioe) {
-			this.state = Job.FAILED;
-			this.message = StringUtils.stringifyException(ioe);
-			try {
-				if(running != null)
-					running.killJob();
-			} catch (IOException e1) {
+    } catch (IOException ioe) {
+      this.state = Job.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
+      try {
+        if(running != null)
+          running.killJob();
+      } catch (IOException e1) {
 
-			}
-			try {
-				this.jc.close();
-			} catch (IOException e1) {
+      }
+      try {
+        this.jc.close();
+      } catch (IOException e1) {
 
-			}
-		}
-	}
-	
-	/**
-	 * Check and update the state of this job. The state changes  
-	 * depending on its current state and the states of the depending jobs.
-	 */
-	public int checkState() {
-		if (this.state == Job.RUNNING) {
-			checkRunningState();
-		}
-		if (this.state != Job.WAITING) {
-			return this.state;
-		}
-		if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
-			this.state = Job.READY;
-			return this.state;
-		}
-		Job pred = null;
-		int n = this.dependingJobs.size();
-		for (int i = 0; i < n; i++) {
-			pred = (Job) this.dependingJobs.get(i);
-			int s = pred.checkState();
-			if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
-				break; // a pred is still not completed, continue in WAITING
-						// state
-			}
-			if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
-				this.state = Job.DEPENDENT_FAILED;
-				this.message = "depending job " + i + " with jobID "
-						+ pred.getJobID() + " failed. " + pred.getMessage();
-				break;
-			}
-			// pred must be in success state
-			if (i == n - 1) {
-				this.state = Job.READY;
-			}
-		}
+      }
+    }
+  }
+	
+  /**
+   * Check and update the state of this job. The state changes  
+   * depending on its current state and the states of the depending jobs.
+   */
+  public int checkState() {
+    if (this.state == Job.RUNNING) {
+      checkRunningState();
+    }
+    if (this.state != Job.WAITING) {
+      return this.state;
+    }
+    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+      this.state = Job.READY;
+      return this.state;
+    }
+    Job pred = null;
+    int n = this.dependingJobs.size();
+    for (int i = 0; i < n; i++) {
+      pred = (Job) this.dependingJobs.get(i);
+      int s = pred.checkState();
+      if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
+        break; // a pred is still not completed, continue in WAITING
+        // state
+      }
+      if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
+        this.state = Job.DEPENDENT_FAILED;
+        this.message = "depending job " + i + " with jobID "
+          + pred.getJobID() + " failed. " + pred.getMessage();
+        break;
+      }
+      // pred must be in success state
+      if (i == n - 1) {
+        this.state = Job.READY;
+      }
+    }
 
-		return this.state;
-	}
+    return this.state;
+  }
 	
-    /**
-     * Submit this job to mapred. The state becomes RUNNING if submission 
-     * is successful, FAILED otherwise.  
-     */
-    public void submit() {
-        try {
-            if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
-                FileSystem fs = FileSystem.get(theJobConf);
-                Path inputPaths[] = theJobConf.getInputPaths();
-                for (int i = 0; i < inputPaths.length; i++) {
-                    if (!fs.exists(inputPaths[i])) {
-                        try {
-                            fs.mkdirs(inputPaths[i]);
-                        } catch (IOException e) {
+  /**
+   * Submit this job to mapred. The state becomes RUNNING if submission 
+   * is successful, FAILED otherwise.  
+   */
+  public void submit() {
+    try {
+      if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
+        FileSystem fs = FileSystem.get(theJobConf);
+        Path inputPaths[] = theJobConf.getInputPaths();
+        for (int i = 0; i < inputPaths.length; i++) {
+          if (!fs.exists(inputPaths[i])) {
+            try {
+              fs.mkdirs(inputPaths[i]);
+            } catch (IOException e) {
 
-                        }
-                    }
-                }
             }
-            RunningJob running = jc.submitJob(theJobConf);
-            this.mapredJobID = running.getJobID();
-            this.state = Job.RUNNING;
-        } catch (IOException ioe) {
-            this.state = Job.FAILED;
-            this.message = StringUtils.stringifyException(ioe);
+          }
         }
+      }
+      RunningJob running = jc.submitJob(theJobConf);
+      this.mapredJobID = running.getJobID();
+      this.state = Job.RUNNING;
+    } catch (IOException ioe) {
+      this.state = Job.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
     }
+  }
 	
-	/**
-	 * @param args
-	 */
-	public static void main(String[] args) {
-		// TODO Auto-generated method stub
+  /**
+   * @param args
+   */
+  public static void main(String[] args) {
+    // TODO Auto-generated method stub
 
-	}
+  }
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java Mon Apr 16 14:44:35 2007
@@ -39,270 +39,270 @@
  */
 public class JobControl implements Runnable{
 
-	// The thread can be in one of the following state
-	private static final int RUNNING = 0;
-	private static final int SUSPENDED = 1;
-	private static final int STOPPED = 2;
-	private static final int STOPPING = 3;
-	private static final int READY = 4;
-	
-	private int runnerState;			// the thread state
-	
-	private Hashtable waitingJobs;
-	private Hashtable readyJobs;
-	private Hashtable runningJobs;
-	private Hashtable successfulJobs;
-	private Hashtable failedJobs;
-	
-	private long nextJobID;
-	private String groupName;
-	
-	/** 
-	 * Construct a job control for a group of jobs.
-	 * @param groupName a name identifying this group
-	 */
-	public JobControl(String groupName) {
-		this.waitingJobs = new Hashtable();
-		this.readyJobs = new Hashtable();
-		this.runningJobs = new Hashtable();
-		this.successfulJobs = new Hashtable();
-		this.failedJobs = new Hashtable();
-		this.nextJobID = -1;
-		this.groupName = groupName;
-		this.runnerState = JobControl.READY;
+  // The thread can be in one of the following state
+  private static final int RUNNING = 0;
+  private static final int SUSPENDED = 1;
+  private static final int STOPPED = 2;
+  private static final int STOPPING = 3;
+  private static final int READY = 4;
+	
+  private int runnerState;			// the thread state
+	
+  private Hashtable waitingJobs;
+  private Hashtable readyJobs;
+  private Hashtable runningJobs;
+  private Hashtable successfulJobs;
+  private Hashtable failedJobs;
+	
+  private long nextJobID;
+  private String groupName;
+	
+  /** 
+   * Construct a job control for a group of jobs.
+   * @param groupName a name identifying this group
+   */
+  public JobControl(String groupName) {
+    this.waitingJobs = new Hashtable();
+    this.readyJobs = new Hashtable();
+    this.runningJobs = new Hashtable();
+    this.successfulJobs = new Hashtable();
+    this.failedJobs = new Hashtable();
+    this.nextJobID = -1;
+    this.groupName = groupName;
+    this.runnerState = JobControl.READY;
 		
-	}
+  }
 	
-	private static ArrayList toArrayList(Hashtable jobs) {
-		ArrayList retv = new ArrayList();
-		synchronized (jobs) {
-			Iterator iter = jobs.values().iterator();
-			while (iter.hasNext()) {
-				retv.add(iter.next());
-			}
-		}
+  private static ArrayList toArrayList(Hashtable jobs) {
+    ArrayList retv = new ArrayList();
+    synchronized (jobs) {
+      Iterator iter = jobs.values().iterator();
+      while (iter.hasNext()) {
+        retv.add(iter.next());
+      }
+    }
 		
-		return retv;
-	}
+    return retv;
+  }
 	
-	/**
-	 * @return the jobs in the waiting state
-	 */
-	public ArrayList getWaitingJobs() {
-		return JobControl.toArrayList(this.waitingJobs);
-	}
-	
-	/**
-	 * @return the jobs in the running state
-	 */
-	public ArrayList getRunningJobs() {
-		return JobControl.toArrayList(this.runningJobs);
-	}
-	
-	/**
-	 * @return the jobs in the ready state
-	 */
-	public ArrayList getReadyJobs() {
-		return JobControl.toArrayList(this.readyJobs);
-	}
-	
-	/**
-	 * @return the jobs in the success state
-	 */
-	public ArrayList getSuccessfulJobs() {
-		return JobControl.toArrayList(this.successfulJobs);
-	}
-	
-	public ArrayList getFailedJobs() {
-		return JobControl.toArrayList(this.failedJobs);
-	}
-	
-	private String getNextJobID() {
-		nextJobID += 1;
-		return this.groupName + this.nextJobID;
-	}
-	
-	private static void addToQueue(Job aJob, Hashtable queue) {
-		synchronized(queue) {
-			queue.put(aJob.getJobID(), aJob);
-		}		
-	}
-	
-	private void addToQueue(Job aJob) {
-		Hashtable queue = getQueue(aJob.getState());
-		addToQueue(aJob, queue);	
-	}
-	
-	private Hashtable getQueue(int state) {
-		Hashtable retv = null;
-		if (state == Job.WAITING) {
-			retv = this.waitingJobs;
-		} else if (state == Job.READY) {
-			retv = this.readyJobs;
-		} else if (state == Job.RUNNING) {
-			retv = this.runningJobs;
-		} else if (state == Job.SUCCESS) {
-			retv = this.successfulJobs;
-		} else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
-			retv = this.failedJobs;
-		} 
-		return retv;
+  /**
+   * @return the jobs in the waiting state
+   */
+  public ArrayList getWaitingJobs() {
+    return JobControl.toArrayList(this.waitingJobs);
+  }
+	
+  /**
+   * @return the jobs in the running state
+   */
+  public ArrayList getRunningJobs() {
+    return JobControl.toArrayList(this.runningJobs);
+  }
+	
+  /**
+   * @return the jobs in the ready state
+   */
+  public ArrayList getReadyJobs() {
+    return JobControl.toArrayList(this.readyJobs);
+  }
+	
+  /**
+   * @return the jobs in the success state
+   */
+  public ArrayList getSuccessfulJobs() {
+    return JobControl.toArrayList(this.successfulJobs);
+  }
+	
+  public ArrayList getFailedJobs() {
+    return JobControl.toArrayList(this.failedJobs);
+  }
+	
+  private String getNextJobID() {
+    nextJobID += 1;
+    return this.groupName + this.nextJobID;
+  }
+	
+  private static void addToQueue(Job aJob, Hashtable queue) {
+    synchronized(queue) {
+      queue.put(aJob.getJobID(), aJob);
+    }		
+  }
+	
+  private void addToQueue(Job aJob) {
+    Hashtable queue = getQueue(aJob.getState());
+    addToQueue(aJob, queue);	
+  }
+	
+  private Hashtable getQueue(int state) {
+    Hashtable retv = null;
+    if (state == Job.WAITING) {
+      retv = this.waitingJobs;
+    } else if (state == Job.READY) {
+      retv = this.readyJobs;
+    } else if (state == Job.RUNNING) {
+      retv = this.runningJobs;
+    } else if (state == Job.SUCCESS) {
+      retv = this.successfulJobs;
+    } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
+      retv = this.failedJobs;
+    } 
+    return retv;
 			
-	}
+  }
 
-	/**
-	 * Add a new job.
-	 * @param aJob the the new job
-	 */
-	synchronized public String addJob(Job aJob) {
-		String id = this.getNextJobID();
-		aJob.setJobID(id);
-		aJob.setState(Job.WAITING);
-		this.addToQueue(aJob);
-		return id;	
-	}
-	
-	/**
-	 * Add a collection of jobs
-	 * 
-	 * @param jobs
-	 */
-	public void addJobs(Collection<Job> jobs) {
-		for (Job job : jobs) {
-			addJob(job);
-		}
-	}
-	
-	/**
-	 * @return the thread state
-	 */
-	public int getState() {
-		return this.runnerState;
-	}
-	
-	/**
-	 * set the thread state to STOPPING so that the 
-	 * thread will stop when it wakes up.
-	 */
-	public void stop() {
-		this.runnerState = JobControl.STOPPING;
-	}
-	
-	/**
-	 * suspend the running thread
-	 */
-	public void suspend () {
-		if (this.runnerState == JobControl.RUNNING) {
-			this.runnerState = JobControl.SUSPENDED;
-		}
-	}
-	
-	/**
-	 * resume the suspended thread
-	 */
-	public void resume () {
-		if (this.runnerState == JobControl.SUSPENDED) {
-			this.runnerState = JobControl.RUNNING;
-		}
-	}
+  /**
+   * Add a new job.
+   * @param aJob the the new job
+   */
+  synchronized public String addJob(Job aJob) {
+    String id = this.getNextJobID();
+    aJob.setJobID(id);
+    aJob.setState(Job.WAITING);
+    this.addToQueue(aJob);
+    return id;	
+  }
+	
+  /**
+   * Add a collection of jobs
+   * 
+   * @param jobs
+   */
+  public void addJobs(Collection<Job> jobs) {
+    for (Job job : jobs) {
+      addJob(job);
+    }
+  }
+	
+  /**
+   * @return the thread state
+   */
+  public int getState() {
+    return this.runnerState;
+  }
+	
+  /**
+   * set the thread state to STOPPING so that the 
+   * thread will stop when it wakes up.
+   */
+  public void stop() {
+    this.runnerState = JobControl.STOPPING;
+  }
+	
+  /**
+   * suspend the running thread
+   */
+  public void suspend () {
+    if (this.runnerState == JobControl.RUNNING) {
+      this.runnerState = JobControl.SUSPENDED;
+    }
+  }
+	
+  /**
+   * resume the suspended thread
+   */
+  public void resume () {
+    if (this.runnerState == JobControl.SUSPENDED) {
+      this.runnerState = JobControl.RUNNING;
+    }
+  }
 	
-	synchronized private void checkRunningJobs() {
+  synchronized private void checkRunningJobs() {
 		
-		Hashtable oldJobs = null;
-		oldJobs = this.runningJobs;
-		this.runningJobs = new Hashtable();
+    Hashtable oldJobs = null;
+    oldJobs = this.runningJobs;
+    this.runningJobs = new Hashtable();
 		
-		Iterator jobs = oldJobs.values().iterator();
-		while (jobs.hasNext()) {
-			Job nextJob = (Job)jobs.next();
-			int state = nextJob.checkState();
-			/*
-			if (state != Job.RUNNING) {
-				System.out.println("The state of the running job " +
-					nextJob.getJobName() + " has changed to: " + nextJob.getState());
-			}
-			*/
-			this.addToQueue(nextJob);
-		}
-	}
-	
-	synchronized private void checkWaitingJobs() {
-		Hashtable oldJobs = null;
-		oldJobs = this.waitingJobs;
-		this.waitingJobs = new Hashtable();
+    Iterator jobs = oldJobs.values().iterator();
+    while (jobs.hasNext()) {
+      Job nextJob = (Job)jobs.next();
+      int state = nextJob.checkState();
+      /*
+        if (state != Job.RUNNING) {
+        System.out.println("The state of the running job " +
+        nextJob.getJobName() + " has changed to: " + nextJob.getState());
+        }
+      */
+      this.addToQueue(nextJob);
+    }
+  }
+	
+  synchronized private void checkWaitingJobs() {
+    Hashtable oldJobs = null;
+    oldJobs = this.waitingJobs;
+    this.waitingJobs = new Hashtable();
 		
-		Iterator jobs = oldJobs.values().iterator();
-		while (jobs.hasNext()) {
-			Job nextJob = (Job)jobs.next();
-			int state = nextJob.checkState();
-			/*
-			if (state != Job.WAITING) {
-				System.out.println("The state of the waiting job " +
-					nextJob.getJobName() + " has changed to: " + nextJob.getState());
-			}
-			*/
-			this.addToQueue(nextJob);
-		}
-	}
-	
-	synchronized private void startReadyJobs() {
-		Hashtable oldJobs = null;
-		oldJobs = this.readyJobs;
-		this.readyJobs = new Hashtable();
+    Iterator jobs = oldJobs.values().iterator();
+    while (jobs.hasNext()) {
+      Job nextJob = (Job)jobs.next();
+      int state = nextJob.checkState();
+      /*
+        if (state != Job.WAITING) {
+        System.out.println("The state of the waiting job " +
+        nextJob.getJobName() + " has changed to: " + nextJob.getState());
+        }
+      */
+      this.addToQueue(nextJob);
+    }
+  }
+	
+  synchronized private void startReadyJobs() {
+    Hashtable oldJobs = null;
+    oldJobs = this.readyJobs;
+    this.readyJobs = new Hashtable();
 		
-		Iterator jobs = oldJobs.values().iterator();
-		while (jobs.hasNext()) {
-			Job nextJob = (Job)jobs.next();
-			//System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
-			nextJob.submit();
-			//System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
-			this.addToQueue(nextJob);
-		}	
-	}
-	
-	synchronized public boolean allFinished() {
-		return this.waitingJobs.size() == 0 &&
-			this.readyJobs.size() == 0 &&
-			this.runningJobs.size() == 0;
-	}
-	
-	/**
-	 *  The main loop for the thread.
-	 *  The loop does the following:
-	 *  	Check the states of the running jobs
-	 *  	Update the states of waiting jobs
-	 *  	Submit the jobs in ready state
-	 */
-	public void run() {
-		this.runnerState = JobControl.RUNNING;
-		while (true) {
-			while (this.runnerState == JobControl.SUSPENDED) {
-				try {
-					Thread.sleep(5000);
-				}
-				catch (Exception e) {
+    Iterator jobs = oldJobs.values().iterator();
+    while (jobs.hasNext()) {
+      Job nextJob = (Job)jobs.next();
+      //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
+      nextJob.submit();
+      //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
+      this.addToQueue(nextJob);
+    }	
+  }
+	
+  synchronized public boolean allFinished() {
+    return this.waitingJobs.size() == 0 &&
+      this.readyJobs.size() == 0 &&
+      this.runningJobs.size() == 0;
+  }
+	
+  /**
+   *  The main loop for the thread.
+   *  The loop does the following:
+   *  	Check the states of the running jobs
+   *  	Update the states of waiting jobs
+   *  	Submit the jobs in ready state
+   */
+  public void run() {
+    this.runnerState = JobControl.RUNNING;
+    while (true) {
+      while (this.runnerState == JobControl.SUSPENDED) {
+        try {
+          Thread.sleep(5000);
+        }
+        catch (Exception e) {
 					
-				}
-			}
-			checkRunningJobs();	
-			checkWaitingJobs();		
-			startReadyJobs();		
-			if (this.runnerState != JobControl.RUNNING && 
-					this.runnerState != JobControl.SUSPENDED) {
-				break;
-			}
-			try {
-				Thread.sleep(5000);
-			}
-			catch (Exception e) {
+        }
+      }
+      checkRunningJobs();	
+      checkWaitingJobs();		
+      startReadyJobs();		
+      if (this.runnerState != JobControl.RUNNING && 
+          this.runnerState != JobControl.SUSPENDED) {
+        break;
+      }
+      try {
+        Thread.sleep(5000);
+      }
+      catch (Exception e) {
 				
-			}
-			if (this.runnerState != JobControl.RUNNING && 
-					this.runnerState != JobControl.SUSPENDED) {
-				break;
-			}
-		}
-		this.runnerState = JobControl.STOPPED;
-	}
+      }
+      if (this.runnerState != JobControl.RUNNING && 
+          this.runnerState != JobControl.SUSPENDED) {
+        break;
+      }
+    }
+    this.runnerState = JobControl.STOPPED;
+  }
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java Mon Apr 16 14:44:35 2007
@@ -34,9 +34,9 @@
   public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, 
                                       String name, Progressable progress) {
     return new RecordWriter(){
-      public void write(WritableComparable key, Writable value) { }
-      public void close(Reporter reporter) { }
-    };
+        public void write(WritableComparable key, Writable value) { }
+        public void close(Reporter reporter) { }
+      };
   }
   
   public void checkOutputSpecs(FileSystem ignored, JobConf job) { }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java Mon Apr 16 14:44:35 2007
@@ -35,153 +35,153 @@
  */
 public class ContextFactory {
     
-    private static final String PROPERTIES_FILE = 
-            "/hadoop-metrics.properties";
-    private static final String CONTEXT_CLASS_SUFFIX =
-            ".class";
-    private static final String DEFAULT_CONTEXT_CLASSNAME =
-            "org.apache.hadoop.metrics.spi.NullContext";
-    
-    private static ContextFactory theFactory = null;
-    
-    private Map<String,Object> attributeMap = new HashMap<String,Object>();
-    private Map<String,AbstractMetricsContext> contextMap = 
-            new HashMap<String,AbstractMetricsContext>();
-    
-    // Used only when contexts, or the ContextFactory itself, cannot be
-    // created.
-    private static Map<String,MetricsContext> nullContextMap = 
-            new HashMap<String,MetricsContext>();
-    
-    /** Creates a new instance of ContextFactory */
-    protected ContextFactory() {
-    }
-    
-    /**
-     * Returns the value of the named attribute, or null if there is no 
-     * attribute of that name.
-     *
-     * @param attributeName the attribute name
-     * @return the attribute value
-     */
-    public Object getAttribute(String attributeName) {
-        return attributeMap.get(attributeName);
-    }
-    
-    /**
-     * Returns the names of all the factory's attributes.
-     * 
-     * @return the attribute names
-     */
-    public String[] getAttributeNames() {
-        String[] result = new String[attributeMap.size()];
-        int i = 0;
-        // for (String attributeName : attributeMap.keySet()) {
-        Iterator it = attributeMap.keySet().iterator();
-        while (it.hasNext()) {
-            result[i++] = (String) it.next();
-        }
-        return result;
-    }
-    
-    /**
-     * Sets the named factory attribute to the specified value, creating it
-     * if it did not already exist.  If the value is null, this is the same as
-     * calling removeAttribute.
-     *
-     * @param attributeName the attribute name
-     * @param value the new attribute value
-     */
-    public void setAttribute(String attributeName, Object value) {
-        attributeMap.put(attributeName, value);
-    }
+  private static final String PROPERTIES_FILE = 
+    "/hadoop-metrics.properties";
+  private static final String CONTEXT_CLASS_SUFFIX =
+    ".class";
+  private static final String DEFAULT_CONTEXT_CLASSNAME =
+    "org.apache.hadoop.metrics.spi.NullContext";
+    
+  private static ContextFactory theFactory = null;
+    
+  private Map<String,Object> attributeMap = new HashMap<String,Object>();
+  private Map<String,AbstractMetricsContext> contextMap = 
+    new HashMap<String,AbstractMetricsContext>();
+    
+  // Used only when contexts, or the ContextFactory itself, cannot be
+  // created.
+  private static Map<String,MetricsContext> nullContextMap = 
+    new HashMap<String,MetricsContext>();
+    
+  /** Creates a new instance of ContextFactory */
+  protected ContextFactory() {
+  }
+    
+  /**
+   * Returns the value of the named attribute, or null if there is no 
+   * attribute of that name.
+   *
+   * @param attributeName the attribute name
+   * @return the attribute value
+   */
+  public Object getAttribute(String attributeName) {
+    return attributeMap.get(attributeName);
+  }
+    
+  /**
+   * Returns the names of all the factory's attributes.
+   * 
+   * @return the attribute names
+   */
+  public String[] getAttributeNames() {
+    String[] result = new String[attributeMap.size()];
+    int i = 0;
+    // for (String attributeName : attributeMap.keySet()) {
+    Iterator it = attributeMap.keySet().iterator();
+    while (it.hasNext()) {
+      result[i++] = (String) it.next();
+    }
+    return result;
+  }
+    
+  /**
+   * Sets the named factory attribute to the specified value, creating it
+   * if it did not already exist.  If the value is null, this is the same as
+   * calling removeAttribute.
+   *
+   * @param attributeName the attribute name
+   * @param value the new attribute value
+   */
+  public void setAttribute(String attributeName, Object value) {
+    attributeMap.put(attributeName, value);
+  }
 
-    /**
-     * Removes the named attribute if it exists.
-     *
-     * @param attributeName the attribute name
-     */
-    public void removeAttribute(String attributeName) {
-        attributeMap.remove(attributeName);
-    }
-    
-    /**
-     * Returns the named MetricsContext instance, constructing it if necessary 
-     * using the factory's current configuration attributes. <p/>
-     * 
-     * When constructing the instance, if the factory property 
-     * <i>contextName</i>.class</code> exists, 
-     * its value is taken to be the name of the class to instantiate.  Otherwise,
-     * the default is to create an instance of 
-     * <code>org.apache.hadoop.metrics.spi.NullContext</code>, which is a 
-     * dummy "no-op" context which will cause all metric data to be discarded.
-     * 
-     * @param contextName the name of the context
-     * @return the named MetricsContext
-     */
-    public synchronized MetricsContext getContext(String contextName) 
-        throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException
-    {
-        AbstractMetricsContext metricsContext = contextMap.get(contextName);
-        if (metricsContext == null) {
-            String classNameAttribute = contextName + CONTEXT_CLASS_SUFFIX;
-            String className = (String) getAttribute(classNameAttribute);
-            if (className == null) {
-                className = DEFAULT_CONTEXT_CLASSNAME;
-            }
-            Class contextClass = Class.forName(className);
-            metricsContext = (AbstractMetricsContext) contextClass.newInstance();
-            metricsContext.init(contextName, this);
-            contextMap.put(contextName, metricsContext);
-        }
-        return metricsContext;
-    }
-    
-    /**
-     * Returns a "null" context - one which does nothing.
-     */
-    public static synchronized MetricsContext getNullContext(String contextName) {
-        MetricsContext nullContext = nullContextMap.get(contextName);
-        if (nullContext == null) {
-            nullContext = new NullContext();
-            nullContextMap.put(contextName, nullContext);
-        }
-        return nullContext;
-    }
-    
-    /**
-     * Returns the singleton ContextFactory instance, constructing it if 
-     * necessary. <p/>
-     * 
-     * When the instance is constructed, this method checks if the file 
-     * <code>hadoop-metrics.properties</code> exists on the class path.  If it 
-     * exists, it must be in the format defined by java.util.Properties, and all 
-     * the properties in the file are set as attributes on the newly created
-     * ContextFactory instance.
-     *
-     * @return the singleton ContextFactory instance
-     */
-    public static synchronized ContextFactory getFactory() throws IOException {
-        if (theFactory == null) {
-            theFactory = new ContextFactory();
-            theFactory.setAttributes();
-        }
-        return theFactory;
-    }
-    
-    private void setAttributes() throws IOException {
-        InputStream is = getClass().getResourceAsStream(PROPERTIES_FILE);
-        if (is != null) {
-            Properties properties = new Properties();
-            properties.load(is);
-            //for (Object propertyNameObj : properties.keySet()) {
-            Iterator it = properties.keySet().iterator();
-            while (it.hasNext()) {
-                String propertyName = (String) it.next();
-                String propertyValue = properties.getProperty(propertyName);
-                setAttribute(propertyName, propertyValue);
-            }
-        }
+  /**
+   * Removes the named attribute if it exists.
+   *
+   * @param attributeName the attribute name
+   */
+  public void removeAttribute(String attributeName) {
+    attributeMap.remove(attributeName);
+  }
+    
+  /**
+   * Returns the named MetricsContext instance, constructing it if necessary 
+   * using the factory's current configuration attributes. <p/>
+   * 
+   * When constructing the instance, if the factory property 
+   * <i>contextName</i>.class</code> exists, 
+   * its value is taken to be the name of the class to instantiate.  Otherwise,
+   * the default is to create an instance of 
+   * <code>org.apache.hadoop.metrics.spi.NullContext</code>, which is a 
+   * dummy "no-op" context which will cause all metric data to be discarded.
+   * 
+   * @param contextName the name of the context
+   * @return the named MetricsContext
+   */
+  public synchronized MetricsContext getContext(String contextName) 
+    throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException
+  {
+    AbstractMetricsContext metricsContext = contextMap.get(contextName);
+    if (metricsContext == null) {
+      String classNameAttribute = contextName + CONTEXT_CLASS_SUFFIX;
+      String className = (String) getAttribute(classNameAttribute);
+      if (className == null) {
+        className = DEFAULT_CONTEXT_CLASSNAME;
+      }
+      Class contextClass = Class.forName(className);
+      metricsContext = (AbstractMetricsContext) contextClass.newInstance();
+      metricsContext.init(contextName, this);
+      contextMap.put(contextName, metricsContext);
+    }
+    return metricsContext;
+  }
+    
+  /**
+   * Returns a "null" context - one which does nothing.
+   */
+  public static synchronized MetricsContext getNullContext(String contextName) {
+    MetricsContext nullContext = nullContextMap.get(contextName);
+    if (nullContext == null) {
+      nullContext = new NullContext();
+      nullContextMap.put(contextName, nullContext);
+    }
+    return nullContext;
+  }
+    
+  /**
+   * Returns the singleton ContextFactory instance, constructing it if 
+   * necessary. <p/>
+   * 
+   * When the instance is constructed, this method checks if the file 
+   * <code>hadoop-metrics.properties</code> exists on the class path.  If it 
+   * exists, it must be in the format defined by java.util.Properties, and all 
+   * the properties in the file are set as attributes on the newly created
+   * ContextFactory instance.
+   *
+   * @return the singleton ContextFactory instance
+   */
+  public static synchronized ContextFactory getFactory() throws IOException {
+    if (theFactory == null) {
+      theFactory = new ContextFactory();
+      theFactory.setAttributes();
+    }
+    return theFactory;
+  }
+    
+  private void setAttributes() throws IOException {
+    InputStream is = getClass().getResourceAsStream(PROPERTIES_FILE);
+    if (is != null) {
+      Properties properties = new Properties();
+      properties.load(is);
+      //for (Object propertyNameObj : properties.keySet()) {
+      Iterator it = properties.keySet().iterator();
+      while (it.hasNext()) {
+        String propertyName = (String) it.next();
+        String propertyValue = properties.getProperty(propertyName);
+        setAttribute(propertyName, propertyValue);
+      }
     }
+  }
     
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsContext.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsContext.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsContext.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsContext.java Mon Apr 16 14:44:35 2007
@@ -27,69 +27,69 @@
  */
 public interface MetricsContext {
     
-    /**
-     * Default period in seconds at which data is sent to the metrics system.
-     */
-    public static final int DEFAULT_PERIOD = 5;
-    
-    /**
-     * Returns the context name.
-     *
-     * @return the context name
-     */
-    public abstract String getContextName();
-    
-    /**
-     * Starts or restarts monitoring, the emitting of metrics records as they are 
-     * updated. 
-     */
-    public abstract void startMonitoring()
-        throws IOException;
+  /**
+   * Default period in seconds at which data is sent to the metrics system.
+   */
+  public static final int DEFAULT_PERIOD = 5;
+    
+  /**
+   * Returns the context name.
+   *
+   * @return the context name
+   */
+  public abstract String getContextName();
+    
+  /**
+   * Starts or restarts monitoring, the emitting of metrics records as they are 
+   * updated. 
+   */
+  public abstract void startMonitoring()
+    throws IOException;
 
-    /**
-     * Stops monitoring.  This does not free any data that the implementation
-     * may have buffered for sending at the next timer event. It
-     * is OK to call <code>startMonitoring()</code> again after calling 
-     * this.
-     * @see #close()
-     */
-    public abstract void stopMonitoring();
-    
-    /**
-     * Returns true if monitoring is currently in progress.
-     */
-    public abstract boolean isMonitoring();
-    
-    /**
-     * Stops monitoring and also frees any buffered data, returning this 
-     * object to its initial state.  
-     */
-    public abstract void close();
-    
-    /**
-     * Creates a new MetricsRecord instance with the given <code>recordName</code>.
-     * Throws an exception if the metrics implementation is configured with a fixed
-     * set of record names and <code>recordName</code> is not in that set.
-     *
-     * @param recordName the name of the record
-     * @throws MetricsException if recordName conflicts with configuration data
-     */
-    public abstract MetricsRecord createRecord(String recordName);
-    
-    /**
-     * Registers a callback to be called at regular time intervals, as 
-     * determined by the implementation-class specific configuration.
-     *
-     * @param updater object to be run periodically; it should updated
-     * some metrics records and then return
-     */
-    public abstract void registerUpdater(Updater updater);
+  /**
+   * Stops monitoring.  This does not free any data that the implementation
+   * may have buffered for sending at the next timer event. It
+   * is OK to call <code>startMonitoring()</code> again after calling 
+   * this.
+   * @see #close()
+   */
+  public abstract void stopMonitoring();
+    
+  /**
+   * Returns true if monitoring is currently in progress.
+   */
+  public abstract boolean isMonitoring();
+    
+  /**
+   * Stops monitoring and also frees any buffered data, returning this 
+   * object to its initial state.  
+   */
+  public abstract void close();
+    
+  /**
+   * Creates a new MetricsRecord instance with the given <code>recordName</code>.
+   * Throws an exception if the metrics implementation is configured with a fixed
+   * set of record names and <code>recordName</code> is not in that set.
+   *
+   * @param recordName the name of the record
+   * @throws MetricsException if recordName conflicts with configuration data
+   */
+  public abstract MetricsRecord createRecord(String recordName);
+    
+  /**
+   * Registers a callback to be called at regular time intervals, as 
+   * determined by the implementation-class specific configuration.
+   *
+   * @param updater object to be run periodically; it should updated
+   * some metrics records and then return
+   */
+  public abstract void registerUpdater(Updater updater);
 
-    /**
-     * Removes a callback, if it exists.
-     * 
-     * @param updater object to be removed from the callback list
-     */
-    public abstract void unregisterUpdater(Updater updater);
+  /**
+   * Removes a callback, if it exists.
+   * 
+   * @param updater object to be removed from the callback list
+   */
+  public abstract void unregisterUpdater(Updater updater);
     
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsException.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsException.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsException.java Mon Apr 16 14:44:35 2007
@@ -25,18 +25,18 @@
  */
 public class MetricsException extends RuntimeException {
     
-	private static final long serialVersionUID = -1643257498540498497L;
+  private static final long serialVersionUID = -1643257498540498497L;
 
-	/** Creates a new instance of MetricsException */
-    public MetricsException() {
-    }
+  /** Creates a new instance of MetricsException */
+  public MetricsException() {
+  }
     
-    /** Creates a new instance of MetricsException 
-     *
-     * @param message an error message
-     */
-    public MetricsException(String message) {
-        super(message);
-    }
+  /** Creates a new instance of MetricsException 
+   *
+   * @param message an error message
+   */
+  public MetricsException(String message) {
+    super(message);
+  }
     
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java Mon Apr 16 14:44:35 2007
@@ -67,140 +67,140 @@
  */
 public interface MetricsRecord {
     
-    /**
-     * Returns the record name. 
-     *
-     * @return the record name
-     */
-    public abstract String getRecordName();
-    
-    /**
-     * Sets the named tag to the specified value.
-     *
-     * @param tagName name of the tag
-     * @param tagValue new value of the tag
-     * @throws MetricsException if the tagName conflicts with the configuration
-     */
-    public abstract void setTag(String tagName, String tagValue);
-    
-    /**
-     * Sets the named tag to the specified value.
-     *
-     * @param tagName name of the tag
-     * @param tagValue new value of the tag
-     * @throws MetricsException if the tagName conflicts with the configuration
-     */
-    public abstract void setTag(String tagName, int tagValue);
-    
-    /**
-     * Sets the named tag to the specified value.
-     *
-     * @param tagName name of the tag
-     * @param tagValue new value of the tag
-     * @throws MetricsException if the tagName conflicts with the configuration
-     */
-    public abstract void setTag(String tagName, short tagValue);
-    
-    /**
-     * Sets the named tag to the specified value.
-     *
-     * @param tagName name of the tag
-     * @param tagValue new value of the tag
-     * @throws MetricsException if the tagName conflicts with the configuration
-     */
-    public abstract void setTag(String tagName, byte tagValue);
-    
-    /**
-     * Sets the named metric to the specified value.
-     *
-     * @param metricName name of the metric
-     * @param metricValue new value of the metric
-     * @throws MetricsException if the metricName or the type of the metricValue 
-     * conflicts with the configuration
-     */
-    public abstract void setMetric(String metricName, int metricValue);
-    
-    /**
-     * Sets the named metric to the specified value.
-     *
-     * @param metricName name of the metric
-     * @param metricValue new value of the metric
-     * @throws MetricsException if the metricName or the type of the metricValue 
-     * conflicts with the configuration
-     */
-    public abstract void setMetric(String metricName, short metricValue);
-    
-    /**
-     * Sets the named metric to the specified value.
-     *
-     * @param metricName name of the metric
-     * @param metricValue new value of the metric
-     * @throws MetricsException if the metricName or the type of the metricValue 
-     * conflicts with the configuration
-     */
-    public abstract void setMetric(String metricName, byte metricValue);
-    
-    /**
-     * Sets the named metric to the specified value.
-     *
-     * @param metricName name of the metric
-     * @param metricValue new value of the metric
-     * @throws MetricsException if the metricName or the type of the metricValue 
-     * conflicts with the configuration
-     */
-    public abstract void setMetric(String metricName, float metricValue);
-    
-    /**
-     * Increments the named metric by the specified value.
-     *
-     * @param metricName name of the metric
-     * @param metricValue incremental value
-     * @throws MetricsException if the metricName or the type of the metricValue 
-     * conflicts with the configuration
-     */
-    public abstract void incrMetric(String metricName, int metricValue);
-    
-    /**
-     * Increments the named metric by the specified value.
-     *
-     * @param metricName name of the metric
-     * @param metricValue incremental value
-     * @throws MetricsException if the metricName or the type of the metricValue 
-     * conflicts with the configuration
-     */
-    public abstract void incrMetric(String metricName, short metricValue);
-    
-    /**
-     * Increments the named metric by the specified value.
-     *
-     * @param metricName name of the metric
-     * @param metricValue incremental value
-     * @throws MetricsException if the metricName or the type of the metricValue 
-     * conflicts with the configuration
-     */
-    public abstract void incrMetric(String metricName, byte metricValue);
-    
-    /**
-     * Increments the named metric by the specified value.
-     *
-     * @param metricName name of the metric
-     * @param metricValue incremental value
-     * @throws MetricsException if the metricName or the type of the metricValue 
-     * conflicts with the configuration
-     */
-    public abstract void incrMetric(String metricName, float metricValue);
-    
-    /**
-     * Updates the table of buffered data which is to be sent periodically.
-     * If the tag values match an existing row, that row is updated; 
-     * otherwise, a new row is added.
-     */
-    public abstract void update();
-    
-    /**
-     * Removes, from the buffered data table, the row (if it exists) having tags 
-     * that equal the tags that have been set on this record. 
-     */
-    public abstract void remove();
+  /**
+   * Returns the record name. 
+   *
+   * @return the record name
+   */
+  public abstract String getRecordName();
+    
+  /**
+   * Sets the named tag to the specified value.
+   *
+   * @param tagName name of the tag
+   * @param tagValue new value of the tag
+   * @throws MetricsException if the tagName conflicts with the configuration
+   */
+  public abstract void setTag(String tagName, String tagValue);
+    
+  /**
+   * Sets the named tag to the specified value.
+   *
+   * @param tagName name of the tag
+   * @param tagValue new value of the tag
+   * @throws MetricsException if the tagName conflicts with the configuration
+   */
+  public abstract void setTag(String tagName, int tagValue);
+    
+  /**
+   * Sets the named tag to the specified value.
+   *
+   * @param tagName name of the tag
+   * @param tagValue new value of the tag
+   * @throws MetricsException if the tagName conflicts with the configuration
+   */
+  public abstract void setTag(String tagName, short tagValue);
+    
+  /**
+   * Sets the named tag to the specified value.
+   *
+   * @param tagName name of the tag
+   * @param tagValue new value of the tag
+   * @throws MetricsException if the tagName conflicts with the configuration
+   */
+  public abstract void setTag(String tagName, byte tagValue);
+    
+  /**
+   * Sets the named metric to the specified value.
+   *
+   * @param metricName name of the metric
+   * @param metricValue new value of the metric
+   * @throws MetricsException if the metricName or the type of the metricValue 
+   * conflicts with the configuration
+   */
+  public abstract void setMetric(String metricName, int metricValue);
+    
+  /**
+   * Sets the named metric to the specified value.
+   *
+   * @param metricName name of the metric
+   * @param metricValue new value of the metric
+   * @throws MetricsException if the metricName or the type of the metricValue 
+   * conflicts with the configuration
+   */
+  public abstract void setMetric(String metricName, short metricValue);
+    
+  /**
+   * Sets the named metric to the specified value.
+   *
+   * @param metricName name of the metric
+   * @param metricValue new value of the metric
+   * @throws MetricsException if the metricName or the type of the metricValue 
+   * conflicts with the configuration
+   */
+  public abstract void setMetric(String metricName, byte metricValue);
+    
+  /**
+   * Sets the named metric to the specified value.
+   *
+   * @param metricName name of the metric
+   * @param metricValue new value of the metric
+   * @throws MetricsException if the metricName or the type of the metricValue 
+   * conflicts with the configuration
+   */
+  public abstract void setMetric(String metricName, float metricValue);
+    
+  /**
+   * Increments the named metric by the specified value.
+   *
+   * @param metricName name of the metric
+   * @param metricValue incremental value
+   * @throws MetricsException if the metricName or the type of the metricValue 
+   * conflicts with the configuration
+   */
+  public abstract void incrMetric(String metricName, int metricValue);
+    
+  /**
+   * Increments the named metric by the specified value.
+   *
+   * @param metricName name of the metric
+   * @param metricValue incremental value
+   * @throws MetricsException if the metricName or the type of the metricValue 
+   * conflicts with the configuration
+   */
+  public abstract void incrMetric(String metricName, short metricValue);
+    
+  /**
+   * Increments the named metric by the specified value.
+   *
+   * @param metricName name of the metric
+   * @param metricValue incremental value
+   * @throws MetricsException if the metricName or the type of the metricValue 
+   * conflicts with the configuration
+   */
+  public abstract void incrMetric(String metricName, byte metricValue);
+    
+  /**
+   * Increments the named metric by the specified value.
+   *
+   * @param metricName name of the metric
+   * @param metricValue incremental value
+   * @throws MetricsException if the metricName or the type of the metricValue 
+   * conflicts with the configuration
+   */
+  public abstract void incrMetric(String metricName, float metricValue);
+    
+  /**
+   * Updates the table of buffered data which is to be sent periodically.
+   * If the tag values match an existing row, that row is updated; 
+   * otherwise, a new row is added.
+   */
+  public abstract void update();
+    
+  /**
+   * Removes, from the buffered data table, the row (if it exists) having tags 
+   * that equal the tags that have been set on this record. 
+   */
+  public abstract void remove();
     
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Updater.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Updater.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Updater.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Updater.java Mon Apr 16 14:44:35 2007
@@ -25,9 +25,9 @@
  */
 public interface Updater {
     
-    /**
-     * Timer-based call-back from the metric library. 
-     */
-    public abstract void doUpdates(MetricsContext context);
+  /**
+   * Timer-based call-back from the metric library. 
+   */
+  public abstract void doUpdates(MetricsContext context);
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/file/FileContext.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/file/FileContext.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/file/FileContext.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/file/FileContext.java Mon Apr 16 14:44:35 2007
@@ -45,108 +45,108 @@
  */
 public class FileContext extends AbstractMetricsContext {
     
-    /* Configuration attribute names */
-    protected static final String FILE_NAME_PROPERTY = "fileName";
-    protected static final String PERIOD_PROPERTY = "period";
+  /* Configuration attribute names */
+  protected static final String FILE_NAME_PROPERTY = "fileName";
+  protected static final String PERIOD_PROPERTY = "period";
     
-    private File file = null;              // file for metrics to be written to
-    private PrintWriter writer = null;
+  private File file = null;              // file for metrics to be written to
+  private PrintWriter writer = null;
     
-    /** Creates a new instance of FileContext */
-    public FileContext() {}
+  /** Creates a new instance of FileContext */
+  public FileContext() {}
     
-    public void init(String contextName, ContextFactory factory) {
-        super.init(contextName, factory);
+  public void init(String contextName, ContextFactory factory) {
+    super.init(contextName, factory);
         
-        String fileName = getAttribute(FILE_NAME_PROPERTY);
-        if (fileName != null) {
-            file = new File(fileName);
-        }
+    String fileName = getAttribute(FILE_NAME_PROPERTY);
+    if (fileName != null) {
+      file = new File(fileName);
+    }
         
-        String periodStr = getAttribute(PERIOD_PROPERTY);
-        if (periodStr != null) {
-            int period = 0;
-            try {
-                period = Integer.parseInt(periodStr);
-            } catch (NumberFormatException nfe) {
-            }
-            if (period <= 0) {
-                throw new MetricsException("Invalid period: " + periodStr);
-            }
-            setPeriod(period);
-        }
+    String periodStr = getAttribute(PERIOD_PROPERTY);
+    if (periodStr != null) {
+      int period = 0;
+      try {
+        period = Integer.parseInt(periodStr);
+      } catch (NumberFormatException nfe) {
+      }
+      if (period <= 0) {
+        throw new MetricsException("Invalid period: " + periodStr);
+      }
+      setPeriod(period);
     }
+  }
 
-    /**
-     * Returns the configured file name, or null.
-     */
-    public String getFileName() {
-        if (file == null) {
-            return null;
-        } else {
-            return file.getName();
-        }
-    }
-    
-    /**
-     * Starts or restarts monitoring, by opening in append-mode, the
-     * file specified by the <code>fileName</code> attribute,
-     * if specified. Otherwise the data will be written to standard
-     * output.
-     */
-    public void startMonitoring()
-        throws IOException 
-    {
-        if (file == null) {
-            writer = new PrintWriter(new BufferedOutputStream(System.out));
-        } else {
-            writer = new PrintWriter(new FileWriter(file, true));
-        }
-        super.startMonitoring();
-    }
-    
-    /**
-     * Stops monitoring, closing the file.
-     * @see #close()
-     */
-    public void stopMonitoring() {
-        super.stopMonitoring();
+  /**
+   * Returns the configured file name, or null.
+   */
+  public String getFileName() {
+    if (file == null) {
+      return null;
+    } else {
+      return file.getName();
+    }
+  }
+    
+  /**
+   * Starts or restarts monitoring, by opening in append-mode, the
+   * file specified by the <code>fileName</code> attribute,
+   * if specified. Otherwise the data will be written to standard
+   * output.
+   */
+  public void startMonitoring()
+    throws IOException 
+  {
+    if (file == null) {
+      writer = new PrintWriter(new BufferedOutputStream(System.out));
+    } else {
+      writer = new PrintWriter(new FileWriter(file, true));
+    }
+    super.startMonitoring();
+  }
+    
+  /**
+   * Stops monitoring, closing the file.
+   * @see #close()
+   */
+  public void stopMonitoring() {
+    super.stopMonitoring();
         
-        if (writer != null) {
-            writer.close();
-            writer = null;
-        }
-    }
-    
-    /**
-     * Emits a metrics record to a file.
-     */
-    public void emitRecord(String contextName, String recordName, OutputRecord outRec) {
-        writer.print(contextName);
-        writer.print(".");
-        writer.print(recordName);
-        String separator = ": ";
-        for (String tagName : outRec.getTagNames()) {
-            writer.print(separator);
-            separator = ", ";
-            writer.print(tagName);
-            writer.print("=");
-            writer.print(outRec.getTag(tagName));
-        }
-        for (String metricName : outRec.getMetricNames()) {
-            writer.print(separator);
-            separator = ", ";
-            writer.print(metricName);
-            writer.print("=");
-            writer.print(outRec.getMetric(metricName));
-        }
-        writer.println();
-    }
-    
-    /**
-     * Flushes the output writer, forcing updates to disk.
-     */
-    public void flush() {
-        writer.flush();
-    }
+    if (writer != null) {
+      writer.close();
+      writer = null;
+    }
+  }
+    
+  /**
+   * Emits a metrics record to a file.
+   */
+  public void emitRecord(String contextName, String recordName, OutputRecord outRec) {
+    writer.print(contextName);
+    writer.print(".");
+    writer.print(recordName);
+    String separator = ": ";
+    for (String tagName : outRec.getTagNames()) {
+      writer.print(separator);
+      separator = ", ";
+      writer.print(tagName);
+      writer.print("=");
+      writer.print(outRec.getTag(tagName));
+    }
+    for (String metricName : outRec.getMetricNames()) {
+      writer.print(separator);
+      separator = ", ";
+      writer.print(metricName);
+      writer.print("=");
+      writer.print(outRec.getMetric(metricName));
+    }
+    writer.println();
+  }
+    
+  /**
+   * Flushes the output writer, forcing updates to disk.
+   */
+  public void flush() {
+    writer.flush();
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java Mon Apr 16 14:44:35 2007
@@ -41,183 +41,183 @@
  */
 public class GangliaContext extends AbstractMetricsContext {
     
-    private static final String PERIOD_PROPERTY = "period";
-    private static final String SERVERS_PROPERTY = "servers";
-    private static final String UNITS_PROPERTY = "units";
-    private static final String SLOPE_PROPERTY = "slope";
-    private static final String TMAX_PROPERTY = "tmax";
-    private static final String DMAX_PROPERTY = "dmax";
-    
-    private static final String DEFAULT_UNITS = "";
-    private static final String DEFAULT_SLOPE = "both";
-    private static final int DEFAULT_TMAX = 60;
-    private static final int DEFAULT_DMAX = 0;
-    private static final int DEFAULT_PORT = 8649;
-    private static final int BUFFER_SIZE = 1500;       // as per libgmond.c
-    
-    private static final Map<Class,String> typeTable = new HashMap<Class,String>(5);
-    
-    static {
-        typeTable.put(String.class, "string");
-        typeTable.put(Byte.class, "int8");
-        typeTable.put(Short.class, "int16");
-        typeTable.put(Integer.class, "int32");
-        typeTable.put(Float.class, "float");
-    }
-    
-    private byte[] buffer = new byte[BUFFER_SIZE];
-    private int offset;
-    
-    private List<? extends SocketAddress> metricsServers;
-    private Map<String,String> unitsTable;
-    private Map<String,String> slopeTable;
-    private Map<String,String> tmaxTable;
-    private Map<String,String> dmaxTable;
-    
-    private DatagramSocket datagramSocket;
-    
-    /** Creates a new instance of GangliaContext */
-    public GangliaContext() {
-    }
-    
-    public void init(String contextName, ContextFactory factory) 
-    {
-        super.init(contextName, factory);
-        
-        String periodStr = getAttribute(PERIOD_PROPERTY);
-        if (periodStr != null) {
-            int period = 0;
-            try {
-                period = Integer.parseInt(periodStr);
-            } catch (NumberFormatException nfe) {
-            }
-            if (period <= 0) {
-                throw new MetricsException("Invalid period: " + periodStr);
-            }
-            setPeriod(period);
-        }
-        
-        metricsServers = 
-                Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT); 
-        
-        unitsTable = getAttributeTable(UNITS_PROPERTY);
-        slopeTable = getAttributeTable(SLOPE_PROPERTY);
-        tmaxTable  = getAttributeTable(TMAX_PROPERTY);
-        dmaxTable  = getAttributeTable(DMAX_PROPERTY);
-        
-        try {
-            datagramSocket = new DatagramSocket();
-        }
-        catch (SocketException se) {
-            se.printStackTrace();
-        }
-    }
-        
-    public void emitRecord(String contextName, String recordName, OutputRecord outRec) 
-        throws IOException
-    {
-        // emit each metric in turn
-        for (String metricName : outRec.getMetricNames()) {
-          Object metric = outRec.getMetric(metricName);
-          String type = (String) typeTable.get(metric.getClass());
-          emitMetric(metricName, type, metric.toString());
-        }
-        
-    }
-    
-    private void emitMetric(String name, String type,  String value) 
-        throws IOException
-    {
-        String units = getUnits(name);
-        int slope = getSlope(name);
-        int tmax = getTmax(name);
-        int dmax = getDmax(name);
-        
-        offset = 0;
-        xdr_int(0);             // metric_user_defined
-        xdr_string(type);
-        xdr_string(name);
-        xdr_string(value);
-        xdr_string(units);
-        xdr_int(slope);
-        xdr_int(tmax);
-        xdr_int(dmax);
-        
-        for (SocketAddress socketAddress : metricsServers) {
-          DatagramPacket packet = 
-              new DatagramPacket(buffer, offset, socketAddress);
-          datagramSocket.send(packet);
-        }
-    }
-    
-    private String getUnits(String metricName) {
-        String result = (String) unitsTable.get(metricName);
-        if (result == null) {
-            result = DEFAULT_UNITS;
-        }
-        return result;
-    }
-    
-    private int getSlope(String metricName) {
-        String slopeString = (String) slopeTable.get(metricName);
-        if (slopeString == null) {
-            slopeString = DEFAULT_SLOPE; 
-        }
-        return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
-    }
-    
-    private int getTmax(String metricName) {
-        String tmaxString = (String) tmaxTable.get(metricName);
-        if (tmaxString == null) {
-            return DEFAULT_TMAX;
-        }
-        else {
-            return Integer.parseInt(tmaxString);
-        }
-    }
-    
-    private int getDmax(String metricName) {
-        String dmaxString = (String) dmaxTable.get(metricName);
-        if (dmaxString == null) {
-            return DEFAULT_DMAX;
-        }
-        else {
-            return Integer.parseInt(dmaxString);
-        }
-    }
-    
-    /**
-     * Puts a string into the buffer by first writing the size of the string
-     * as an int, followed by the bytes of the string, padded if necessary to
-     * a multiple of 4.
-     */
-    private void xdr_string(String s) {
-	byte[] bytes = s.getBytes();
-        int len = bytes.length;
-        xdr_int(len);
-        System.arraycopy(bytes, 0, buffer, offset, len);
-        offset += len;
-        pad();
-    }
+  private static final String PERIOD_PROPERTY = "period";
+  private static final String SERVERS_PROPERTY = "servers";
+  private static final String UNITS_PROPERTY = "units";
+  private static final String SLOPE_PROPERTY = "slope";
+  private static final String TMAX_PROPERTY = "tmax";
+  private static final String DMAX_PROPERTY = "dmax";
+    
+  private static final String DEFAULT_UNITS = "";
+  private static final String DEFAULT_SLOPE = "both";
+  private static final int DEFAULT_TMAX = 60;
+  private static final int DEFAULT_DMAX = 0;
+  private static final int DEFAULT_PORT = 8649;
+  private static final int BUFFER_SIZE = 1500;       // as per libgmond.c
+    
+  private static final Map<Class,String> typeTable = new HashMap<Class,String>(5);
+    
+  static {
+    typeTable.put(String.class, "string");
+    typeTable.put(Byte.class, "int8");
+    typeTable.put(Short.class, "int16");
+    typeTable.put(Integer.class, "int32");
+    typeTable.put(Float.class, "float");
+  }
+    
+  private byte[] buffer = new byte[BUFFER_SIZE];
+  private int offset;
+    
+  private List<? extends SocketAddress> metricsServers;
+  private Map<String,String> unitsTable;
+  private Map<String,String> slopeTable;
+  private Map<String,String> tmaxTable;
+  private Map<String,String> dmaxTable;
+    
+  private DatagramSocket datagramSocket;
+    
+  /** Creates a new instance of GangliaContext */
+  public GangliaContext() {
+  }
+    
+  public void init(String contextName, ContextFactory factory) 
+  {
+    super.init(contextName, factory);
+        
+    String periodStr = getAttribute(PERIOD_PROPERTY);
+    if (periodStr != null) {
+      int period = 0;
+      try {
+        period = Integer.parseInt(periodStr);
+      } catch (NumberFormatException nfe) {
+      }
+      if (period <= 0) {
+        throw new MetricsException("Invalid period: " + periodStr);
+      }
+      setPeriod(period);
+    }
+        
+    metricsServers = 
+      Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT); 
+        
+    unitsTable = getAttributeTable(UNITS_PROPERTY);
+    slopeTable = getAttributeTable(SLOPE_PROPERTY);
+    tmaxTable  = getAttributeTable(TMAX_PROPERTY);
+    dmaxTable  = getAttributeTable(DMAX_PROPERTY);
+        
+    try {
+      datagramSocket = new DatagramSocket();
+    }
+    catch (SocketException se) {
+      se.printStackTrace();
+    }
+  }
+        
+  public void emitRecord(String contextName, String recordName, OutputRecord outRec) 
+    throws IOException
+  {
+    // emit each metric in turn
+    for (String metricName : outRec.getMetricNames()) {
+      Object metric = outRec.getMetric(metricName);
+      String type = (String) typeTable.get(metric.getClass());
+      emitMetric(metricName, type, metric.toString());
+    }
+        
+  }
+    
+  private void emitMetric(String name, String type,  String value) 
+    throws IOException
+  {
+    String units = getUnits(name);
+    int slope = getSlope(name);
+    int tmax = getTmax(name);
+    int dmax = getDmax(name);
+        
+    offset = 0;
+    xdr_int(0);             // metric_user_defined
+    xdr_string(type);
+    xdr_string(name);
+    xdr_string(value);
+    xdr_string(units);
+    xdr_int(slope);
+    xdr_int(tmax);
+    xdr_int(dmax);
+        
+    for (SocketAddress socketAddress : metricsServers) {
+      DatagramPacket packet = 
+        new DatagramPacket(buffer, offset, socketAddress);
+      datagramSocket.send(packet);
+    }
+  }
+    
+  private String getUnits(String metricName) {
+    String result = (String) unitsTable.get(metricName);
+    if (result == null) {
+      result = DEFAULT_UNITS;
+    }
+    return result;
+  }
+    
+  private int getSlope(String metricName) {
+    String slopeString = (String) slopeTable.get(metricName);
+    if (slopeString == null) {
+      slopeString = DEFAULT_SLOPE; 
+    }
+    return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
+  }
+    
+  private int getTmax(String metricName) {
+    String tmaxString = (String) tmaxTable.get(metricName);
+    if (tmaxString == null) {
+      return DEFAULT_TMAX;
+    }
+    else {
+      return Integer.parseInt(tmaxString);
+    }
+  }
+    
+  private int getDmax(String metricName) {
+    String dmaxString = (String) dmaxTable.get(metricName);
+    if (dmaxString == null) {
+      return DEFAULT_DMAX;
+    }
+    else {
+      return Integer.parseInt(dmaxString);
+    }
+  }
+    
+  /**
+   * Puts a string into the buffer by first writing the size of the string
+   * as an int, followed by the bytes of the string, padded if necessary to
+   * a multiple of 4.
+   */
+  private void xdr_string(String s) {
+    byte[] bytes = s.getBytes();
+    int len = bytes.length;
+    xdr_int(len);
+    System.arraycopy(bytes, 0, buffer, offset, len);
+    offset += len;
+    pad();
+  }
 
-    /**
-     * Pads the buffer with zero bytes up to the nearest multiple of 4.
-     */
-    private void pad() {
-	int newOffset = ((offset + 3) / 4) * 4;
-	while (offset < newOffset) {
-            buffer[offset++] = 0;
-        }
-    }
-        
-    /**
-     * Puts an integer into the buffer as 4 bytes, big-endian.
-     */
-    private void xdr_int(int i) {
-	buffer[offset++] = (byte)((i >> 24) & 0xff);
-	buffer[offset++] = (byte)((i >> 16) & 0xff);
-	buffer[offset++] = (byte)((i >> 8) & 0xff);
-	buffer[offset++] = (byte)(i & 0xff);
-    }
+  /**
+   * Pads the buffer with zero bytes up to the nearest multiple of 4.
+   */
+  private void pad() {
+    int newOffset = ((offset + 3) / 4) * 4;
+    while (offset < newOffset) {
+      buffer[offset++] = 0;
+    }
+  }
+        
+  /**
+   * Puts an integer into the buffer as 4 bytes, big-endian.
+   */
+  private void xdr_int(int i) {
+    buffer[offset++] = (byte)((i >> 24) & 0xff);
+    buffer[offset++] = (byte)((i >> 16) & 0xff);
+    buffer[offset++] = (byte)((i >> 8) & 0xff);
+    buffer[offset++] = (byte)(i & 0xff);
+  }
     
 }