You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC
svn commit: r529410 [19/27] - in /lucene/hadoop/trunk: ./
src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/
src/contrib/abacus/src/java/org/apache/hadoop/abacus/
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/...
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Mon Apr 16 14:44:35 2007
@@ -32,154 +32,154 @@
**************************************************/
class TaskTrackerStatus implements Writable {
- static { // register a ctor
- WritableFactories.setFactory
- (TaskTrackerStatus.class,
- new WritableFactory() {
- public Writable newInstance() { return new TaskTrackerStatus(); }
- });
- }
-
- String trackerName;
- String host;
- int httpPort;
- int failures;
- List<TaskStatus> taskReports;
+ static { // register a ctor
+ WritableFactories.setFactory
+ (TaskTrackerStatus.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new TaskTrackerStatus(); }
+ });
+ }
+
+ String trackerName;
+ String host;
+ int httpPort;
+ int failures;
+ List<TaskStatus> taskReports;
- volatile long lastSeen;
+ volatile long lastSeen;
- /**
- */
- public TaskTrackerStatus() {
- taskReports = new ArrayList();
- }
-
- /**
- */
- public TaskTrackerStatus(String trackerName, String host,
- int httpPort, List<TaskStatus> taskReports,
- int failures) {
- this.trackerName = trackerName;
- this.host = host;
- this.httpPort = httpPort;
-
- this.taskReports = new ArrayList(taskReports);
- this.failures = failures;
- }
-
- /**
- */
- public String getTrackerName() {
- return trackerName;
- }
- /**
- */
- public String getHost() {
- return host;
- }
-
- /**
- * Get the port that this task tracker is serving http requests on.
- * @return the http port
- */
- public int getHttpPort() {
- return httpPort;
- }
+ /**
+ */
+ public TaskTrackerStatus() {
+ taskReports = new ArrayList();
+ }
+
+ /**
+ */
+ public TaskTrackerStatus(String trackerName, String host,
+ int httpPort, List<TaskStatus> taskReports,
+ int failures) {
+ this.trackerName = trackerName;
+ this.host = host;
+ this.httpPort = httpPort;
+
+ this.taskReports = new ArrayList(taskReports);
+ this.failures = failures;
+ }
+
+ /**
+ */
+ public String getTrackerName() {
+ return trackerName;
+ }
+ /**
+ */
+ public String getHost() {
+ return host;
+ }
+
+ /**
+ * Get the port that this task tracker is serving http requests on.
+ * @return the http port
+ */
+ public int getHttpPort() {
+ return httpPort;
+ }
- /**
- * Get the number of tasks that have failed on this tracker.
- * @return The number of failed tasks
- */
- public int getFailures() {
- return failures;
- }
+ /**
+ * Get the number of tasks that have failed on this tracker.
+ * @return The number of failed tasks
+ */
+ public int getFailures() {
+ return failures;
+ }
- /**
- * Get the current tasks at the TaskTracker.
- * Tasks are tracked by a {@link TaskStatus} object.
- *
- * @return a list of {@link TaskStatus} representing
- * the current tasks at the TaskTracker.
- */
- public List<TaskStatus> getTaskReports() {
- return taskReports;
- }
+ /**
+ * Get the current tasks at the TaskTracker.
+ * Tasks are tracked by a {@link TaskStatus} object.
+ *
+ * @return a list of {@link TaskStatus} representing
+ * the current tasks at the TaskTracker.
+ */
+ public List<TaskStatus> getTaskReports() {
+ return taskReports;
+ }
- /**
- * Return the current MapTask count
- */
- public int countMapTasks() {
- int mapCount = 0;
- for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
- TaskStatus ts = (TaskStatus) it.next();
- TaskStatus.State state = ts.getRunState();
- if (ts.getIsMap() &&
- ((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED))) {
- mapCount++;
- }
+ /**
+ * Return the current MapTask count
+ */
+ public int countMapTasks() {
+ int mapCount = 0;
+ for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
+ TaskStatus ts = (TaskStatus) it.next();
+ TaskStatus.State state = ts.getRunState();
+ if (ts.getIsMap() &&
+ ((state == TaskStatus.State.RUNNING) ||
+ (state == TaskStatus.State.UNASSIGNED))) {
+ mapCount++;
}
- return mapCount;
}
+ return mapCount;
+ }
- /**
- * Return the current ReduceTask count
- */
- public int countReduceTasks() {
- int reduceCount = 0;
- for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
- TaskStatus ts = (TaskStatus) it.next();
- TaskStatus.State state = ts.getRunState();
- if ((!ts.getIsMap()) &&
- ((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED))) {
- reduceCount++;
- }
+ /**
+ * Return the current ReduceTask count
+ */
+ public int countReduceTasks() {
+ int reduceCount = 0;
+ for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
+ TaskStatus ts = (TaskStatus) it.next();
+ TaskStatus.State state = ts.getRunState();
+ if ((!ts.getIsMap()) &&
+ ((state == TaskStatus.State.RUNNING) ||
+ (state == TaskStatus.State.UNASSIGNED))) {
+ reduceCount++;
}
- return reduceCount;
- }
-
- /**
- */
- public long getLastSeen() {
- return lastSeen;
- }
- /**
- */
- public void setLastSeen(long lastSeen) {
- this.lastSeen = lastSeen;
- }
-
- ///////////////////////////////////////////
- // Writable
- ///////////////////////////////////////////
- public void write(DataOutput out) throws IOException {
- UTF8.writeString(out, trackerName);
- UTF8.writeString(out, host);
- out.writeInt(httpPort);
-
- out.writeInt(taskReports.size());
- out.writeInt(failures);
- for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
- ((TaskStatus) it.next()).write(out);
- }
}
+ return reduceCount;
+ }
- /**
- */
- public void readFields(DataInput in) throws IOException {
- this.trackerName = UTF8.readString(in);
- this.host = UTF8.readString(in);
- this.httpPort = in.readInt();
-
- taskReports.clear();
-
- int numTasks = in.readInt();
- this.failures = in.readInt();
- for (int i = 0; i < numTasks; i++) {
- TaskStatus tmp = new TaskStatus();
- tmp.readFields(in);
- taskReports.add(tmp);
- }
+ /**
+ */
+ public long getLastSeen() {
+ return lastSeen;
+ }
+ /**
+ */
+ public void setLastSeen(long lastSeen) {
+ this.lastSeen = lastSeen;
+ }
+
+ ///////////////////////////////////////////
+ // Writable
+ ///////////////////////////////////////////
+ public void write(DataOutput out) throws IOException {
+ UTF8.writeString(out, trackerName);
+ UTF8.writeString(out, host);
+ out.writeInt(httpPort);
+
+ out.writeInt(taskReports.size());
+ out.writeInt(failures);
+ for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
+ ((TaskStatus) it.next()).write(out);
+ }
+ }
+
+ /**
+ */
+ public void readFields(DataInput in) throws IOException {
+ this.trackerName = UTF8.readString(in);
+ this.host = UTF8.readString(in);
+ this.httpPort = in.readInt();
+
+ taskReports.clear();
+
+ int numTasks = in.readInt();
+ this.failures = in.readInt();
+ for (int i = 0; i < numTasks; i++) {
+ TaskStatus tmp = new TaskStatus();
+ tmp.readFields(in);
+ taskReports.add(tmp);
}
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java Mon Apr 16 14:44:35 2007
@@ -42,293 +42,293 @@
public class Job {
- // A job will be in one of the following states
- final public static int SUCCESS = 0;
- final public static int WAITING = 1;
- final public static int RUNNING = 2;
- final public static int READY = 3;
- final public static int FAILED = 4;
- final public static int DEPENDENT_FAILED = 5;
-
-
- private JobConf theJobConf;
- private int state;
- private String jobID; // assigned and used by JobControl class
- private String mapredJobID; // the job ID assigned by map/reduce
- private String jobName; // external name, assigned/used by client app
- private String message; // some info for human consumption,
- // e.g. the reason why the job failed
- private ArrayList dependingJobs; // the jobs the current job depends on
-
- private JobClient jc = null; // the map reduce job client
-
- /**
- * Construct a job.
- * @param jobConf a mapred job configuration representing a job to be executed.
- * @param dependingJobs an array of jobs the current job depends on
- */
- public Job(JobConf jobConf, ArrayList dependingJobs) throws IOException {
- this.theJobConf = jobConf;
- this.dependingJobs = dependingJobs;
- this.state = Job.WAITING;
- this.jobID = "unassigned";
- this.mapredJobID = "unassigned";
- this.jobName = "unassigned";
- this.message = "just initialized";
- this.jc = new JobClient(jobConf);
- }
-
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("job name:\t").append(this.jobName).append("\n");
- sb.append("job id:\t").append(this.jobID).append("\n");
- sb.append("job state:\t").append(this.state).append("\n");
- sb.append("job mapred id:\t").append(this.mapredJobID).append("\n");
- sb.append("job message:\t").append(this.message).append("\n");
+ // A job will be in one of the following states
+ final public static int SUCCESS = 0;
+ final public static int WAITING = 1;
+ final public static int RUNNING = 2;
+ final public static int READY = 3;
+ final public static int FAILED = 4;
+ final public static int DEPENDENT_FAILED = 5;
+
+
+ private JobConf theJobConf;
+ private int state;
+ private String jobID; // assigned and used by JobControl class
+ private String mapredJobID; // the job ID assigned by map/reduce
+ private String jobName; // external name, assigned/used by client app
+ private String message; // some info for human consumption,
+ // e.g. the reason why the job failed
+ private ArrayList dependingJobs; // the jobs the current job depends on
+
+ private JobClient jc = null; // the map reduce job client
+
+ /**
+ * Construct a job.
+ * @param jobConf a mapred job configuration representing a job to be executed.
+ * @param dependingJobs an array of jobs the current job depends on
+ */
+ public Job(JobConf jobConf, ArrayList dependingJobs) throws IOException {
+ this.theJobConf = jobConf;
+ this.dependingJobs = dependingJobs;
+ this.state = Job.WAITING;
+ this.jobID = "unassigned";
+ this.mapredJobID = "unassigned";
+ this.jobName = "unassigned";
+ this.message = "just initialized";
+ this.jc = new JobClient(jobConf);
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("job name:\t").append(this.jobName).append("\n");
+ sb.append("job id:\t").append(this.jobID).append("\n");
+ sb.append("job state:\t").append(this.state).append("\n");
+ sb.append("job mapred id:\t").append(this.mapredJobID).append("\n");
+ sb.append("job message:\t").append(this.message).append("\n");
- if (this.dependingJobs == null) {
- sb.append("job has no depending job:\t").append("\n");
- } else {
- sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
- for (int i = 0; i < this.dependingJobs.size(); i++) {
- sb.append("\t depending job ").append(i).append(":\t");
- sb.append(((Job) this.dependingJobs.get(i)).getJobName()).append("\n");
- }
- }
- return sb.toString();
- }
-
- /**
- * @return the job name of this job
- */
- public String getJobName() {
- return this.jobName;
- }
-
- /**
- * Set the job name for this job.
- * @param jobName the job name
- */
- public void setJobName(String jobName) {
- this.jobName = jobName;
- }
-
- /**
- * @return the job ID of this job
- */
- public String getJobID() {
- return this.jobID;
- }
-
- /**
- * Set the job ID for this job.
- * @param id the job ID
- */
- public void setJobID(String id) {
- this.jobID = id;
- }
-
- /**
- * @return the mapred ID of this job
- */
- public String getMapredJobID() {
- return this.mapredJobID;
- }
-
- /**
- * Set the mapred ID for this job.
- * @param mapredJobID the mapred job ID for this job.
- */
- public void setMapredJobID(String mapredJobID) {
- this.jobID = mapredJobID;
- }
-
- /**
- * @return the mapred job conf of this job
- */
- public JobConf getJobConf() {
- return this.theJobConf;
- }
+ if (this.dependingJobs == null) {
+ sb.append("job has no depending job:\t").append("\n");
+ } else {
+ sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
+ for (int i = 0; i < this.dependingJobs.size(); i++) {
+ sb.append("\t depending job ").append(i).append(":\t");
+ sb.append(((Job) this.dependingJobs.get(i)).getJobName()).append("\n");
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * @return the job name of this job
+ */
+ public String getJobName() {
+ return this.jobName;
+ }
+
+ /**
+ * Set the job name for this job.
+ * @param jobName the job name
+ */
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ /**
+ * @return the job ID of this job
+ */
+ public String getJobID() {
+ return this.jobID;
+ }
+
+ /**
+ * Set the job ID for this job.
+ * @param id the job ID
+ */
+ public void setJobID(String id) {
+ this.jobID = id;
+ }
+
+ /**
+ * @return the mapred ID of this job
+ */
+ public String getMapredJobID() {
+ return this.mapredJobID;
+ }
+
+ /**
+ * Set the mapred ID for this job.
+ * @param mapredJobID the mapred job ID for this job.
+ */
+ public void setMapredJobID(String mapredJobID) {
+ this.jobID = mapredJobID;
+ }
+
+ /**
+ * @return the mapred job conf of this job
+ */
+ public JobConf getJobConf() {
+ return this.theJobConf;
+ }
- /**
- * Set the mapred job conf for this job.
- * @param jobConf the mapred job conf for this job.
- */
- public void setJobConf(JobConf jobConf) {
- this.theJobConf = jobConf;
- }
-
- /**
- * @return the state of this job
- */
- public int getState() {
- return this.state;
- }
-
- /**
- * Set the state for this job.
- * @param state the new state for this job.
- */
- public void setState(int state) {
- this.state = state;
- }
-
- /**
- * @return the message of this job
- */
- public String getMessage() {
- return this.message;
- }
-
- /**
- * Set the message for this job.
- * @param message the message for this job.
- */
- public void setMessage(String message) {
- this.message = message;
- }
-
- /**
- * @return the depending jobs of this job
- */
- public ArrayList getDependingJobs() {
- return this.dependingJobs;
- }
-
- /**
- * @return true if this job is in a complete state
- */
- public boolean isCompleted() {
- return this.state == Job.FAILED ||
- this.state == Job.DEPENDENT_FAILED ||
- this.state == Job.SUCCESS;
- }
-
- /**
- * @return true if this job is in READY state
- */
- public boolean isReady() {
- return this.state == Job.READY;
- }
-
- /**
- * Check the state of this running job. The state may
- * remain the same, become SUCCESS or FAILED.
- */
- private void checkRunningState() {
- RunningJob running = null;
- try {
- running = jc.getJob(this.mapredJobID);
- if (running.isComplete()) {
- if (running.isSuccessful()) {
- this.state = Job.SUCCESS;
- } else {
- this.state = Job.FAILED;
- this.message = "Job failed!";
- try {
- running.killJob();
- } catch (IOException e1) {
+ /**
+ * Set the mapred job conf for this job.
+ * @param jobConf the mapred job conf for this job.
+ */
+ public void setJobConf(JobConf jobConf) {
+ this.theJobConf = jobConf;
+ }
+
+ /**
+ * @return the state of this job
+ */
+ public int getState() {
+ return this.state;
+ }
+
+ /**
+ * Set the state for this job.
+ * @param state the new state for this job.
+ */
+ public void setState(int state) {
+ this.state = state;
+ }
+
+ /**
+ * @return the message of this job
+ */
+ public String getMessage() {
+ return this.message;
+ }
+
+ /**
+ * Set the message for this job.
+ * @param message the message for this job.
+ */
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ /**
+ * @return the depending jobs of this job
+ */
+ public ArrayList getDependingJobs() {
+ return this.dependingJobs;
+ }
+
+ /**
+ * @return true if this job is in a complete state
+ */
+ public boolean isCompleted() {
+ return this.state == Job.FAILED ||
+ this.state == Job.DEPENDENT_FAILED ||
+ this.state == Job.SUCCESS;
+ }
+
+ /**
+ * @return true if this job is in READY state
+ */
+ public boolean isReady() {
+ return this.state == Job.READY;
+ }
+
+ /**
+ * Check the state of this running job. The state may
+ * remain the same, become SUCCESS or FAILED.
+ */
+ private void checkRunningState() {
+ RunningJob running = null;
+ try {
+ running = jc.getJob(this.mapredJobID);
+ if (running.isComplete()) {
+ if (running.isSuccessful()) {
+ this.state = Job.SUCCESS;
+ } else {
+ this.state = Job.FAILED;
+ this.message = "Job failed!";
+ try {
+ running.killJob();
+ } catch (IOException e1) {
- }
- try {
- this.jc.close();
- } catch (IOException e2) {
+ }
+ try {
+ this.jc.close();
+ } catch (IOException e2) {
- }
- }
- }
+ }
+ }
+ }
- } catch (IOException ioe) {
- this.state = Job.FAILED;
- this.message = StringUtils.stringifyException(ioe);
- try {
- if(running != null)
- running.killJob();
- } catch (IOException e1) {
+ } catch (IOException ioe) {
+ this.state = Job.FAILED;
+ this.message = StringUtils.stringifyException(ioe);
+ try {
+ if(running != null)
+ running.killJob();
+ } catch (IOException e1) {
- }
- try {
- this.jc.close();
- } catch (IOException e1) {
+ }
+ try {
+ this.jc.close();
+ } catch (IOException e1) {
- }
- }
- }
-
- /**
- * Check and update the state of this job. The state changes
- * depending on its current state and the states of the depending jobs.
- */
- public int checkState() {
- if (this.state == Job.RUNNING) {
- checkRunningState();
- }
- if (this.state != Job.WAITING) {
- return this.state;
- }
- if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
- this.state = Job.READY;
- return this.state;
- }
- Job pred = null;
- int n = this.dependingJobs.size();
- for (int i = 0; i < n; i++) {
- pred = (Job) this.dependingJobs.get(i);
- int s = pred.checkState();
- if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
- break; // a pred is still not completed, continue in WAITING
- // state
- }
- if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
- this.state = Job.DEPENDENT_FAILED;
- this.message = "depending job " + i + " with jobID "
- + pred.getJobID() + " failed. " + pred.getMessage();
- break;
- }
- // pred must be in success state
- if (i == n - 1) {
- this.state = Job.READY;
- }
- }
+ }
+ }
+ }
+
+ /**
+ * Check and update the state of this job. The state changes
+ * depending on its current state and the states of the depending jobs.
+ */
+ public int checkState() {
+ if (this.state == Job.RUNNING) {
+ checkRunningState();
+ }
+ if (this.state != Job.WAITING) {
+ return this.state;
+ }
+ if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+ this.state = Job.READY;
+ return this.state;
+ }
+ Job pred = null;
+ int n = this.dependingJobs.size();
+ for (int i = 0; i < n; i++) {
+ pred = (Job) this.dependingJobs.get(i);
+ int s = pred.checkState();
+ if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
+ break; // a pred is still not completed, continue in WAITING
+ // state
+ }
+ if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
+ this.state = Job.DEPENDENT_FAILED;
+ this.message = "depending job " + i + " with jobID "
+ + pred.getJobID() + " failed. " + pred.getMessage();
+ break;
+ }
+ // pred must be in success state
+ if (i == n - 1) {
+ this.state = Job.READY;
+ }
+ }
- return this.state;
- }
+ return this.state;
+ }
- /**
- * Submit this job to mapred. The state becomes RUNNING if submission
- * is successful, FAILED otherwise.
- */
- public void submit() {
- try {
- if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
- FileSystem fs = FileSystem.get(theJobConf);
- Path inputPaths[] = theJobConf.getInputPaths();
- for (int i = 0; i < inputPaths.length; i++) {
- if (!fs.exists(inputPaths[i])) {
- try {
- fs.mkdirs(inputPaths[i]);
- } catch (IOException e) {
+ /**
+ * Submit this job to mapred. The state becomes RUNNING if submission
+ * is successful, FAILED otherwise.
+ */
+ public void submit() {
+ try {
+ if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
+ FileSystem fs = FileSystem.get(theJobConf);
+ Path inputPaths[] = theJobConf.getInputPaths();
+ for (int i = 0; i < inputPaths.length; i++) {
+ if (!fs.exists(inputPaths[i])) {
+ try {
+ fs.mkdirs(inputPaths[i]);
+ } catch (IOException e) {
- }
- }
- }
}
- RunningJob running = jc.submitJob(theJobConf);
- this.mapredJobID = running.getJobID();
- this.state = Job.RUNNING;
- } catch (IOException ioe) {
- this.state = Job.FAILED;
- this.message = StringUtils.stringifyException(ioe);
+ }
}
+ }
+ RunningJob running = jc.submitJob(theJobConf);
+ this.mapredJobID = running.getJobID();
+ this.state = Job.RUNNING;
+ } catch (IOException ioe) {
+ this.state = Job.FAILED;
+ this.message = StringUtils.stringifyException(ioe);
}
+ }
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ // TODO Auto-generated method stub
- }
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java Mon Apr 16 14:44:35 2007
@@ -39,270 +39,270 @@
*/
public class JobControl implements Runnable{
- // The thread can be in one of the following state
- private static final int RUNNING = 0;
- private static final int SUSPENDED = 1;
- private static final int STOPPED = 2;
- private static final int STOPPING = 3;
- private static final int READY = 4;
-
- private int runnerState; // the thread state
-
- private Hashtable waitingJobs;
- private Hashtable readyJobs;
- private Hashtable runningJobs;
- private Hashtable successfulJobs;
- private Hashtable failedJobs;
-
- private long nextJobID;
- private String groupName;
-
- /**
- * Construct a job control for a group of jobs.
- * @param groupName a name identifying this group
- */
- public JobControl(String groupName) {
- this.waitingJobs = new Hashtable();
- this.readyJobs = new Hashtable();
- this.runningJobs = new Hashtable();
- this.successfulJobs = new Hashtable();
- this.failedJobs = new Hashtable();
- this.nextJobID = -1;
- this.groupName = groupName;
- this.runnerState = JobControl.READY;
+ // The thread can be in one of the following state
+ private static final int RUNNING = 0;
+ private static final int SUSPENDED = 1;
+ private static final int STOPPED = 2;
+ private static final int STOPPING = 3;
+ private static final int READY = 4;
+
+ private int runnerState; // the thread state
+
+ private Hashtable waitingJobs;
+ private Hashtable readyJobs;
+ private Hashtable runningJobs;
+ private Hashtable successfulJobs;
+ private Hashtable failedJobs;
+
+ private long nextJobID;
+ private String groupName;
+
+ /**
+ * Construct a job control for a group of jobs.
+ * @param groupName a name identifying this group
+ */
+ public JobControl(String groupName) {
+ this.waitingJobs = new Hashtable();
+ this.readyJobs = new Hashtable();
+ this.runningJobs = new Hashtable();
+ this.successfulJobs = new Hashtable();
+ this.failedJobs = new Hashtable();
+ this.nextJobID = -1;
+ this.groupName = groupName;
+ this.runnerState = JobControl.READY;
- }
+ }
- private static ArrayList toArrayList(Hashtable jobs) {
- ArrayList retv = new ArrayList();
- synchronized (jobs) {
- Iterator iter = jobs.values().iterator();
- while (iter.hasNext()) {
- retv.add(iter.next());
- }
- }
+ private static ArrayList toArrayList(Hashtable jobs) {
+ ArrayList retv = new ArrayList();
+ synchronized (jobs) {
+ Iterator iter = jobs.values().iterator();
+ while (iter.hasNext()) {
+ retv.add(iter.next());
+ }
+ }
- return retv;
- }
+ return retv;
+ }
- /**
- * @return the jobs in the waiting state
- */
- public ArrayList getWaitingJobs() {
- return JobControl.toArrayList(this.waitingJobs);
- }
-
- /**
- * @return the jobs in the running state
- */
- public ArrayList getRunningJobs() {
- return JobControl.toArrayList(this.runningJobs);
- }
-
- /**
- * @return the jobs in the ready state
- */
- public ArrayList getReadyJobs() {
- return JobControl.toArrayList(this.readyJobs);
- }
-
- /**
- * @return the jobs in the success state
- */
- public ArrayList getSuccessfulJobs() {
- return JobControl.toArrayList(this.successfulJobs);
- }
-
- public ArrayList getFailedJobs() {
- return JobControl.toArrayList(this.failedJobs);
- }
-
- private String getNextJobID() {
- nextJobID += 1;
- return this.groupName + this.nextJobID;
- }
-
- private static void addToQueue(Job aJob, Hashtable queue) {
- synchronized(queue) {
- queue.put(aJob.getJobID(), aJob);
- }
- }
-
- private void addToQueue(Job aJob) {
- Hashtable queue = getQueue(aJob.getState());
- addToQueue(aJob, queue);
- }
-
- private Hashtable getQueue(int state) {
- Hashtable retv = null;
- if (state == Job.WAITING) {
- retv = this.waitingJobs;
- } else if (state == Job.READY) {
- retv = this.readyJobs;
- } else if (state == Job.RUNNING) {
- retv = this.runningJobs;
- } else if (state == Job.SUCCESS) {
- retv = this.successfulJobs;
- } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
- retv = this.failedJobs;
- }
- return retv;
+ /**
+ * @return the jobs in the waiting state
+ */
+ public ArrayList getWaitingJobs() {
+ return JobControl.toArrayList(this.waitingJobs);
+ }
+
+ /**
+ * @return the jobs in the running state
+ */
+ public ArrayList getRunningJobs() {
+ return JobControl.toArrayList(this.runningJobs);
+ }
+
+ /**
+ * @return the jobs in the ready state
+ */
+ public ArrayList getReadyJobs() {
+ return JobControl.toArrayList(this.readyJobs);
+ }
+
+ /**
+ * @return the jobs in the success state
+ */
+ public ArrayList getSuccessfulJobs() {
+ return JobControl.toArrayList(this.successfulJobs);
+ }
+
+ public ArrayList getFailedJobs() {
+ return JobControl.toArrayList(this.failedJobs);
+ }
+
+ private String getNextJobID() {
+ nextJobID += 1;
+ return this.groupName + this.nextJobID;
+ }
+
+ private static void addToQueue(Job aJob, Hashtable queue) {
+ synchronized(queue) {
+ queue.put(aJob.getJobID(), aJob);
+ }
+ }
+
+ private void addToQueue(Job aJob) {
+ Hashtable queue = getQueue(aJob.getState());
+ addToQueue(aJob, queue);
+ }
+
+ private Hashtable getQueue(int state) {
+ Hashtable retv = null;
+ if (state == Job.WAITING) {
+ retv = this.waitingJobs;
+ } else if (state == Job.READY) {
+ retv = this.readyJobs;
+ } else if (state == Job.RUNNING) {
+ retv = this.runningJobs;
+ } else if (state == Job.SUCCESS) {
+ retv = this.successfulJobs;
+ } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
+ retv = this.failedJobs;
+ }
+ return retv;
- }
+ }
- /**
- * Add a new job.
- * @param aJob the the new job
- */
- synchronized public String addJob(Job aJob) {
- String id = this.getNextJobID();
- aJob.setJobID(id);
- aJob.setState(Job.WAITING);
- this.addToQueue(aJob);
- return id;
- }
-
- /**
- * Add a collection of jobs
- *
- * @param jobs
- */
- public void addJobs(Collection<Job> jobs) {
- for (Job job : jobs) {
- addJob(job);
- }
- }
-
- /**
- * @return the thread state
- */
- public int getState() {
- return this.runnerState;
- }
-
- /**
- * set the thread state to STOPPING so that the
- * thread will stop when it wakes up.
- */
- public void stop() {
- this.runnerState = JobControl.STOPPING;
- }
-
- /**
- * suspend the running thread
- */
- public void suspend () {
- if (this.runnerState == JobControl.RUNNING) {
- this.runnerState = JobControl.SUSPENDED;
- }
- }
-
- /**
- * resume the suspended thread
- */
- public void resume () {
- if (this.runnerState == JobControl.SUSPENDED) {
- this.runnerState = JobControl.RUNNING;
- }
- }
+ /**
+ * Add a new job.
+ * @param aJob the the new job
+ */
+ synchronized public String addJob(Job aJob) {
+ String id = this.getNextJobID();
+ aJob.setJobID(id);
+ aJob.setState(Job.WAITING);
+ this.addToQueue(aJob);
+ return id;
+ }
+
+ /**
+ * Add a collection of jobs
+ *
+ * @param jobs
+ */
+ public void addJobs(Collection<Job> jobs) {
+ for (Job job : jobs) {
+ addJob(job);
+ }
+ }
+
+ /**
+ * @return the thread state
+ */
+ public int getState() {
+ return this.runnerState;
+ }
+
+ /**
+ * set the thread state to STOPPING so that the
+ * thread will stop when it wakes up.
+ */
+ public void stop() {
+ this.runnerState = JobControl.STOPPING;
+ }
+
+ /**
+ * suspend the running thread
+ */
+ public void suspend () {
+ if (this.runnerState == JobControl.RUNNING) {
+ this.runnerState = JobControl.SUSPENDED;
+ }
+ }
+
+ /**
+ * resume the suspended thread
+ */
+ public void resume () {
+ if (this.runnerState == JobControl.SUSPENDED) {
+ this.runnerState = JobControl.RUNNING;
+ }
+ }
- synchronized private void checkRunningJobs() {
+ synchronized private void checkRunningJobs() {
- Hashtable oldJobs = null;
- oldJobs = this.runningJobs;
- this.runningJobs = new Hashtable();
+ Hashtable oldJobs = null;
+ oldJobs = this.runningJobs;
+ this.runningJobs = new Hashtable();
- Iterator jobs = oldJobs.values().iterator();
- while (jobs.hasNext()) {
- Job nextJob = (Job)jobs.next();
- int state = nextJob.checkState();
- /*
- if (state != Job.RUNNING) {
- System.out.println("The state of the running job " +
- nextJob.getJobName() + " has changed to: " + nextJob.getState());
- }
- */
- this.addToQueue(nextJob);
- }
- }
-
- synchronized private void checkWaitingJobs() {
- Hashtable oldJobs = null;
- oldJobs = this.waitingJobs;
- this.waitingJobs = new Hashtable();
+ Iterator jobs = oldJobs.values().iterator();
+ while (jobs.hasNext()) {
+ Job nextJob = (Job)jobs.next();
+ int state = nextJob.checkState();
+ /*
+ if (state != Job.RUNNING) {
+ System.out.println("The state of the running job " +
+ nextJob.getJobName() + " has changed to: " + nextJob.getState());
+ }
+ */
+ this.addToQueue(nextJob);
+ }
+ }
+
+ synchronized private void checkWaitingJobs() {
+ Hashtable oldJobs = null;
+ oldJobs = this.waitingJobs;
+ this.waitingJobs = new Hashtable();
- Iterator jobs = oldJobs.values().iterator();
- while (jobs.hasNext()) {
- Job nextJob = (Job)jobs.next();
- int state = nextJob.checkState();
- /*
- if (state != Job.WAITING) {
- System.out.println("The state of the waiting job " +
- nextJob.getJobName() + " has changed to: " + nextJob.getState());
- }
- */
- this.addToQueue(nextJob);
- }
- }
-
- synchronized private void startReadyJobs() {
- Hashtable oldJobs = null;
- oldJobs = this.readyJobs;
- this.readyJobs = new Hashtable();
+ Iterator jobs = oldJobs.values().iterator();
+ while (jobs.hasNext()) {
+ Job nextJob = (Job)jobs.next();
+ int state = nextJob.checkState();
+ /*
+ if (state != Job.WAITING) {
+ System.out.println("The state of the waiting job " +
+ nextJob.getJobName() + " has changed to: " + nextJob.getState());
+ }
+ */
+ this.addToQueue(nextJob);
+ }
+ }
+
+ synchronized private void startReadyJobs() {
+ Hashtable oldJobs = null;
+ oldJobs = this.readyJobs;
+ this.readyJobs = new Hashtable();
- Iterator jobs = oldJobs.values().iterator();
- while (jobs.hasNext()) {
- Job nextJob = (Job)jobs.next();
- //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
- nextJob.submit();
- //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
- this.addToQueue(nextJob);
- }
- }
-
- synchronized public boolean allFinished() {
- return this.waitingJobs.size() == 0 &&
- this.readyJobs.size() == 0 &&
- this.runningJobs.size() == 0;
- }
-
- /**
- * The main loop for the thread.
- * The loop does the following:
- * Check the states of the running jobs
- * Update the states of waiting jobs
- * Submit the jobs in ready state
- */
- public void run() {
- this.runnerState = JobControl.RUNNING;
- while (true) {
- while (this.runnerState == JobControl.SUSPENDED) {
- try {
- Thread.sleep(5000);
- }
- catch (Exception e) {
+ Iterator jobs = oldJobs.values().iterator();
+ while (jobs.hasNext()) {
+ Job nextJob = (Job)jobs.next();
+ //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
+ nextJob.submit();
+ //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
+ this.addToQueue(nextJob);
+ }
+ }
+
+ synchronized public boolean allFinished() {
+ return this.waitingJobs.size() == 0 &&
+ this.readyJobs.size() == 0 &&
+ this.runningJobs.size() == 0;
+ }
+
+ /**
+ * The main loop for the thread.
+ * The loop does the following:
+ * Check the states of the running jobs
+ * Update the states of waiting jobs
+ * Submit the jobs in ready state
+ */
+ public void run() {
+ this.runnerState = JobControl.RUNNING;
+ while (true) {
+ while (this.runnerState == JobControl.SUSPENDED) {
+ try {
+ Thread.sleep(5000);
+ }
+ catch (Exception e) {
- }
- }
- checkRunningJobs();
- checkWaitingJobs();
- startReadyJobs();
- if (this.runnerState != JobControl.RUNNING &&
- this.runnerState != JobControl.SUSPENDED) {
- break;
- }
- try {
- Thread.sleep(5000);
- }
- catch (Exception e) {
+ }
+ }
+ checkRunningJobs();
+ checkWaitingJobs();
+ startReadyJobs();
+ if (this.runnerState != JobControl.RUNNING &&
+ this.runnerState != JobControl.SUSPENDED) {
+ break;
+ }
+ try {
+ Thread.sleep(5000);
+ }
+ catch (Exception e) {
- }
- if (this.runnerState != JobControl.RUNNING &&
- this.runnerState != JobControl.SUSPENDED) {
- break;
- }
- }
- this.runnerState = JobControl.STOPPED;
- }
+ }
+ if (this.runnerState != JobControl.RUNNING &&
+ this.runnerState != JobControl.SUSPENDED) {
+ break;
+ }
+ }
+ this.runnerState = JobControl.STOPPED;
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java Mon Apr 16 14:44:35 2007
@@ -34,9 +34,9 @@
public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress) {
return new RecordWriter(){
- public void write(WritableComparable key, Writable value) { }
- public void close(Reporter reporter) { }
- };
+ public void write(WritableComparable key, Writable value) { }
+ public void close(Reporter reporter) { }
+ };
}
public void checkOutputSpecs(FileSystem ignored, JobConf job) { }
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java Mon Apr 16 14:44:35 2007
@@ -35,153 +35,153 @@
*/
public class ContextFactory {
- private static final String PROPERTIES_FILE =
- "/hadoop-metrics.properties";
- private static final String CONTEXT_CLASS_SUFFIX =
- ".class";
- private static final String DEFAULT_CONTEXT_CLASSNAME =
- "org.apache.hadoop.metrics.spi.NullContext";
-
- private static ContextFactory theFactory = null;
-
- private Map<String,Object> attributeMap = new HashMap<String,Object>();
- private Map<String,AbstractMetricsContext> contextMap =
- new HashMap<String,AbstractMetricsContext>();
-
- // Used only when contexts, or the ContextFactory itself, cannot be
- // created.
- private static Map<String,MetricsContext> nullContextMap =
- new HashMap<String,MetricsContext>();
-
- /** Creates a new instance of ContextFactory */
- protected ContextFactory() {
- }
-
- /**
- * Returns the value of the named attribute, or null if there is no
- * attribute of that name.
- *
- * @param attributeName the attribute name
- * @return the attribute value
- */
- public Object getAttribute(String attributeName) {
- return attributeMap.get(attributeName);
- }
-
- /**
- * Returns the names of all the factory's attributes.
- *
- * @return the attribute names
- */
- public String[] getAttributeNames() {
- String[] result = new String[attributeMap.size()];
- int i = 0;
- // for (String attributeName : attributeMap.keySet()) {
- Iterator it = attributeMap.keySet().iterator();
- while (it.hasNext()) {
- result[i++] = (String) it.next();
- }
- return result;
- }
-
- /**
- * Sets the named factory attribute to the specified value, creating it
- * if it did not already exist. If the value is null, this is the same as
- * calling removeAttribute.
- *
- * @param attributeName the attribute name
- * @param value the new attribute value
- */
- public void setAttribute(String attributeName, Object value) {
- attributeMap.put(attributeName, value);
- }
+ private static final String PROPERTIES_FILE =
+ "/hadoop-metrics.properties";
+ private static final String CONTEXT_CLASS_SUFFIX =
+ ".class";
+ private static final String DEFAULT_CONTEXT_CLASSNAME =
+ "org.apache.hadoop.metrics.spi.NullContext";
+
+ private static ContextFactory theFactory = null;
+
+ private Map<String,Object> attributeMap = new HashMap<String,Object>();
+ private Map<String,AbstractMetricsContext> contextMap =
+ new HashMap<String,AbstractMetricsContext>();
+
+ // Used only when contexts, or the ContextFactory itself, cannot be
+ // created.
+ private static Map<String,MetricsContext> nullContextMap =
+ new HashMap<String,MetricsContext>();
+
+ /** Creates a new instance of ContextFactory */
+ protected ContextFactory() {
+ }
+
+ /**
+ * Returns the value of the named attribute, or null if there is no
+ * attribute of that name.
+ *
+ * @param attributeName the attribute name
+ * @return the attribute value
+ */
+ public Object getAttribute(String attributeName) {
+ return attributeMap.get(attributeName);
+ }
+
+ /**
+ * Returns the names of all the factory's attributes.
+ *
+ * @return the attribute names
+ */
+ public String[] getAttributeNames() {
+ String[] result = new String[attributeMap.size()];
+ int i = 0;
+ // for (String attributeName : attributeMap.keySet()) {
+ Iterator it = attributeMap.keySet().iterator();
+ while (it.hasNext()) {
+ result[i++] = (String) it.next();
+ }
+ return result;
+ }
+
+ /**
+ * Sets the named factory attribute to the specified value, creating it
+ * if it did not already exist. If the value is null, this is the same as
+ * calling removeAttribute.
+ *
+ * @param attributeName the attribute name
+ * @param value the new attribute value
+ */
+ public void setAttribute(String attributeName, Object value) {
+ attributeMap.put(attributeName, value);
+ }
- /**
- * Removes the named attribute if it exists.
- *
- * @param attributeName the attribute name
- */
- public void removeAttribute(String attributeName) {
- attributeMap.remove(attributeName);
- }
-
- /**
- * Returns the named MetricsContext instance, constructing it if necessary
- * using the factory's current configuration attributes. <p/>
- *
- * When constructing the instance, if the factory property
- * <i>contextName</i>.class</code> exists,
- * its value is taken to be the name of the class to instantiate. Otherwise,
- * the default is to create an instance of
- * <code>org.apache.hadoop.metrics.spi.NullContext</code>, which is a
- * dummy "no-op" context which will cause all metric data to be discarded.
- *
- * @param contextName the name of the context
- * @return the named MetricsContext
- */
- public synchronized MetricsContext getContext(String contextName)
- throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException
- {
- AbstractMetricsContext metricsContext = contextMap.get(contextName);
- if (metricsContext == null) {
- String classNameAttribute = contextName + CONTEXT_CLASS_SUFFIX;
- String className = (String) getAttribute(classNameAttribute);
- if (className == null) {
- className = DEFAULT_CONTEXT_CLASSNAME;
- }
- Class contextClass = Class.forName(className);
- metricsContext = (AbstractMetricsContext) contextClass.newInstance();
- metricsContext.init(contextName, this);
- contextMap.put(contextName, metricsContext);
- }
- return metricsContext;
- }
-
- /**
- * Returns a "null" context - one which does nothing.
- */
- public static synchronized MetricsContext getNullContext(String contextName) {
- MetricsContext nullContext = nullContextMap.get(contextName);
- if (nullContext == null) {
- nullContext = new NullContext();
- nullContextMap.put(contextName, nullContext);
- }
- return nullContext;
- }
-
- /**
- * Returns the singleton ContextFactory instance, constructing it if
- * necessary. <p/>
- *
- * When the instance is constructed, this method checks if the file
- * <code>hadoop-metrics.properties</code> exists on the class path. If it
- * exists, it must be in the format defined by java.util.Properties, and all
- * the properties in the file are set as attributes on the newly created
- * ContextFactory instance.
- *
- * @return the singleton ContextFactory instance
- */
- public static synchronized ContextFactory getFactory() throws IOException {
- if (theFactory == null) {
- theFactory = new ContextFactory();
- theFactory.setAttributes();
- }
- return theFactory;
- }
-
- private void setAttributes() throws IOException {
- InputStream is = getClass().getResourceAsStream(PROPERTIES_FILE);
- if (is != null) {
- Properties properties = new Properties();
- properties.load(is);
- //for (Object propertyNameObj : properties.keySet()) {
- Iterator it = properties.keySet().iterator();
- while (it.hasNext()) {
- String propertyName = (String) it.next();
- String propertyValue = properties.getProperty(propertyName);
- setAttribute(propertyName, propertyValue);
- }
- }
+ /**
+ * Removes the named attribute if it exists.
+ *
+ * @param attributeName the attribute name
+ */
+ public void removeAttribute(String attributeName) {
+ attributeMap.remove(attributeName);
+ }
+
+ /**
+ * Returns the named MetricsContext instance, constructing it if necessary
+ * using the factory's current configuration attributes. <p/>
+ *
+ * When constructing the instance, if the factory property
+ * <i>contextName</i>.class</code> exists,
+ * its value is taken to be the name of the class to instantiate. Otherwise,
+ * the default is to create an instance of
+ * <code>org.apache.hadoop.metrics.spi.NullContext</code>, which is a
+ * dummy "no-op" context which will cause all metric data to be discarded.
+ *
+ * @param contextName the name of the context
+ * @return the named MetricsContext
+ */
+ public synchronized MetricsContext getContext(String contextName)
+ throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException
+ {
+ AbstractMetricsContext metricsContext = contextMap.get(contextName);
+ if (metricsContext == null) {
+ String classNameAttribute = contextName + CONTEXT_CLASS_SUFFIX;
+ String className = (String) getAttribute(classNameAttribute);
+ if (className == null) {
+ className = DEFAULT_CONTEXT_CLASSNAME;
+ }
+ Class contextClass = Class.forName(className);
+ metricsContext = (AbstractMetricsContext) contextClass.newInstance();
+ metricsContext.init(contextName, this);
+ contextMap.put(contextName, metricsContext);
+ }
+ return metricsContext;
+ }
+
+ /**
+ * Returns a "null" context - one which does nothing.
+ */
+ public static synchronized MetricsContext getNullContext(String contextName) {
+ MetricsContext nullContext = nullContextMap.get(contextName);
+ if (nullContext == null) {
+ nullContext = new NullContext();
+ nullContextMap.put(contextName, nullContext);
+ }
+ return nullContext;
+ }
+
+ /**
+ * Returns the singleton ContextFactory instance, constructing it if
+ * necessary. <p/>
+ *
+ * When the instance is constructed, this method checks if the file
+ * <code>hadoop-metrics.properties</code> exists on the class path. If it
+ * exists, it must be in the format defined by java.util.Properties, and all
+ * the properties in the file are set as attributes on the newly created
+ * ContextFactory instance.
+ *
+ * @return the singleton ContextFactory instance
+ */
+ public static synchronized ContextFactory getFactory() throws IOException {
+ if (theFactory == null) {
+ theFactory = new ContextFactory();
+ theFactory.setAttributes();
+ }
+ return theFactory;
+ }
+
+ private void setAttributes() throws IOException {
+ InputStream is = getClass().getResourceAsStream(PROPERTIES_FILE);
+ if (is != null) {
+ Properties properties = new Properties();
+ properties.load(is);
+ //for (Object propertyNameObj : properties.keySet()) {
+ Iterator it = properties.keySet().iterator();
+ while (it.hasNext()) {
+ String propertyName = (String) it.next();
+ String propertyValue = properties.getProperty(propertyName);
+ setAttribute(propertyName, propertyValue);
+ }
}
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsContext.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsContext.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsContext.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsContext.java Mon Apr 16 14:44:35 2007
@@ -27,69 +27,69 @@
*/
public interface MetricsContext {
- /**
- * Default period in seconds at which data is sent to the metrics system.
- */
- public static final int DEFAULT_PERIOD = 5;
-
- /**
- * Returns the context name.
- *
- * @return the context name
- */
- public abstract String getContextName();
-
- /**
- * Starts or restarts monitoring, the emitting of metrics records as they are
- * updated.
- */
- public abstract void startMonitoring()
- throws IOException;
+ /**
+ * Default period in seconds at which data is sent to the metrics system.
+ */
+ public static final int DEFAULT_PERIOD = 5;
+
+ /**
+ * Returns the context name.
+ *
+ * @return the context name
+ */
+ public abstract String getContextName();
+
+ /**
+ * Starts or restarts monitoring, the emitting of metrics records as they are
+ * updated.
+ */
+ public abstract void startMonitoring()
+ throws IOException;
- /**
- * Stops monitoring. This does not free any data that the implementation
- * may have buffered for sending at the next timer event. It
- * is OK to call <code>startMonitoring()</code> again after calling
- * this.
- * @see #close()
- */
- public abstract void stopMonitoring();
-
- /**
- * Returns true if monitoring is currently in progress.
- */
- public abstract boolean isMonitoring();
-
- /**
- * Stops monitoring and also frees any buffered data, returning this
- * object to its initial state.
- */
- public abstract void close();
-
- /**
- * Creates a new MetricsRecord instance with the given <code>recordName</code>.
- * Throws an exception if the metrics implementation is configured with a fixed
- * set of record names and <code>recordName</code> is not in that set.
- *
- * @param recordName the name of the record
- * @throws MetricsException if recordName conflicts with configuration data
- */
- public abstract MetricsRecord createRecord(String recordName);
-
- /**
- * Registers a callback to be called at regular time intervals, as
- * determined by the implementation-class specific configuration.
- *
- * @param updater object to be run periodically; it should updated
- * some metrics records and then return
- */
- public abstract void registerUpdater(Updater updater);
+ /**
+ * Stops monitoring. This does not free any data that the implementation
+ * may have buffered for sending at the next timer event. It
+ * is OK to call <code>startMonitoring()</code> again after calling
+ * this.
+ * @see #close()
+ */
+ public abstract void stopMonitoring();
+
+ /**
+ * Returns true if monitoring is currently in progress.
+ */
+ public abstract boolean isMonitoring();
+
+ /**
+ * Stops monitoring and also frees any buffered data, returning this
+ * object to its initial state.
+ */
+ public abstract void close();
+
+ /**
+ * Creates a new MetricsRecord instance with the given <code>recordName</code>.
+ * Throws an exception if the metrics implementation is configured with a fixed
+ * set of record names and <code>recordName</code> is not in that set.
+ *
+ * @param recordName the name of the record
+ * @throws MetricsException if recordName conflicts with configuration data
+ */
+ public abstract MetricsRecord createRecord(String recordName);
+
+ /**
+ * Registers a callback to be called at regular time intervals, as
+ * determined by the implementation-class specific configuration.
+ *
+ * @param updater object to be run periodically; it should updated
+ * some metrics records and then return
+ */
+ public abstract void registerUpdater(Updater updater);
- /**
- * Removes a callback, if it exists.
- *
- * @param updater object to be removed from the callback list
- */
- public abstract void unregisterUpdater(Updater updater);
+ /**
+ * Removes a callback, if it exists.
+ *
+ * @param updater object to be removed from the callback list
+ */
+ public abstract void unregisterUpdater(Updater updater);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsException.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsException.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsException.java Mon Apr 16 14:44:35 2007
@@ -25,18 +25,18 @@
*/
public class MetricsException extends RuntimeException {
- private static final long serialVersionUID = -1643257498540498497L;
+ private static final long serialVersionUID = -1643257498540498497L;
- /** Creates a new instance of MetricsException */
- public MetricsException() {
- }
+ /** Creates a new instance of MetricsException */
+ public MetricsException() {
+ }
- /** Creates a new instance of MetricsException
- *
- * @param message an error message
- */
- public MetricsException(String message) {
- super(message);
- }
+ /** Creates a new instance of MetricsException
+ *
+ * @param message an error message
+ */
+ public MetricsException(String message) {
+ super(message);
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java Mon Apr 16 14:44:35 2007
@@ -67,140 +67,140 @@
*/
public interface MetricsRecord {
- /**
- * Returns the record name.
- *
- * @return the record name
- */
- public abstract String getRecordName();
-
- /**
- * Sets the named tag to the specified value.
- *
- * @param tagName name of the tag
- * @param tagValue new value of the tag
- * @throws MetricsException if the tagName conflicts with the configuration
- */
- public abstract void setTag(String tagName, String tagValue);
-
- /**
- * Sets the named tag to the specified value.
- *
- * @param tagName name of the tag
- * @param tagValue new value of the tag
- * @throws MetricsException if the tagName conflicts with the configuration
- */
- public abstract void setTag(String tagName, int tagValue);
-
- /**
- * Sets the named tag to the specified value.
- *
- * @param tagName name of the tag
- * @param tagValue new value of the tag
- * @throws MetricsException if the tagName conflicts with the configuration
- */
- public abstract void setTag(String tagName, short tagValue);
-
- /**
- * Sets the named tag to the specified value.
- *
- * @param tagName name of the tag
- * @param tagValue new value of the tag
- * @throws MetricsException if the tagName conflicts with the configuration
- */
- public abstract void setTag(String tagName, byte tagValue);
-
- /**
- * Sets the named metric to the specified value.
- *
- * @param metricName name of the metric
- * @param metricValue new value of the metric
- * @throws MetricsException if the metricName or the type of the metricValue
- * conflicts with the configuration
- */
- public abstract void setMetric(String metricName, int metricValue);
-
- /**
- * Sets the named metric to the specified value.
- *
- * @param metricName name of the metric
- * @param metricValue new value of the metric
- * @throws MetricsException if the metricName or the type of the metricValue
- * conflicts with the configuration
- */
- public abstract void setMetric(String metricName, short metricValue);
-
- /**
- * Sets the named metric to the specified value.
- *
- * @param metricName name of the metric
- * @param metricValue new value of the metric
- * @throws MetricsException if the metricName or the type of the metricValue
- * conflicts with the configuration
- */
- public abstract void setMetric(String metricName, byte metricValue);
-
- /**
- * Sets the named metric to the specified value.
- *
- * @param metricName name of the metric
- * @param metricValue new value of the metric
- * @throws MetricsException if the metricName or the type of the metricValue
- * conflicts with the configuration
- */
- public abstract void setMetric(String metricName, float metricValue);
-
- /**
- * Increments the named metric by the specified value.
- *
- * @param metricName name of the metric
- * @param metricValue incremental value
- * @throws MetricsException if the metricName or the type of the metricValue
- * conflicts with the configuration
- */
- public abstract void incrMetric(String metricName, int metricValue);
-
- /**
- * Increments the named metric by the specified value.
- *
- * @param metricName name of the metric
- * @param metricValue incremental value
- * @throws MetricsException if the metricName or the type of the metricValue
- * conflicts with the configuration
- */
- public abstract void incrMetric(String metricName, short metricValue);
-
- /**
- * Increments the named metric by the specified value.
- *
- * @param metricName name of the metric
- * @param metricValue incremental value
- * @throws MetricsException if the metricName or the type of the metricValue
- * conflicts with the configuration
- */
- public abstract void incrMetric(String metricName, byte metricValue);
-
- /**
- * Increments the named metric by the specified value.
- *
- * @param metricName name of the metric
- * @param metricValue incremental value
- * @throws MetricsException if the metricName or the type of the metricValue
- * conflicts with the configuration
- */
- public abstract void incrMetric(String metricName, float metricValue);
-
- /**
- * Updates the table of buffered data which is to be sent periodically.
- * If the tag values match an existing row, that row is updated;
- * otherwise, a new row is added.
- */
- public abstract void update();
-
- /**
- * Removes, from the buffered data table, the row (if it exists) having tags
- * that equal the tags that have been set on this record.
- */
- public abstract void remove();
+ /**
+ * Returns the record name.
+ *
+ * @return the record name
+ */
+ public abstract String getRecordName();
+
+ /**
+ * Sets the named tag to the specified value.
+ *
+ * @param tagName name of the tag
+ * @param tagValue new value of the tag
+ * @throws MetricsException if the tagName conflicts with the configuration
+ */
+ public abstract void setTag(String tagName, String tagValue);
+
+ /**
+ * Sets the named tag to the specified value.
+ *
+ * @param tagName name of the tag
+ * @param tagValue new value of the tag
+ * @throws MetricsException if the tagName conflicts with the configuration
+ */
+ public abstract void setTag(String tagName, int tagValue);
+
+ /**
+ * Sets the named tag to the specified value.
+ *
+ * @param tagName name of the tag
+ * @param tagValue new value of the tag
+ * @throws MetricsException if the tagName conflicts with the configuration
+ */
+ public abstract void setTag(String tagName, short tagValue);
+
+ /**
+ * Sets the named tag to the specified value.
+ *
+ * @param tagName name of the tag
+ * @param tagValue new value of the tag
+ * @throws MetricsException if the tagName conflicts with the configuration
+ */
+ public abstract void setTag(String tagName, byte tagValue);
+
+ /**
+ * Sets the named metric to the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue new value of the metric
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public abstract void setMetric(String metricName, int metricValue);
+
+ /**
+ * Sets the named metric to the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue new value of the metric
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public abstract void setMetric(String metricName, short metricValue);
+
+ /**
+ * Sets the named metric to the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue new value of the metric
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public abstract void setMetric(String metricName, byte metricValue);
+
+ /**
+ * Sets the named metric to the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue new value of the metric
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public abstract void setMetric(String metricName, float metricValue);
+
+ /**
+ * Increments the named metric by the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue incremental value
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public abstract void incrMetric(String metricName, int metricValue);
+
+ /**
+ * Increments the named metric by the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue incremental value
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public abstract void incrMetric(String metricName, short metricValue);
+
+ /**
+ * Increments the named metric by the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue incremental value
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public abstract void incrMetric(String metricName, byte metricValue);
+
+ /**
+ * Increments the named metric by the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue incremental value
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public abstract void incrMetric(String metricName, float metricValue);
+
+ /**
+ * Updates the table of buffered data which is to be sent periodically.
+ * If the tag values match an existing row, that row is updated;
+ * otherwise, a new row is added.
+ */
+ public abstract void update();
+
+ /**
+ * Removes, from the buffered data table, the row (if it exists) having tags
+ * that equal the tags that have been set on this record.
+ */
+ public abstract void remove();
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Updater.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Updater.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Updater.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Updater.java Mon Apr 16 14:44:35 2007
@@ -25,9 +25,9 @@
*/
public interface Updater {
- /**
- * Timer-based call-back from the metric library.
- */
- public abstract void doUpdates(MetricsContext context);
+ /**
+ * Timer-based call-back from the metric library.
+ */
+ public abstract void doUpdates(MetricsContext context);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/file/FileContext.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/file/FileContext.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/file/FileContext.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/file/FileContext.java Mon Apr 16 14:44:35 2007
@@ -45,108 +45,108 @@
*/
public class FileContext extends AbstractMetricsContext {
- /* Configuration attribute names */
- protected static final String FILE_NAME_PROPERTY = "fileName";
- protected static final String PERIOD_PROPERTY = "period";
+ /* Configuration attribute names */
+ protected static final String FILE_NAME_PROPERTY = "fileName";
+ protected static final String PERIOD_PROPERTY = "period";
- private File file = null; // file for metrics to be written to
- private PrintWriter writer = null;
+ private File file = null; // file for metrics to be written to
+ private PrintWriter writer = null;
- /** Creates a new instance of FileContext */
- public FileContext() {}
+ /** Creates a new instance of FileContext */
+ public FileContext() {}
- public void init(String contextName, ContextFactory factory) {
- super.init(contextName, factory);
+ public void init(String contextName, ContextFactory factory) {
+ super.init(contextName, factory);
- String fileName = getAttribute(FILE_NAME_PROPERTY);
- if (fileName != null) {
- file = new File(fileName);
- }
+ String fileName = getAttribute(FILE_NAME_PROPERTY);
+ if (fileName != null) {
+ file = new File(fileName);
+ }
- String periodStr = getAttribute(PERIOD_PROPERTY);
- if (periodStr != null) {
- int period = 0;
- try {
- period = Integer.parseInt(periodStr);
- } catch (NumberFormatException nfe) {
- }
- if (period <= 0) {
- throw new MetricsException("Invalid period: " + periodStr);
- }
- setPeriod(period);
- }
+ String periodStr = getAttribute(PERIOD_PROPERTY);
+ if (periodStr != null) {
+ int period = 0;
+ try {
+ period = Integer.parseInt(periodStr);
+ } catch (NumberFormatException nfe) {
+ }
+ if (period <= 0) {
+ throw new MetricsException("Invalid period: " + periodStr);
+ }
+ setPeriod(period);
}
+ }
- /**
- * Returns the configured file name, or null.
- */
- public String getFileName() {
- if (file == null) {
- return null;
- } else {
- return file.getName();
- }
- }
-
- /**
- * Starts or restarts monitoring, by opening in append-mode, the
- * file specified by the <code>fileName</code> attribute,
- * if specified. Otherwise the data will be written to standard
- * output.
- */
- public void startMonitoring()
- throws IOException
- {
- if (file == null) {
- writer = new PrintWriter(new BufferedOutputStream(System.out));
- } else {
- writer = new PrintWriter(new FileWriter(file, true));
- }
- super.startMonitoring();
- }
-
- /**
- * Stops monitoring, closing the file.
- * @see #close()
- */
- public void stopMonitoring() {
- super.stopMonitoring();
+ /**
+ * Returns the configured file name, or null.
+ */
+ public String getFileName() {
+ if (file == null) {
+ return null;
+ } else {
+ return file.getName();
+ }
+ }
+
+ /**
+ * Starts or restarts monitoring, by opening in append-mode, the
+ * file specified by the <code>fileName</code> attribute,
+ * if specified. Otherwise the data will be written to standard
+ * output.
+ */
+ public void startMonitoring()
+ throws IOException
+ {
+ if (file == null) {
+ writer = new PrintWriter(new BufferedOutputStream(System.out));
+ } else {
+ writer = new PrintWriter(new FileWriter(file, true));
+ }
+ super.startMonitoring();
+ }
+
+ /**
+ * Stops monitoring, closing the file.
+ * @see #close()
+ */
+ public void stopMonitoring() {
+ super.stopMonitoring();
- if (writer != null) {
- writer.close();
- writer = null;
- }
- }
-
- /**
- * Emits a metrics record to a file.
- */
- public void emitRecord(String contextName, String recordName, OutputRecord outRec) {
- writer.print(contextName);
- writer.print(".");
- writer.print(recordName);
- String separator = ": ";
- for (String tagName : outRec.getTagNames()) {
- writer.print(separator);
- separator = ", ";
- writer.print(tagName);
- writer.print("=");
- writer.print(outRec.getTag(tagName));
- }
- for (String metricName : outRec.getMetricNames()) {
- writer.print(separator);
- separator = ", ";
- writer.print(metricName);
- writer.print("=");
- writer.print(outRec.getMetric(metricName));
- }
- writer.println();
- }
-
- /**
- * Flushes the output writer, forcing updates to disk.
- */
- public void flush() {
- writer.flush();
- }
+ if (writer != null) {
+ writer.close();
+ writer = null;
+ }
+ }
+
+ /**
+ * Emits a metrics record to a file.
+ */
+ public void emitRecord(String contextName, String recordName, OutputRecord outRec) {
+ writer.print(contextName);
+ writer.print(".");
+ writer.print(recordName);
+ String separator = ": ";
+ for (String tagName : outRec.getTagNames()) {
+ writer.print(separator);
+ separator = ", ";
+ writer.print(tagName);
+ writer.print("=");
+ writer.print(outRec.getTag(tagName));
+ }
+ for (String metricName : outRec.getMetricNames()) {
+ writer.print(separator);
+ separator = ", ";
+ writer.print(metricName);
+ writer.print("=");
+ writer.print(outRec.getMetric(metricName));
+ }
+ writer.println();
+ }
+
+ /**
+ * Flushes the output writer, forcing updates to disk.
+ */
+ public void flush() {
+ writer.flush();
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java Mon Apr 16 14:44:35 2007
@@ -41,183 +41,183 @@
*/
public class GangliaContext extends AbstractMetricsContext {
- private static final String PERIOD_PROPERTY = "period";
- private static final String SERVERS_PROPERTY = "servers";
- private static final String UNITS_PROPERTY = "units";
- private static final String SLOPE_PROPERTY = "slope";
- private static final String TMAX_PROPERTY = "tmax";
- private static final String DMAX_PROPERTY = "dmax";
-
- private static final String DEFAULT_UNITS = "";
- private static final String DEFAULT_SLOPE = "both";
- private static final int DEFAULT_TMAX = 60;
- private static final int DEFAULT_DMAX = 0;
- private static final int DEFAULT_PORT = 8649;
- private static final int BUFFER_SIZE = 1500; // as per libgmond.c
-
- private static final Map<Class,String> typeTable = new HashMap<Class,String>(5);
-
- static {
- typeTable.put(String.class, "string");
- typeTable.put(Byte.class, "int8");
- typeTable.put(Short.class, "int16");
- typeTable.put(Integer.class, "int32");
- typeTable.put(Float.class, "float");
- }
-
- private byte[] buffer = new byte[BUFFER_SIZE];
- private int offset;
-
- private List<? extends SocketAddress> metricsServers;
- private Map<String,String> unitsTable;
- private Map<String,String> slopeTable;
- private Map<String,String> tmaxTable;
- private Map<String,String> dmaxTable;
-
- private DatagramSocket datagramSocket;
-
- /** Creates a new instance of GangliaContext */
- public GangliaContext() {
- }
-
- public void init(String contextName, ContextFactory factory)
- {
- super.init(contextName, factory);
-
- String periodStr = getAttribute(PERIOD_PROPERTY);
- if (periodStr != null) {
- int period = 0;
- try {
- period = Integer.parseInt(periodStr);
- } catch (NumberFormatException nfe) {
- }
- if (period <= 0) {
- throw new MetricsException("Invalid period: " + periodStr);
- }
- setPeriod(period);
- }
-
- metricsServers =
- Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT);
-
- unitsTable = getAttributeTable(UNITS_PROPERTY);
- slopeTable = getAttributeTable(SLOPE_PROPERTY);
- tmaxTable = getAttributeTable(TMAX_PROPERTY);
- dmaxTable = getAttributeTable(DMAX_PROPERTY);
-
- try {
- datagramSocket = new DatagramSocket();
- }
- catch (SocketException se) {
- se.printStackTrace();
- }
- }
-
- public void emitRecord(String contextName, String recordName, OutputRecord outRec)
- throws IOException
- {
- // emit each metric in turn
- for (String metricName : outRec.getMetricNames()) {
- Object metric = outRec.getMetric(metricName);
- String type = (String) typeTable.get(metric.getClass());
- emitMetric(metricName, type, metric.toString());
- }
-
- }
-
- private void emitMetric(String name, String type, String value)
- throws IOException
- {
- String units = getUnits(name);
- int slope = getSlope(name);
- int tmax = getTmax(name);
- int dmax = getDmax(name);
-
- offset = 0;
- xdr_int(0); // metric_user_defined
- xdr_string(type);
- xdr_string(name);
- xdr_string(value);
- xdr_string(units);
- xdr_int(slope);
- xdr_int(tmax);
- xdr_int(dmax);
-
- for (SocketAddress socketAddress : metricsServers) {
- DatagramPacket packet =
- new DatagramPacket(buffer, offset, socketAddress);
- datagramSocket.send(packet);
- }
- }
-
- private String getUnits(String metricName) {
- String result = (String) unitsTable.get(metricName);
- if (result == null) {
- result = DEFAULT_UNITS;
- }
- return result;
- }
-
- private int getSlope(String metricName) {
- String slopeString = (String) slopeTable.get(metricName);
- if (slopeString == null) {
- slopeString = DEFAULT_SLOPE;
- }
- return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
- }
-
- private int getTmax(String metricName) {
- String tmaxString = (String) tmaxTable.get(metricName);
- if (tmaxString == null) {
- return DEFAULT_TMAX;
- }
- else {
- return Integer.parseInt(tmaxString);
- }
- }
-
- private int getDmax(String metricName) {
- String dmaxString = (String) dmaxTable.get(metricName);
- if (dmaxString == null) {
- return DEFAULT_DMAX;
- }
- else {
- return Integer.parseInt(dmaxString);
- }
- }
-
- /**
- * Puts a string into the buffer by first writing the size of the string
- * as an int, followed by the bytes of the string, padded if necessary to
- * a multiple of 4.
- */
- private void xdr_string(String s) {
- byte[] bytes = s.getBytes();
- int len = bytes.length;
- xdr_int(len);
- System.arraycopy(bytes, 0, buffer, offset, len);
- offset += len;
- pad();
- }
+ private static final String PERIOD_PROPERTY = "period";
+ private static final String SERVERS_PROPERTY = "servers";
+ private static final String UNITS_PROPERTY = "units";
+ private static final String SLOPE_PROPERTY = "slope";
+ private static final String TMAX_PROPERTY = "tmax";
+ private static final String DMAX_PROPERTY = "dmax";
+
+ private static final String DEFAULT_UNITS = "";
+ private static final String DEFAULT_SLOPE = "both";
+ private static final int DEFAULT_TMAX = 60;
+ private static final int DEFAULT_DMAX = 0;
+ private static final int DEFAULT_PORT = 8649;
+ private static final int BUFFER_SIZE = 1500; // as per libgmond.c
+
+ private static final Map<Class,String> typeTable = new HashMap<Class,String>(5);
+
+ static {
+ typeTable.put(String.class, "string");
+ typeTable.put(Byte.class, "int8");
+ typeTable.put(Short.class, "int16");
+ typeTable.put(Integer.class, "int32");
+ typeTable.put(Float.class, "float");
+ }
+
+ private byte[] buffer = new byte[BUFFER_SIZE];
+ private int offset;
+
+ private List<? extends SocketAddress> metricsServers;
+ private Map<String,String> unitsTable;
+ private Map<String,String> slopeTable;
+ private Map<String,String> tmaxTable;
+ private Map<String,String> dmaxTable;
+
+ private DatagramSocket datagramSocket;
+
+ /** Creates a new instance of GangliaContext */
+ public GangliaContext() {
+ }
+
+ public void init(String contextName, ContextFactory factory)
+ {
+ super.init(contextName, factory);
+
+ String periodStr = getAttribute(PERIOD_PROPERTY);
+ if (periodStr != null) {
+ int period = 0;
+ try {
+ period = Integer.parseInt(periodStr);
+ } catch (NumberFormatException nfe) {
+ }
+ if (period <= 0) {
+ throw new MetricsException("Invalid period: " + periodStr);
+ }
+ setPeriod(period);
+ }
+
+ metricsServers =
+ Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT);
+
+ unitsTable = getAttributeTable(UNITS_PROPERTY);
+ slopeTable = getAttributeTable(SLOPE_PROPERTY);
+ tmaxTable = getAttributeTable(TMAX_PROPERTY);
+ dmaxTable = getAttributeTable(DMAX_PROPERTY);
+
+ try {
+ datagramSocket = new DatagramSocket();
+ }
+ catch (SocketException se) {
+ se.printStackTrace();
+ }
+ }
+
+ public void emitRecord(String contextName, String recordName, OutputRecord outRec)
+ throws IOException
+ {
+ // emit each metric in turn
+ for (String metricName : outRec.getMetricNames()) {
+ Object metric = outRec.getMetric(metricName);
+ String type = (String) typeTable.get(metric.getClass());
+ emitMetric(metricName, type, metric.toString());
+ }
+
+ }
+
+ private void emitMetric(String name, String type, String value)
+ throws IOException
+ {
+ String units = getUnits(name);
+ int slope = getSlope(name);
+ int tmax = getTmax(name);
+ int dmax = getDmax(name);
+
+ offset = 0;
+ xdr_int(0); // metric_user_defined
+ xdr_string(type);
+ xdr_string(name);
+ xdr_string(value);
+ xdr_string(units);
+ xdr_int(slope);
+ xdr_int(tmax);
+ xdr_int(dmax);
+
+ for (SocketAddress socketAddress : metricsServers) {
+ DatagramPacket packet =
+ new DatagramPacket(buffer, offset, socketAddress);
+ datagramSocket.send(packet);
+ }
+ }
+
+ private String getUnits(String metricName) {
+ String result = (String) unitsTable.get(metricName);
+ if (result == null) {
+ result = DEFAULT_UNITS;
+ }
+ return result;
+ }
+
+ private int getSlope(String metricName) {
+ String slopeString = (String) slopeTable.get(metricName);
+ if (slopeString == null) {
+ slopeString = DEFAULT_SLOPE;
+ }
+ return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
+ }
+
+ private int getTmax(String metricName) {
+ String tmaxString = (String) tmaxTable.get(metricName);
+ if (tmaxString == null) {
+ return DEFAULT_TMAX;
+ }
+ else {
+ return Integer.parseInt(tmaxString);
+ }
+ }
+
+ private int getDmax(String metricName) {
+ String dmaxString = (String) dmaxTable.get(metricName);
+ if (dmaxString == null) {
+ return DEFAULT_DMAX;
+ }
+ else {
+ return Integer.parseInt(dmaxString);
+ }
+ }
+
+ /**
+ * Puts a string into the buffer by first writing the size of the string
+ * as an int, followed by the bytes of the string, padded if necessary to
+ * a multiple of 4.
+ */
+ private void xdr_string(String s) {
+ byte[] bytes = s.getBytes();
+ int len = bytes.length;
+ xdr_int(len);
+ System.arraycopy(bytes, 0, buffer, offset, len);
+ offset += len;
+ pad();
+ }
- /**
- * Pads the buffer with zero bytes up to the nearest multiple of 4.
- */
- private void pad() {
- int newOffset = ((offset + 3) / 4) * 4;
- while (offset < newOffset) {
- buffer[offset++] = 0;
- }
- }
-
- /**
- * Puts an integer into the buffer as 4 bytes, big-endian.
- */
- private void xdr_int(int i) {
- buffer[offset++] = (byte)((i >> 24) & 0xff);
- buffer[offset++] = (byte)((i >> 16) & 0xff);
- buffer[offset++] = (byte)((i >> 8) & 0xff);
- buffer[offset++] = (byte)(i & 0xff);
- }
+ /**
+ * Pads the buffer with zero bytes up to the nearest multiple of 4.
+ */
+ private void pad() {
+ int newOffset = ((offset + 3) / 4) * 4;
+ while (offset < newOffset) {
+ buffer[offset++] = 0;
+ }
+ }
+
+ /**
+ * Puts an integer into the buffer as 4 bytes, big-endian.
+ */
+ private void xdr_int(int i) {
+ buffer[offset++] = (byte)((i >> 24) & 0xff);
+ buffer[offset++] = (byte)((i >> 16) & 0xff);
+ buffer[offset++] = (byte)((i >> 8) & 0xff);
+ buffer[offset++] = (byte)(i & 0xff);
+ }
}