You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC

svn commit: r752666 [14/16] - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoo...

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/ExecPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/ExecPlugin.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/ExecPlugin.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/ExecPlugin.java Wed Mar 11 22:39:26 2009
@@ -18,133 +18,116 @@
 
 package org.apache.hadoop.chukwa.inputtools.plugin;
 
+
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-
 import org.json.JSONObject;
 
-public abstract class ExecPlugin implements IPlugin
-{
-	public final int statusOK = 100;
-	public final int statusKO = -100;
-	
-	Process process;
-	
-	public ExecPlugin()
-	{
-		
-	}
-	
-	public void stop() {
-	  process.destroy();
-	}
-	
-	public int waitFor() throws InterruptedException {
-	  return process.waitFor();
-	}
-	
-	public abstract String getCmde();
-	
-	public JSONObject postProcess(JSONObject execResult)
-	{
-		return execResult;
-	}
-	
-	public JSONObject execute()
-	{
-		JSONObject result = new JSONObject();
-		try
-		{
-			result.put("timestamp", System.currentTimeMillis());
-			
-			Runtime runtime = Runtime.getRuntime();
-			process = runtime.exec(getCmde());
-//			ProcessBuilder builder = new ProcessBuilder(cmde);
-//			Process process = builder.start();
-			
-
-			
-			
-			OutputReader stdOut = new OutputReader(process,Output.stdOut);
-			stdOut.start();
-			OutputReader stdErr = new OutputReader(process,Output.stdErr);
-			stdErr.start();
-		    int exitValue =process.waitFor();
-		    stdOut.join();
-		    stdErr.join();
-		    result.put("exitValue", exitValue);
-		    result.put("stdout", stdOut.output.toString());
-		    result.put("stderr", stdErr.output.toString());
-		    result.put("status", statusOK);
-		}
-		catch (Throwable e)
-		{
-			try 
-			{
-				result.put("status", statusKO);
-				result.put("errorLog", e.getMessage());
-			}
-			catch(Exception e1) { e1.printStackTrace();}
-			e.printStackTrace();
-		}
+public abstract class ExecPlugin implements IPlugin {
+  public final int statusOK = 100;
+  public final int statusKO = -100;
+
+  Process process;
+
+  public ExecPlugin() {
+
+  }
+
+  public void stop() {
+    process.destroy();
+  }
+
+  public int waitFor() throws InterruptedException {
+    return process.waitFor();
+  }
+
+  public abstract String getCmde();
+
+  public JSONObject postProcess(JSONObject execResult) {
+    return execResult;
+  }
+
+  public JSONObject execute() {
+    JSONObject result = new JSONObject();
+    try {
+      result.put("timestamp", System.currentTimeMillis());
+
+      Runtime runtime = Runtime.getRuntime();
+      process = runtime.exec(getCmde());
+      // ProcessBuilder builder = new ProcessBuilder(cmde);
+      // Process process = builder.start();
+
+      OutputReader stdOut = new OutputReader(process, Output.stdOut);
+      stdOut.start();
+      OutputReader stdErr = new OutputReader(process, Output.stdErr);
+      stdErr.start();
+      int exitValue = process.waitFor();
+      stdOut.join();
+      stdErr.join();
+      result.put("exitValue", exitValue);
+      result.put("stdout", stdOut.output.toString());
+      result.put("stderr", stdErr.output.toString());
+      result.put("status", statusOK);
+    } catch (Throwable e) {
+      try {
+        result.put("status", statusKO);
+        result.put("errorLog", e.getMessage());
+      } catch (Exception e1) {
+        e1.printStackTrace();
+      }
+      e.printStackTrace();
+    }
 
-		return postProcess(result);
-	}
+    return postProcess(result);
+  }
 }
 
 
-enum Output{stdOut,stdErr};
-
-class OutputReader extends Thread
-{
-	private Process process = null;
-	private Output outputType = null;
-	public StringBuilder output = new StringBuilder();
-	public boolean isOk = true;
-
-
-	public OutputReader(Process process,Output outputType)
-	{
-		this.process = process;
-		this.outputType = outputType;
-	}
-	public void run()
-	{
-	   try
-		{
-		    String line = null;
-		    InputStream is = null;
-		    switch(this.outputType)
-		    {
-		    case stdOut:
-		    	is = process.getInputStream();
-		    	break;
-		    case stdErr:
-		    	is = process.getErrorStream();
-		    	break;
-		    	
-		    }
-		   
-		    InputStreamReader isr = new InputStreamReader(is);
-		    BufferedReader br = new BufferedReader(isr);
-		    while ((line = br.readLine()) != null) 
-		    {
-		    	 //System.out.println("========>>>>>>>["+line+"]");	
-		    	 output.append(line).append("\n");
-		    }
-        br.close();
-		}
-		catch (IOException e)
-		{
-			isOk = false;
-			e.printStackTrace();
-		}
-		catch (Throwable e)
-		{
-			isOk = false;
-			e.printStackTrace();
-		}
-	}
+enum Output {
+  stdOut, stdErr
+};
+
+
+class OutputReader extends Thread {
+  private Process process = null;
+  private Output outputType = null;
+  public StringBuilder output = new StringBuilder();
+  public boolean isOk = true;
+
+  public OutputReader(Process process, Output outputType) {
+    this.process = process;
+    this.outputType = outputType;
+  }
+
+  public void run() {
+    try {
+      String line = null;
+      InputStream is = null;
+      switch (this.outputType) {
+      case stdOut:
+        is = process.getInputStream();
+        break;
+      case stdErr:
+        is = process.getErrorStream();
+        break;
+
+      }
+
+      InputStreamReader isr = new InputStreamReader(is);
+      BufferedReader br = new BufferedReader(isr);
+      while ((line = br.readLine()) != null) {
+        // System.out.println("========>>>>>>>["+line+"]");
+        output.append(line).append("\n");
+      }
+      br.close();
+    } catch (IOException e) {
+      isOk = false;
+      e.printStackTrace();
+    } catch (Throwable e) {
+      isOk = false;
+      e.printStackTrace();
+    }
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/IPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/IPlugin.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/IPlugin.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/IPlugin.java Wed Mar 11 22:39:26 2009
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.chukwa.inputtools.plugin;
 
+
 import org.json.JSONObject;
 
-public interface IPlugin
-{
-	JSONObject execute();
+public interface IPlugin {
+  JSONObject execute();
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java Wed Mar 11 22:39:26 2009
@@ -1,10 +1,10 @@
 package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
 
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Timer;
 import java.util.TimerTask;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
@@ -14,54 +14,58 @@
 import org.json.JSONObject;
 
 public class Exec extends TimerTask {
-	private static Log log = LogFactory.getLog(Exec.class);
-	private String cmde = null;
-    private static PidFile pFile = null;
-    private Timer timer = null;
-    private IPlugin plugin = null;
-    
-	public Exec(String[] cmds) {
-		StringBuffer c = new StringBuffer();
-		for(String cmd : cmds) {
-			c.append(cmd);
-			c.append(" ");
-		}
-		cmde = c.toString();
-		plugin = new ExecHelper(cmds);
-	}
-	public void run() {
-		try {
-			JSONObject result = plugin.execute();
-			if (result.getInt("status") < 0) {
-				System.out.println("Error");
-				log.warn("[ChukwaError]:"+ Exec.class + ", " + result.getString("stderr"));
-				System.exit(-1);
-			} else {
-				log.info(result.get("stdout"));
-			}
-		} catch(JSONException e) {
-			log.error("Exec output unparsable:"+this.cmde);
-		}
-	}
-	public String getCmde() {
-		return cmde;
-	}
-    
-	public static void main(String[] args) {
-   	    pFile=new PidFile(System.getProperty("RECORD_TYPE")+"-data-loader");
-   	    Runtime.getRuntime().addShutdownHook(pFile);
-   	    int period = 60;
-   	    try {
-			if(System.getProperty("PERIOD")!=null) {
-			    period = Integer.parseInt(System.getProperty("PERIOD"));
-			}
-        } catch(NumberFormatException ex) {
-			ex.printStackTrace();
-			System.out.println("Usage: java -DPERIOD=nn -DRECORD_TYPE=recordType Exec [cmd]");
-			System.out.println("PERIOD should be numeric format of seconds.");        	
-			System.exit(0);
-        }
-   	    Timer timer = new Timer();
-		timer.schedule(new Exec(args),0, period*1000);
-	}
+  private static Log log = LogFactory.getLog(Exec.class);
+  private String cmde = null;
+  private static PidFile pFile = null;
+  private Timer timer = null;
+  private IPlugin plugin = null;
+
+  public Exec(String[] cmds) {
+    StringBuffer c = new StringBuffer();
+    for (String cmd : cmds) {
+      c.append(cmd);
+      c.append(" ");
+    }
+    cmde = c.toString();
+    plugin = new ExecHelper(cmds);
+  }
+
+  public void run() {
+    try {
+      JSONObject result = plugin.execute();
+      if (result.getInt("status") < 0) {
+        System.out.println("Error");
+        log.warn("[ChukwaError]:" + Exec.class + ", "
+            + result.getString("stderr"));
+        System.exit(-1);
+      } else {
+        log.info(result.get("stdout"));
+      }
+    } catch (JSONException e) {
+      log.error("Exec output unparsable:" + this.cmde);
+    }
+  }
+
+  public String getCmde() {
+    return cmde;
+  }
+
+  public static void main(String[] args) {
+    pFile = new PidFile(System.getProperty("RECORD_TYPE") + "-data-loader");
+    Runtime.getRuntime().addShutdownHook(pFile);
+    int period = 60;
+    try {
+      if (System.getProperty("PERIOD") != null) {
+        period = Integer.parseInt(System.getProperty("PERIOD"));
+      }
+    } catch (NumberFormatException ex) {
+      ex.printStackTrace();
+      System.out
+          .println("Usage: java -DPERIOD=nn -DRECORD_TYPE=recordType Exec [cmd]");
+      System.out.println("PERIOD should be numeric format of seconds.");
+      System.exit(0);
+    }
+    Timer timer = new Timer();
+    timer.schedule(new Exec(args), 0, period * 1000);
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java Wed Mar 11 22:39:26 2009
@@ -1,10 +1,10 @@
 package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
 
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Timer;
 import java.util.TimerTask;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
@@ -14,21 +14,21 @@
 import org.json.JSONObject;
 
 public class ExecHelper extends ExecPlugin {
-	private static Log log = LogFactory.getLog(ExecHelper.class);
-	private String cmde = null;
-    private static PidFile pFile = null;
-    private Timer timer = null;
-    
-	public ExecHelper(String[] cmds) {
-		StringBuffer c = new StringBuffer();
-		for(String cmd : cmds) {
-			c.append(cmd);
-			c.append(" ");
-		}
-		cmde = c.toString();
-	}
-	
-	public String getCmde() {
-		return cmde;
-	}    
+  private static Log log = LogFactory.getLog(ExecHelper.class);
+  private String cmde = null;
+  private static PidFile pFile = null;
+  private Timer timer = null;
+
+  public ExecHelper(String[] cmds) {
+    StringBuffer c = new StringBuffer();
+    for (String cmd : cmds) {
+      c.append(cmd);
+      c.append(" ");
+    }
+    cmde = c.toString();
+  }
+
+  public String getCmde() {
+    return cmde;
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/nodeactivity/NodeActivityPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/nodeactivity/NodeActivityPlugin.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/nodeactivity/NodeActivityPlugin.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/nodeactivity/NodeActivityPlugin.java Wed Mar 11 22:39:26 2009
@@ -18,105 +18,85 @@
 
 package org.apache.hadoop.chukwa.inputtools.plugin.nodeactivity;
 
+
 import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
 import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
 import org.apache.hadoop.chukwa.inputtools.plugin.IPlugin;
 import org.json.JSONObject;
 
-public class NodeActivityPlugin extends ExecPlugin
-{
-	private String cmde = null;
-	private DataConfig dataConfig = null;
-	
-	public NodeActivityPlugin()
-	{
-		dataConfig = new DataConfig();
-		cmde = dataConfig.get("mdl.plugin.NodeActivityPlugin.cmde");
-	}
-	
-	@Override
-	public String getCmde()
-	{
-		return cmde;
-	}
-	
-	@Override
-	public JSONObject postProcess(JSONObject execResult)
-	{
-		try
-		{
-			if (execResult.getInt("status") < 0)
-			{
-				return execResult;
-			}
-			
-			String res = execResult.getString("stdout");
-			
-			String[] tab = res.split("\n");
-			int totalFreeNode = 0;
-			int totalUsedNode = 0;
-			int totalDownNode = 0;
-			
-			for(int i=0;i<tab.length;i++)
-			{
-				if (tab[i].indexOf("state =") <0)
-				{
-					tab[i] = null;
-					continue;
-				}
-	
-				String[] line = tab[i].split("state =");
-				tab[i] = null;
-				
-				if (line[1].trim().equals("free"))
-				{
-					totalFreeNode ++;
-				}
-				else if (line[1].trim().equals("job-exclusive"))
-				{
-					totalUsedNode ++;
-				}
-				else
-				{
-					totalDownNode ++;
-				}
-			}
-			
-
-			execResult.put("totalFreeNode", totalFreeNode);
-			execResult.put("totalUsedNode", totalUsedNode);
-			execResult.put("totalDownNode", totalDownNode);
-			execResult.put("source", "NodeActivity");
-			
-			execResult.put("status", 100);	
-			
-		} catch (Throwable e)
-		{
-			try
-			{
-				execResult.put("source", "NodeActivity");
-				execResult.put("status", -100);	
-				execResult.put("errorLog",e.getMessage());
-			}
-			catch(Exception e1) { e1.printStackTrace();}
-			e.printStackTrace();
-			
-		}
-		
-		return execResult;
-	}
-
-	public static void main(String[] args)
-	{
-		IPlugin plugin = new NodeActivityPlugin();
-		JSONObject result = plugin.execute();
-		System.out.print("Result: " + result);
-		
-		
-		
-		
-	}
-
+public class NodeActivityPlugin extends ExecPlugin {
+  private String cmde = null;
+  private DataConfig dataConfig = null;
+
+  public NodeActivityPlugin() {
+    dataConfig = new DataConfig();
+    cmde = dataConfig.get("mdl.plugin.NodeActivityPlugin.cmde");
+  }
+
+  @Override
+  public String getCmde() {
+    return cmde;
+  }
+
+  @Override
+  public JSONObject postProcess(JSONObject execResult) {
+    try {
+      if (execResult.getInt("status") < 0) {
+        return execResult;
+      }
+
+      String res = execResult.getString("stdout");
+
+      String[] tab = res.split("\n");
+      int totalFreeNode = 0;
+      int totalUsedNode = 0;
+      int totalDownNode = 0;
+
+      for (int i = 0; i < tab.length; i++) {
+        if (tab[i].indexOf("state =") < 0) {
+          tab[i] = null;
+          continue;
+        }
+
+        String[] line = tab[i].split("state =");
+        tab[i] = null;
+
+        if (line[1].trim().equals("free")) {
+          totalFreeNode++;
+        } else if (line[1].trim().equals("job-exclusive")) {
+          totalUsedNode++;
+        } else {
+          totalDownNode++;
+        }
+      }
+
+      execResult.put("totalFreeNode", totalFreeNode);
+      execResult.put("totalUsedNode", totalUsedNode);
+      execResult.put("totalDownNode", totalDownNode);
+      execResult.put("source", "NodeActivity");
+
+      execResult.put("status", 100);
+
+    } catch (Throwable e) {
+      try {
+        execResult.put("source", "NodeActivity");
+        execResult.put("status", -100);
+        execResult.put("errorLog", e.getMessage());
+      } catch (Exception e1) {
+        e1.printStackTrace();
+      }
+      e.printStackTrace();
+
+    }
+
+    return execResult;
+  }
+
+  public static void main(String[] args) {
+    IPlugin plugin = new NodeActivityPlugin();
+    JSONObject result = plugin.execute();
+    System.out.print("Result: " + result);
 
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/pbsnode/PbsNodePlugin.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/pbsnode/PbsNodePlugin.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/pbsnode/PbsNodePlugin.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/pbsnode/PbsNodePlugin.java Wed Mar 11 22:39:26 2009
@@ -1,5 +1,6 @@
 package org.apache.hadoop.chukwa.inputtools.plugin.pbsnode;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
@@ -8,38 +9,32 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
-public class PbsNodePlugin extends ExecPlugin
-{
-	private static Log log = LogFactory.getLog(PbsNodePlugin.class);
-	private String cmde = null;
-	private DataConfig dataConfig = null;
-	
-	public PbsNodePlugin()
-	{
-		dataConfig = new DataConfig();
-		cmde = dataConfig.get("chukwa.inputtools.plugin.pbsNode.cmde");
-	}
-	
-	@Override
-	public String getCmde()
-	{
-		return cmde;
-	}
-
-	public static void main(String[] args) throws JSONException
-	{
-		IPlugin plugin = new PbsNodePlugin();
-		JSONObject result = plugin.execute();
-		System.out.print("Result: " + result);	
-		
-		if (result.getInt("status") < 0)
-		{
-			System.out.println("Error");
-			log.warn("[ChukwaError]:"+ PbsNodePlugin.class + ", " + result.getString("stderr"));
-		}
-		else
-		{
-			log.info(result.get("stdout"));
-		}
-	}
+public class PbsNodePlugin extends ExecPlugin {
+  private static Log log = LogFactory.getLog(PbsNodePlugin.class);
+  private String cmde = null;
+  private DataConfig dataConfig = null;
+
+  public PbsNodePlugin() {
+    dataConfig = new DataConfig();
+    cmde = dataConfig.get("chukwa.inputtools.plugin.pbsNode.cmde");
+  }
+
+  @Override
+  public String getCmde() {
+    return cmde;
+  }
+
+  public static void main(String[] args) throws JSONException {
+    IPlugin plugin = new PbsNodePlugin();
+    JSONObject result = plugin.execute();
+    System.out.print("Result: " + result);
+
+    if (result.getInt("status") < 0) {
+      System.out.println("Error");
+      log.warn("[ChukwaError]:" + PbsNodePlugin.class + ", "
+          + result.getString("stderr"));
+    } else {
+      log.info(result.get("stdout"));
+    }
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java Wed Mar 11 22:39:26 2009
@@ -18,61 +18,61 @@
 
 package org.apache.hadoop.chukwa.util;
 
+
 import java.io.*;
 import java.util.*;
 
 public class ClusterConfig {
-    public static HashMap<String, String> clusterMap = new HashMap<String, String>();
-    private String path=System.getenv("CHUKWA_CONF_DIR")+File.separator;
-    static public String getContents(File aFile) {
-        //...checks on aFile are elided
-        StringBuffer contents = new StringBuffer();
-   
-        try {
-          //use buffering, reading one line at a time
-          //FileReader always assumes default encoding is OK!
-          BufferedReader input =  new BufferedReader(new FileReader(aFile));
-          try {
-             String line = null; //not declared within while loop
-             /*
-              * readLine is a bit quirky :
-              * it returns the content of a line MINUS the newline.
-              * it returns null only for the END of the stream.
-              * it returns an empty String if two newlines appear in a row.
-              */
-             while (( line = input.readLine()) != null){
-                contents.append(line);
-                contents.append(System.getProperty("line.separator"));
-             }
-          } finally {
-             input.close();
-          }
-        }
-          catch (IOException ex){
-          ex.printStackTrace();
-        }
+  public static HashMap<String, String> clusterMap = new HashMap<String, String>();
+  private String path = System.getenv("CHUKWA_CONF_DIR") + File.separator;
 
-        return contents.toString();
-    }
-
-    public ClusterConfig() {
-        File cc = new File(path+"jdbc.conf");
-        String buffer = getContents(cc);
-        String[] lines = buffer.split("\n");
-        for(String line: lines) {
-            String[] data = line.split("=",2);
-            clusterMap.put(data[0],data[1]);
+  static public String getContents(File aFile) {
+    // ...checks on aFile are elided
+    StringBuffer contents = new StringBuffer();
+
+    try {
+      // use buffering, reading one line at a time
+      // FileReader always assumes default encoding is OK!
+      BufferedReader input = new BufferedReader(new FileReader(aFile));
+      try {
+        String line = null; // not declared within while loop
+        /*
+         * readLine is a bit quirky : it returns the content of a line MINUS the
+         * newline. it returns null only for the END of the stream. it returns
+         * an empty String if two newlines appear in a row.
+         */
+        while ((line = input.readLine()) != null) {
+          contents.append(line);
+          contents.append(System.getProperty("line.separator"));
         }
+      } finally {
+        input.close();
+      }
+    } catch (IOException ex) {
+      ex.printStackTrace();
     }
 
-    public String getURL(String cluster) {
-        String url = clusterMap.get(cluster);
-        return url; 
+    return contents.toString();
+  }
+
+  public ClusterConfig() {
+    File cc = new File(path + "jdbc.conf");
+    String buffer = getContents(cc);
+    String[] lines = buffer.split("\n");
+    for (String line : lines) {
+      String[] data = line.split("=", 2);
+      clusterMap.put(data[0], data[1]);
     }
+  }
 
-    public Iterator<String> getClusters() {
-        Set<String> keys = clusterMap.keySet();
-        Iterator<String> i = keys.iterator();
-        return i;
-    }    
+  public String getURL(String cluster) {
+    String url = clusterMap.get(cluster);
+    return url;
+  }
+
+  public Iterator<String> getClusters() {
+    Set<String> keys = clusterMap.keySet();
+    Iterator<String> i = keys.iterator();
+    return i;
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Wed Mar 11 22:39:26 2009
@@ -18,70 +18,72 @@
 
 package org.apache.hadoop.chukwa.util;
 
-import java.util.Random;
 
+import java.util.Random;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.*;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
 
-public class ConstRateAdaptor  extends Thread implements Adaptor {
+public class ConstRateAdaptor extends Thread implements Adaptor {
 
+  private static final int SLEEP_VARIANCE = 200;
+  private static final int MIN_SLEEP = 300;
 
-  private static final int SLEEP_VARIANCE = 200; 
-  private static final int MIN_SLEEP = 300; 
-  
   private String type;
   private long offset;
   private int bytesPerSec;
   private ChunkReceiver dest;
   private long adaptorID;
-  
+
   private volatile boolean stopping = false;
+
   public String getCurrentStatus() throws AdaptorException {
-    return type.trim()  + " " + bytesPerSec + " " + offset;
+    return type.trim() + " " + bytesPerSec + " " + offset;
   }
 
-  public void start(long adaptor, String type, String bytesPerSecParam, long offset, ChunkReceiver dest) throws AdaptorException
-  {
-    try{
+  public void start(long adaptor, String type, String bytesPerSecParam,
+      long offset, ChunkReceiver dest) throws AdaptorException {
+    try {
       bytesPerSec = Integer.parseInt(bytesPerSecParam.trim());
-    } catch(NumberFormatException e) {
-      throw new AdaptorException("bad argument to const rate adaptor: [" + bytesPerSecParam + "]");
+    } catch (NumberFormatException e) {
+      throw new AdaptorException("bad argument to const rate adaptor: ["
+          + bytesPerSecParam + "]");
     }
     this.adaptorID = adaptor;
     this.offset = offset;
     this.type = type;
     this.dest = dest;
     this.setName("ConstRate Adaptor_" + type);
-    super.start();  //this is a Thread.start
+    super.start(); // this is a Thread.start
   }
-  
+
   public String getStreamName() {
     return this.type;
   }
-  
-  public void run()
-  {
+
+  public void run() {
     Random r = new Random();
-    try{
-      while(!stopping) {
-        int MSToSleep = r.nextInt(SLEEP_VARIANCE) + MIN_SLEEP; //between 1 and 3 secs
-          //FIXME: I think there's still a risk of integer overflow here
+    try {
+      while (!stopping) {
+        int MSToSleep = r.nextInt(SLEEP_VARIANCE) + MIN_SLEEP; // between 1 and
+                                                               // 3 secs
+        // FIXME: I think there's still a risk of integer overflow here
         int arraySize = (int) (MSToSleep * (long) bytesPerSec / 1000L);
-        byte[] data = new byte[ arraySize];
+        byte[] data = new byte[arraySize];
         r.nextBytes(data);
         offset += data.length;
-        ChunkImpl evt = new ChunkImpl(type,"random data source",  offset, data , this);
+        ChunkImpl evt = new ChunkImpl(type, "random data source", offset, data,
+            this);
 
         dest.add(evt);
-        
+
         Thread.sleep(MSToSleep);
-      } //end while
-    }  catch(InterruptedException ie)
-    {} //abort silently
+      } // end while
+    } catch (InterruptedException ie) {
+    } // abort silently
   }
-  
+
   public String toString() {
     return "const rate " + type;
   }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java Wed Mar 11 22:39:26 2009
@@ -18,166 +18,172 @@
 
 package org.apache.hadoop.chukwa.util;
 
+
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Connection;
 import java.sql.Statement;
 import java.sql.ResultSet;
 import java.text.SimpleDateFormat;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
 
 public class DatabaseWriter {
-    private static Log log = LogFactory.getLog(DatabaseWriter.class);
-    private Connection conn = null;    
-    private Statement stmt = null; 
-    private ResultSet rs = null;
-
-    public DatabaseWriter(String host, String user, String password) {
-    	DataConfig mdlConfig = new DataConfig();
-    	String jdbc_url = "jdbc:mysql://"+host+"/";
-        if(user!=null) {
-            jdbc_url = jdbc_url + "?user=" + user;
-            if(password!=null) {
-                jdbc_url = jdbc_url + "&password=" + password;
-            }
-        }
-        try {
-            // The newInstance() call is a work around for some
-            // broken Java implementations
-            Class.forName("com.mysql.jdbc.Driver").newInstance();
-        } catch (Exception ex) {
-            // handle the error
-            log.error(ex,ex);
-        }
-        try {
-            conn = DriverManager.getConnection(jdbc_url);
-            log.debug("Initialized JDBC URL: "+jdbc_url);
-        } catch (SQLException ex) {
-            log.error(ex,ex);
-        }
-    }
+  private static Log log = LogFactory.getLog(DatabaseWriter.class);
+  private Connection conn = null;
+  private Statement stmt = null;
+  private ResultSet rs = null;
+
+  public DatabaseWriter(String host, String user, String password) {
+    DataConfig mdlConfig = new DataConfig();
+    String jdbc_url = "jdbc:mysql://" + host + "/";
+    if (user != null) {
+      jdbc_url = jdbc_url + "?user=" + user;
+      if (password != null) {
+        jdbc_url = jdbc_url + "&password=" + password;
+      }
+    }
+    try {
+      // The newInstance() call is a work around for some
+      // broken Java implementations
+      Class.forName("com.mysql.jdbc.Driver").newInstance();
+    } catch (Exception ex) {
+      // handle the error
+      log.error(ex, ex);
+    }
+    try {
+      conn = DriverManager.getConnection(jdbc_url);
+      log.debug("Initialized JDBC URL: " + jdbc_url);
+    } catch (SQLException ex) {
+      log.error(ex, ex);
+    }
+  }
+
+  public DatabaseWriter(String cluster) {
+    ClusterConfig cc = new ClusterConfig();
+    String jdbc_url = cc.getURL(cluster);
+    try {
+      // The newInstance() call is a work around for some
+      // broken Java implementations
+      Class.forName("com.mysql.jdbc.Driver").newInstance();
+    } catch (Exception ex) {
+      // handle the error
+      log.error(ex, ex);
+    }
+    try {
+      conn = DriverManager.getConnection(jdbc_url);
+      log.debug("Initialized JDBC URL: " + jdbc_url);
+    } catch (SQLException ex) {
+      log.error(ex, ex);
+    }
+  }
+
+  public DatabaseWriter() {
+    DataConfig mdlConfig = new DataConfig();
+    String jdbc_url = "jdbc:mysql://" + mdlConfig.get("jdbc.host") + "/"
+        + mdlConfig.get("jdbc.db");
+    if (mdlConfig.get("jdbc.user") != null) {
+      jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
+      if (mdlConfig.get("jdbc.password") != null) {
+        jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
+      }
+    }
+    try {
+      // The newInstance() call is a work around for some
+      // broken Java implementations
+      Class.forName("com.mysql.jdbc.Driver").newInstance();
+    } catch (Exception ex) {
+      // handle the error
+      log.error(ex, ex);
+    }
+    try {
+      conn = DriverManager.getConnection(jdbc_url);
+      log.debug("Initialized JDBC URL: " + jdbc_url);
+    } catch (SQLException ex) {
+      log.error(ex, ex);
+    }
+  }
+
+  public void execute(String query) {
+    try {
+      stmt = conn.createStatement();
+      stmt.execute(query);
+    } catch (SQLException ex) {
+      // handle any errors
+      log.error(ex, ex);
+      log.error("SQL Statement:" + query);
+      log.error("SQLException: " + ex.getMessage());
+      log.error("SQLState: " + ex.getSQLState());
+      log.error("VendorError: " + ex.getErrorCode());
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException sqlEx) {
+          // ignore
+        }
+        stmt = null;
+      }
+    }
+  }
+
+  public Connection getConnection() {
+    return conn;
+  }
+
+  public ResultSet query(String query) throws SQLException {
+    try {
+      stmt = conn.createStatement();
+      rs = stmt.executeQuery(query);
+    } catch (SQLException ex) {
+      // handle any errors
+      log.debug(ex, ex);
+      log.debug("SQL Statement:" + query);
+      log.debug("SQLException: " + ex.getMessage());
+      log.debug("SQLState: " + ex.getSQLState());
+      log.debug("VendorError: " + ex.getErrorCode());
+      throw ex;
+    } finally {
+    }
+    return rs;
+  }
+
+  public void close() {
+    // it is a good idea to release
+    // resources in a finally{} block
+    // in reverse-order of their creation
+    // if they are no-longer needed
+    if (rs != null) {
+      try {
+        rs.close();
+      } catch (SQLException sqlEx) {
+        // ignore
+      }
+      rs = null;
+    }
+    if (stmt != null) {
+      try {
+        stmt.close();
+      } catch (SQLException sqlEx) {
+        // ignore
+      }
+      stmt = null;
+    }
+    if (conn != null) {
+      try {
+        conn.close();
+      } catch (SQLException sqlEx) {
+        // ignore
+      }
+      conn = null;
+    }
+  }
+
+  public static String formatTimeStamp(long timestamp) {
+    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    String format = formatter.format(timestamp);
 
-    public DatabaseWriter(String cluster) {
-    	ClusterConfig cc = new ClusterConfig();
-    	String jdbc_url = cc.getURL(cluster);
-        try {
-            // The newInstance() call is a work around for some
-            // broken Java implementations
-            Class.forName("com.mysql.jdbc.Driver").newInstance();
-        } catch (Exception ex) {
-            // handle the error
-            log.error(ex,ex);
-        }
-        try {
-            conn = DriverManager.getConnection(jdbc_url);
-            log.debug("Initialized JDBC URL: "+jdbc_url);
-        } catch (SQLException ex) {
-            log.error(ex,ex);
-        }
-    }
-    
-    public DatabaseWriter() {
-    	DataConfig mdlConfig = new DataConfig();
-    	String jdbc_url = "jdbc:mysql://"+mdlConfig.get("jdbc.host")+"/"+mdlConfig.get("jdbc.db");
-        if(mdlConfig.get("jdbc.user")!=null) {
-            jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
-            if(mdlConfig.get("jdbc.password")!=null) {
-                jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
-            }
-        }
-        try {
-            // The newInstance() call is a work around for some
-            // broken Java implementations
-            Class.forName("com.mysql.jdbc.Driver").newInstance();
-        } catch (Exception ex) {
-            // handle the error
-            log.error(ex,ex);
-        }
-        try {
-            conn = DriverManager.getConnection(jdbc_url);
-            log.debug("Initialized JDBC URL: "+jdbc_url);
-        } catch (SQLException ex) {
-            log.error(ex,ex);
-        }
-    }
-    public void execute(String query) {
-        try {
-            stmt = conn.createStatement(); 
-            stmt.execute(query);
-        } catch (SQLException ex) {
-            // handle any errors
-            log.error(ex, ex);
-            log.error("SQL Statement:" + query);
-            log.error("SQLException: " + ex.getMessage());
-            log.error("SQLState: " + ex.getSQLState());
-            log.error("VendorError: " + ex.getErrorCode());
-        } finally {
-            if (stmt != null) {
-                try {
-                    stmt.close();
-                } catch (SQLException sqlEx) {
-                    // ignore
-                }
-                stmt = null;
-            }
-        }
-    }
-    public Connection getConnection() {
-    	return conn;
-    }
-    public ResultSet query(String query) throws SQLException {
-        try {
-            stmt = conn.createStatement(); 
-            rs = stmt.executeQuery(query);
-        } catch (SQLException ex) {
-            // handle any errors
-            log.debug(ex, ex);
-            log.debug("SQL Statement:" + query);
-            log.debug("SQLException: " + ex.getMessage());
-            log.debug("SQLState: " + ex.getSQLState());
-            log.debug("VendorError: " + ex.getErrorCode());
-            throw ex;
-        } finally {
-        }
-        return rs;
-    }
-    public void close() {
-    	// it is a good idea to release
-        // resources in a finally{} block
-        // in reverse-order of their creation
-        // if they are no-longer needed
-        if (rs != null) {
-            try {
-                rs.close();
-            } catch (SQLException sqlEx) {
-                // ignore
-            }
-            rs = null;
-        }
-        if (stmt != null) {
-            try {
-                stmt.close();
-            } catch (SQLException sqlEx) {
-                // ignore
-            }
-            stmt = null;
-        }    
-        if (conn != null) {
-            try {
-                conn.close();
-            } catch (SQLException sqlEx) {
-                // ignore
-            }
-            conn = null;
-        }
-    }
-    public static String formatTimeStamp(long timestamp) {
-        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        String format = formatter.format(timestamp);
-
-    	return format;
-    }
+    return format;
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpArchive.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpArchive.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpArchive.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpArchive.java Wed Mar 11 22:39:26 2009
@@ -1,9 +1,9 @@
 package org.apache.hadoop.chukwa.util;
 
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
@@ -11,51 +11,43 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 
-public class DumpArchive
-{
+public class DumpArchive {
 
-	/**
-	 * @param args
-	 * @throws URISyntaxException 
-	 * @throws IOException 
-	 */
-	public static void main(String[] args) throws IOException, URISyntaxException
-	{
-		System.out.println("Input file:" + args[0]);
-
-		ChukwaConfiguration conf = new ChukwaConfiguration();
-		String fsName = conf.get("writer.hdfs.filesystem");
-		FileSystem fs = FileSystem.get(new URI(fsName), conf);
-
-		SequenceFile.Reader r = 
-			new SequenceFile.Reader(fs,new Path(args[0]), conf);
-		
-		ChukwaArchiveKey key = new ChukwaArchiveKey();
-		ChunkImpl chunk = ChunkImpl.getBlankChunk();
-		try
-		{
-			while (r.next(key, chunk))
-			{
-				System.out.println("\nTimePartition: " + key.getTimePartition());
-				System.out.println("DataType: " + key.getDataType());
-				System.out.println("StreamName: " + key.getStreamName());
-				System.out.println("SeqId: " + key.getSeqId());
-				System.out.println("\t\t =============== ");
-				
-				System.out.println("Cluster : " + chunk.getTags());
-				System.out.println("DataType : " + chunk.getDataType());
-				System.out.println("Source : " + chunk.getSource());
-				System.out.println("Application : " + chunk.getApplication());
-				System.out.println("SeqID : " + chunk.getSeqID());
-				System.out.println("Data : " + new String(chunk.getData()));
-			}
-		} 
-		catch (Exception e)
-		{
-			e.printStackTrace();
-		} 
-	
+  /**
+   * @param args
+   * @throws URISyntaxException
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException, URISyntaxException {
+    System.out.println("Input file:" + args[0]);
+
+    ChukwaConfiguration conf = new ChukwaConfiguration();
+    String fsName = conf.get("writer.hdfs.filesystem");
+    FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+    SequenceFile.Reader r = new SequenceFile.Reader(fs, new Path(args[0]), conf);
+
+    ChukwaArchiveKey key = new ChukwaArchiveKey();
+    ChunkImpl chunk = ChunkImpl.getBlankChunk();
+    try {
+      while (r.next(key, chunk)) {
+        System.out.println("\nTimePartition: " + key.getTimePartition());
+        System.out.println("DataType: " + key.getDataType());
+        System.out.println("StreamName: " + key.getStreamName());
+        System.out.println("SeqId: " + key.getSeqId());
+        System.out.println("\t\t =============== ");
+
+        System.out.println("Cluster : " + chunk.getTags());
+        System.out.println("DataType : " + chunk.getDataType());
+        System.out.println("Source : " + chunk.getSource());
+        System.out.println("Application : " + chunk.getApplication());
+        System.out.println("SeqID : " + chunk.getSeqID());
+        System.out.println("Data : " + new String(chunk.getData()));
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
 
-	}
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpDataType.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpDataType.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpDataType.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpDataType.java Wed Mar 11 22:39:26 2009
@@ -1,9 +1,9 @@
 package org.apache.hadoop.chukwa.util;
 
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
@@ -11,49 +11,40 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 
-public class DumpDataType
-{
+public class DumpDataType {
 
-	/**
-	 * @param args
-	 * @throws URISyntaxException 
-	 * @throws IOException 
-	 */
-	public static void main(String[] args) throws IOException, URISyntaxException
-	{
-		System.err.println("Input file:" + args[0]);
-		System.err.println("DataType:" + args[1]);
-		System.err.println("Source:" + args[2]);
-		
-		ChukwaConfiguration conf = new ChukwaConfiguration();
-		String fsName = conf.get("writer.hdfs.filesystem");
-		FileSystem fs = FileSystem.get(new URI(fsName), conf);
-
-		SequenceFile.Reader r = 
-			new SequenceFile.Reader(fs,new Path(args[0]), conf);
-		
-		ChukwaArchiveKey key = new ChukwaArchiveKey();
-		ChunkImpl chunk = ChunkImpl.getBlankChunk();
-		try
-		{
-			while (r.next(key, chunk))
-			{
-				if (args[1].equalsIgnoreCase(chunk.getDataType()))
-				{
-					if (args[2].equalsIgnoreCase("ALL") || args[2].equalsIgnoreCase(chunk.getSource()))
-					{
-						System.out.print(new String(chunk.getData()));
-					}	
-				}
-				
-			}
-		} 
-		catch (Exception e)
-		{
-			e.printStackTrace();
-		} 
-	
+  /**
+   * @param args
+   * @throws URISyntaxException
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException, URISyntaxException {
+    System.err.println("Input file:" + args[0]);
+    System.err.println("DataType:" + args[1]);
+    System.err.println("Source:" + args[2]);
+
+    ChukwaConfiguration conf = new ChukwaConfiguration();
+    String fsName = conf.get("writer.hdfs.filesystem");
+    FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+    SequenceFile.Reader r = new SequenceFile.Reader(fs, new Path(args[0]), conf);
+
+    ChukwaArchiveKey key = new ChukwaArchiveKey();
+    ChunkImpl chunk = ChunkImpl.getBlankChunk();
+    try {
+      while (r.next(key, chunk)) {
+        if (args[1].equalsIgnoreCase(chunk.getDataType())) {
+          if (args[2].equalsIgnoreCase("ALL")
+              || args[2].equalsIgnoreCase(chunk.getSource())) {
+            System.out.print(new String(chunk.getData()));
+          }
+        }
+
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
 
-	}
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpRecord.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpRecord.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpRecord.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpRecord.java Wed Mar 11 22:39:26 2009
@@ -1,9 +1,9 @@
 package org.apache.hadoop.chukwa.util;
 
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -11,52 +11,42 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 
-public class DumpRecord
-{
-
-	/**
-	 * @param args
-	 * @throws URISyntaxException 
-	 * @throws IOException 
-	 */
-	public static void main(String[] args) throws IOException, URISyntaxException
-	{
-		System.out.println("Input file:" + args[0]);
-
-		ChukwaConfiguration conf = new ChukwaConfiguration();
-		String fsName = conf.get("writer.hdfs.filesystem");
-		FileSystem fs = FileSystem.get(new URI(fsName), conf);
-
-		SequenceFile.Reader r = 
-			new SequenceFile.Reader(fs,new Path(args[0]), conf);
-		
-		ChukwaRecordKey key = new ChukwaRecordKey();
-		ChukwaRecord record = new ChukwaRecord();
-		try
-		{
-			while (r.next(key, record))
-			{
-				System.out.println("\t ===== KEY   ===== ");
-				
-				System.out.println("DataType: " + key.getReduceType());
-				System.out.println("\nKey: " + key.getKey());
-				System.out.println("\t ===== Value =====");
-				
-				String[] fields = record.getFields();
-				System.out.println("Timestamp : " + record.getTime());
-				for (String field: fields)
-				{
-					System.out.println("[" +field +"] :" + record.getValue(field));
-				}
-			}
-		} 
-		catch (Exception e)
-		{
-			e.printStackTrace();
-		} 
-	
+public class DumpRecord {
 
+  /**
+   * @param args
+   * @throws URISyntaxException
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException, URISyntaxException {
+    System.out.println("Input file:" + args[0]);
+
+    ChukwaConfiguration conf = new ChukwaConfiguration();
+    String fsName = conf.get("writer.hdfs.filesystem");
+    FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+    SequenceFile.Reader r = new SequenceFile.Reader(fs, new Path(args[0]), conf);
+
+    ChukwaRecordKey key = new ChukwaRecordKey();
+    ChukwaRecord record = new ChukwaRecord();
+    try {
+      while (r.next(key, record)) {
+        System.out.println("\t ===== KEY   ===== ");
+
+        System.out.println("DataType: " + key.getReduceType());
+        System.out.println("\nKey: " + key.getKey());
+        System.out.println("\t ===== Value =====");
+
+        String[] fields = record.getFields();
+        System.out.println("Timestamp : " + record.getTime());
+        for (String field : fields) {
+          System.out.println("[" + field + "] :" + record.getValue(field));
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
 
-	}
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ExceptionUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ExceptionUtil.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ExceptionUtil.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ExceptionUtil.java Wed Mar 11 22:39:26 2009
@@ -18,15 +18,16 @@
 
 package org.apache.hadoop.chukwa.util;
 
+
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
 public class ExceptionUtil {
-    public static String getStackTrace(Throwable t) {
-        StringWriter sw = new StringWriter();
-        PrintWriter pw = new PrintWriter(sw);
-        t.printStackTrace(pw);
-        pw.flush();
-        return sw.toString();
-    }
+  public static String getStackTrace(Throwable t) {
+    StringWriter sw = new StringWriter();
+    PrintWriter pw = new PrintWriter(sw);
+    t.printStackTrace(pw);
+    pw.flush();
+    return sw.toString();
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java Wed Mar 11 22:39:26 2009
@@ -18,60 +18,59 @@
 
 package org.apache.hadoop.chukwa.util;
 
-import java.util.Random;
 
+import java.util.Random;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.*;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
 
-public class MaxRateSender  extends Thread implements Adaptor {
-
+public class MaxRateSender extends Thread implements Adaptor {
 
   public static final int BUFFER_SIZE = 60 * 1024;
   public static final String ADAPTOR_NAME = "MaxRateSender";
-  
+
   private volatile boolean stopping = false;
   private long offset;
   private String type;
   ChunkReceiver dest;
   private long adaptorID;
-  
+
   public String getCurrentStatus() throws AdaptorException {
     return "";
   }
 
-  public void start(long adaptor, String type, String status, long offset, ChunkReceiver dest) throws AdaptorException
-  {
+  public void start(long adaptor, String type, String status, long offset,
+      ChunkReceiver dest) throws AdaptorException {
     this.setName("MaxRateSender adaptor");
     this.adaptorID = adaptor;
     this.offset = offset;
     this.type = type;
     this.dest = dest;
-    super.start();  //this is a Thread.start
+    super.start(); // this is a Thread.start
   }
-  
+
   public String getStreamName() {
-	  return type;
+    return type;
   }
-  
-  public void run()
-  {
+
+  public void run() {
     Random r = new Random();
-    
-    try{
-      while(!stopping) {
-        byte[] data = new byte[ BUFFER_SIZE];
+
+    try {
+      while (!stopping) {
+        byte[] data = new byte[BUFFER_SIZE];
         r.nextBytes(data);
         offset += data.length;
-        ChunkImpl evt = new ChunkImpl(type, "random data source", offset, data, this);
+        ChunkImpl evt = new ChunkImpl(type, "random data source", offset, data,
+            this);
         dest.add(evt);
-        
+
       }
-    }  catch(InterruptedException ie)
-    {}
+    } catch (InterruptedException ie) {
+    }
   }
-  
+
   public String toString() {
     return ADAPTOR_NAME;
   }
@@ -80,11 +79,10 @@
     stopping = true;
     return offset;
   }
-  
+
   public void hardStop() throws AdaptorException {
     stopping = true;
   }
-  
 
   @Override
   public String getType() {

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/PidFile.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/PidFile.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/PidFile.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/PidFile.java Wed Mar 11 22:39:26 2009
@@ -18,102 +18,101 @@
 
 package org.apache.hadoop.chukwa.util;
 
+
 import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.nio.channels.*;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
 public class PidFile extends Thread {
-	
-	String name;
-	private static Log log = LogFactory.getLog(PidFile.class);
-	private static FileLock lock = null;
-        private static FileOutputStream pidFileOutput = null;
-	
-	public PidFile(String name){
-		this.name=name;
-		try {
-		    init();
-		} catch(IOException ex) {
-			clean();
-			System.exit(-1);
-		}
-	}
-	
-	public void init() throws IOException{
-  	     String pidLong=ManagementFactory.getRuntimeMXBean().getName();
-  	     String[] items=pidLong.split("@");
-  	     String pid=items[0];
-	     String chukwaPath=System.getProperty("CHUKWA_HOME");
-	     StringBuffer pidFilesb=new StringBuffer();
-	     String pidDir = System.getenv("CHUKWA_PID_DIR");
-	     if (pidDir == null) {
-	       pidDir = chukwaPath+File.separator+"var"+File.separator+"run";
-	     }
-	     pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid");
-	     try{
-	    	 File existsFile = new File(pidDir);
-	    	 if(!existsFile.exists()) {
-		    	 boolean success = (new File(pidDir)).mkdirs();
-		    	 if(!success) {
-		    		 throw(new IOException());
-		    	 }
-	    	 }
-	         File pidFile= new File(pidFilesb.toString());
-
-	         pidFileOutput= new FileOutputStream(pidFile);
-             pidFileOutput.write(pid.getBytes());
-	         pidFileOutput.flush();
-	         FileChannel channel = pidFileOutput.getChannel();
-	         PidFile.lock = channel.tryLock();
-             if(PidFile.lock!=null) {
-	             log.debug("Initlization succeeded...");
-             } else {
-                 throw(new IOException());
-             }
-	     }catch (IOException ex){
-	    	 System.out.println("Initializaiton failed: can not write pid file.");
-	    	 log.error("Initialization failed...");
-	    	 log.error(ex.getMessage());
-	    	 System.exit(-1);
-	    	 throw ex;
-	    	 
-	     }
-	   
-	}	
-	
-	public void clean(){
-        String chukwaPath=System.getenv("CHUKWA_HOME");
-        StringBuffer pidFilesb=new StringBuffer();
-        String pidDir = System.getenv("CHUKWA_PID_DIR");
-        if (pidDir == null) {
-          pidDir = chukwaPath+File.separator+"var"+File.separator+"run";
-        }
-        pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid"); 
-        String pidFileName=pidFilesb.toString();
-
-        File pidFile=new File(pidFileName);
-        if (!pidFile.exists()) {
-    	   log.error("Delete pid file, No such file or directory: "+pidFileName);
-        } else {
-           try {
-               lock.release();
-	       pidFileOutput.close();
-           } catch(IOException e) {
-               log.error("Unable to release file lock: "+pidFileName);
-           }
-        }
 
-        boolean result=pidFile.delete();
-        if (!result){
-    	   log.error("Delete pid file failed, "+pidFileName);
+  String name;
+  private static Log log = LogFactory.getLog(PidFile.class);
+  private static FileLock lock = null;
+  private static FileOutputStream pidFileOutput = null;
+
+  public PidFile(String name) {
+    this.name = name;
+    try {
+      init();
+    } catch (IOException ex) {
+      clean();
+      System.exit(-1);
+    }
+  }
+
+  public void init() throws IOException {
+    String pidLong = ManagementFactory.getRuntimeMXBean().getName();
+    String[] items = pidLong.split("@");
+    String pid = items[0];
+    String chukwaPath = System.getProperty("CHUKWA_HOME");
+    StringBuffer pidFilesb = new StringBuffer();
+    String pidDir = System.getenv("CHUKWA_PID_DIR");
+    if (pidDir == null) {
+      pidDir = chukwaPath + File.separator + "var" + File.separator + "run";
+    }
+    pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid");
+    try {
+      File existsFile = new File(pidDir);
+      if (!existsFile.exists()) {
+        boolean success = (new File(pidDir)).mkdirs();
+        if (!success) {
+          throw (new IOException());
         }
-	}
+      }
+      File pidFile = new File(pidFilesb.toString());
 
-	public void run() {
-		clean();
-	}
+      pidFileOutput = new FileOutputStream(pidFile);
+      pidFileOutput.write(pid.getBytes());
+      pidFileOutput.flush();
+      FileChannel channel = pidFileOutput.getChannel();
+      PidFile.lock = channel.tryLock();
+      if (PidFile.lock != null) {
+        log.debug("Initlization succeeded...");
+      } else {
+        throw (new IOException());
+      }
+    } catch (IOException ex) {
+      System.out.println("Initializaiton failed: can not write pid file.");
+      log.error("Initialization failed...");
+      log.error(ex.getMessage());
+      System.exit(-1);
+      throw ex;
+
+    }
+
+  }
+
+  public void clean() {
+    String chukwaPath = System.getenv("CHUKWA_HOME");
+    StringBuffer pidFilesb = new StringBuffer();
+    String pidDir = System.getenv("CHUKWA_PID_DIR");
+    if (pidDir == null) {
+      pidDir = chukwaPath + File.separator + "var" + File.separator + "run";
+    }
+    pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid");
+    String pidFileName = pidFilesb.toString();
+
+    File pidFile = new File(pidFileName);
+    if (!pidFile.exists()) {
+      log.error("Delete pid file, No such file or directory: " + pidFileName);
+    } else {
+      try {
+        lock.release();
+        pidFileOutput.close();
+      } catch (IOException e) {
+        log.error("Unable to release file lock: " + pidFileName);
+      }
+    }
+
+    boolean result = pidFile.delete();
+    if (!result) {
+      log.error("Delete pid file failed, " + pidFileName);
+    }
+  }
+
+  public void run() {
+    clean();
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/RecordConstants.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/RecordConstants.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/RecordConstants.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/RecordConstants.java Wed Mar 11 22:39:26 2009
@@ -18,47 +18,90 @@
 
 package org.apache.hadoop.chukwa.util;
 
-public class RecordConstants
-{
-  static final char[] CTRL_A =  {'\u0001'};
-  static final char[] CTRL_B =  {'\u0002'};
-  static final char[] CTRL_C =  {'\u0003'};
-  static final char[] CTRL_D =  {'\u0004'};
-	//public static final String FIELD_SEPARATOR = new String(CTRL_A);
-	public static final String DEFAULT_FIELD_SEPARATOR = "-#-";
-	public static final String DEFAULT_RECORD_SEPARATOR = "\n";
-	public static final String RECORD_SEPARATOR_ESCAPE_SEQ = new String (CTRL_D);// may want this to be very obscure, e.g. new String(CTRL_B) + new String (CTRL_C) + new String (CTRL_D)
-	
-	/**
-	 * Insert the default chukwa escape sequence in <code>record</code> before all occurances of 
-	 * <code>recordSeparator</code> <i>except</i> the final one if the final record separator occurs
-	 * at the end of the <code>record</code> 
-	 * @param recordSeparator The record separator that we are escaping. This is chunk source application specific 
-	 * @param record The string representing the entire record, including the final record delimiter
-	 * @return The string with appropriate <code>recordSeparator</code>s escaped
-	 */
-	public static String escapeAllButLastRecordSeparator(String recordSeparator,String record){
-	  String escapedRecord = "";
-	  if (record.endsWith(recordSeparator)){
-	    escapedRecord = record.substring(0,record.length()-recordSeparator.length()).replaceAll(recordSeparator, RECORD_SEPARATOR_ESCAPE_SEQ + recordSeparator) + recordSeparator;
-	  }
-	  return escapedRecord;
-	}
-	
+
+public class RecordConstants {
+  static final char[] CTRL_A = { '\u0001' };
+  static final char[] CTRL_B = { '\u0002' };
+  static final char[] CTRL_C = { '\u0003' };
+  static final char[] CTRL_D = { '\u0004' };
+  // public static final String FIELD_SEPARATOR = new String(CTRL_A);
+  public static final String DEFAULT_FIELD_SEPARATOR = "-#-";
+  public static final String DEFAULT_RECORD_SEPARATOR = "\n";
+  public static final String RECORD_SEPARATOR_ESCAPE_SEQ = new String(CTRL_D);// may
+                                                                              // want
+                                                                              // this
+                                                                              // to
+                                                                              // be
+                                                                              // very
+                                                                              // obscure
+                                                                              // ,
+                                                                              // e
+                                                                              // .
+                                                                              // g
+                                                                              // .
+                                                                              // new
+                                                                              // String
+                                                                              // (
+                                                                              // CTRL_B
+                                                                              // )
+                                                                              // +
+                                                                              // new
+                                                                              // String
+                                                                              // (
+                                                                              // CTRL_C
+                                                                              // )
+                                                                              // +
+                                                                              // new
+                                                                              // String
+                                                                              // (
+                                                                              // CTRL_D
+                                                                              // )
+
   /**
-   * Insert the default chukwa escape sequence in <code>record</code> before all occurances of 
-   * <code>recordSeparator</code>. This is assuming that you are not passing the final record
-   * separator in with the <code>record</code>, because it would be escaped too. 
-   * @param recordSeparator The record separator that we are escaping. This is chunk source application specific 
-   * @param record The string representing the entire record, including the final record delimiter
+   * Insert the default chukwa escape sequence in <code>record</code> before all
+   * occurances of <code>recordSeparator</code> <i>except</i> the final one if
+   * the final record separator occurs at the end of the <code>record</code>
+   * 
+   * @param recordSeparator The record separator that we are escaping. This is
+   *        chunk source application specific
+   * @param record The string representing the entire record, including the
+   *        final record delimiter
+   * @return The string with appropriate <code>recordSeparator</code>s escaped
+   */
+  public static String escapeAllButLastRecordSeparator(String recordSeparator,
+      String record) {
+    String escapedRecord = "";
+    if (record.endsWith(recordSeparator)) {
+      escapedRecord = record.substring(0,
+          record.length() - recordSeparator.length()).replaceAll(
+          recordSeparator, RECORD_SEPARATOR_ESCAPE_SEQ + recordSeparator)
+          + recordSeparator;
+    }
+    return escapedRecord;
+  }
+
+  /**
+   * Insert the default chukwa escape sequence in <code>record</code> before all
+   * occurances of <code>recordSeparator</code>. This is assuming that you are
+   * not passing the final record separator in with the <code>record</code>,
+   * because it would be escaped too.
+   * 
+   * @param recordSeparator The record separator that we are escaping. This is
+   *        chunk source application specific
+   * @param record The string representing the entire record, including the
+   *        final record delimiter
    * @return The string with all <code>recordSeparator</code>s escaped
    */
-	 public static String escapeAllRecordSeparators(String recordSeparator,String record){
-	      return record.replaceAll(recordSeparator, RECORD_SEPARATOR_ESCAPE_SEQ + recordSeparator);
-	  }
-	
-	public static String recoverRecordSeparators(String recordSeparator, String record){
-    return record.replaceAll(RECORD_SEPARATOR_ESCAPE_SEQ+recordSeparator, recordSeparator);
+  public static String escapeAllRecordSeparators(String recordSeparator,
+      String record) {
+    return record.replaceAll(recordSeparator, RECORD_SEPARATOR_ESCAPE_SEQ
+        + recordSeparator);
+  }
+
+  public static String recoverRecordSeparators(String recordSeparator,
+      String record) {
+    return record.replaceAll(RECORD_SEPARATOR_ESCAPE_SEQ + recordSeparator,
+        recordSeparator);
   }
-	
+
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java Wed Mar 11 22:39:26 2009
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
-import java.util.HashMap;
 
+import java.util.HashMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobConf;
@@ -30,75 +30,85 @@
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.fs.Path;
 
-public class ChukwaJobTrackerInstrumentation extends org.apache.hadoop.mapred.JobTrackerInstrumentation {
+public class ChukwaJobTrackerInstrumentation extends
+    org.apache.hadoop.mapred.JobTrackerInstrumentation {
 
-	  protected final JobTracker tracker;
-	  private static ChukwaAgentController chukwaClient = null;
-	  private static Log log = LogFactory.getLog(JobTrackerInstrumentation.class);
-	  private static HashMap<JobID, Long> jobConfs = null;
-	  private static HashMap<JobID, Long> jobHistories = null;
-
-	  public ChukwaJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
-          super(jt,conf);
-	      tracker = jt;
-	      if(chukwaClient==null) {
-		      chukwaClient = new ChukwaAgentController();
-	      }
-	      if(jobConfs==null) {
-	    	  jobConfs = new HashMap<JobID, Long>();
-	      }
-	      if(jobHistories==null) {
-	    	  jobHistories = new HashMap<JobID, Long>();
-	      }
-	  }
-
-	  public void launchMap(TaskAttemptID taskAttemptID) {
-		  
-	  }
-
-	  public void completeMap(TaskAttemptID taskAttemptID) {
-		  
-	  }
-
-	  public void launchReduce(TaskAttemptID taskAttemptID) {
-		  
-	  }
-
-	  public void completeReduce(TaskAttemptID taskAttemptID) {
-		  
-	  }
-
-	  public void submitJob(JobConf conf, JobID id) {
-          String chukwaJobConf = tracker.getLocalJobFilePath(id);
-          try {
-              String jobFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
-              Path jobHistoryPath = JobHistory.JobInfo.getJobHistoryLogLocation(jobFileName);
-              String jobConfPath = JobHistory.JobInfo.getLocalJobFilePath(id);
-              long adaptorID = chukwaClient.add("org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped", "JobConf", "0 "+jobConfPath, 0);
-              jobConfs.put(id, adaptorID);
-              if(jobHistoryPath.toString().matches("^hdfs://")) {
-                  adaptorID = chukwaClient.add("org.apache.hadoop.chukwa.datacollection.adaptor.HDFSAdaptor", "JobHistory", "0 "+jobHistoryPath.toString(), 0);
-              } else {
-                  adaptorID = chukwaClient.add("org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped", "JobHistory", "0 "+jobHistoryPath.toString().substring(5), 0);            	  
-              }
-              jobHistories.put(id, adaptorID);
-          } catch(Exception ex) {
-        	  
-          }
+  protected final JobTracker tracker;
+  private static ChukwaAgentController chukwaClient = null;
+  private static Log log = LogFactory.getLog(JobTrackerInstrumentation.class);
+  private static HashMap<JobID, Long> jobConfs = null;
+  private static HashMap<JobID, Long> jobHistories = null;
+
+  public ChukwaJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
+    super(jt, conf);
+    tracker = jt;
+    if (chukwaClient == null) {
+      chukwaClient = new ChukwaAgentController();
+    }
+    if (jobConfs == null) {
+      jobConfs = new HashMap<JobID, Long>();
+    }
+    if (jobHistories == null) {
+      jobHistories = new HashMap<JobID, Long>();
+    }
+  }
+
+  public void launchMap(TaskAttemptID taskAttemptID) {
+
+  }
+
+  public void completeMap(TaskAttemptID taskAttemptID) {
+
+  }
+
+  public void launchReduce(TaskAttemptID taskAttemptID) {
+
+  }
+
+  public void completeReduce(TaskAttemptID taskAttemptID) {
+
+  }
+
+  public void submitJob(JobConf conf, JobID id) {
+    String chukwaJobConf = tracker.getLocalJobFilePath(id);
+    try {
+      String jobFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+      Path jobHistoryPath = JobHistory.JobInfo
+          .getJobHistoryLogLocation(jobFileName);
+      String jobConfPath = JobHistory.JobInfo.getLocalJobFilePath(id);
+      long adaptorID = chukwaClient
+          .add(
+              "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
+              "JobConf", "0 " + jobConfPath, 0);
+      jobConfs.put(id, adaptorID);
+      if (jobHistoryPath.toString().matches("^hdfs://")) {
+        adaptorID = chukwaClient.add(
+            "org.apache.hadoop.chukwa.datacollection.adaptor.HDFSAdaptor",
+            "JobHistory", "0 " + jobHistoryPath.toString(), 0);
+      } else {
+        adaptorID = chukwaClient
+            .add(
+                "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
+                "JobHistory", "0 " + jobHistoryPath.toString().substring(5), 0);
       }
+      jobHistories.put(id, adaptorID);
+    } catch (Exception ex) {
+
+    }
+  }
 
-	  public void completeJob(JobConf conf, JobID id) {
-          try {
-             if (jobHistories.containsKey(id)) {
-                 chukwaClient.remove(jobHistories.get(id));
-             }
-             if (jobConfs.containsKey(id)) {
-                 chukwaClient.remove(jobConfs.get(id));
-             }
-          } catch(Throwable e) {
-            log.warn("could not remove adaptor for this job: " + id.toString(),e);
-            e.printStackTrace();
-          }
-	  }
+  public void completeJob(JobConf conf, JobID id) {
+    try {
+      if (jobHistories.containsKey(id)) {
+        chukwaClient.remove(jobHistories.get(id));
+      }
+      if (jobConfs.containsKey(id)) {
+        chukwaClient.remove(jobConfs.get(id));
+      }
+    } catch (Throwable e) {
+      log.warn("could not remove adaptor for this job: " + id.toString(), e);
+      e.printStackTrace();
+    }
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/ChunkImplTest.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/ChunkImplTest.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/ChunkImplTest.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/ChunkImplTest.java Wed Mar 11 22:39:26 2009
@@ -1,58 +1,48 @@
 package org.apache.hadoop.chukwa;
 
-import java.io.IOException;
 
+import java.io.IOException;
 import junit.framework.TestCase;
-
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 
-public class ChunkImplTest extends TestCase
-{
-	public void testVersion()
-	{
-		ChunkBuilder cb = new ChunkBuilder();
-		cb.addRecord("foo".getBytes());
-		cb.addRecord("bar".getBytes());
-		cb.addRecord("baz".getBytes());
-		Chunk c = cb.getChunk();
-		DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate());
-		try
-		{
-			c.write(ob);
-			DataInputBuffer ib = new DataInputBuffer();
-			ib.reset(ob.getData(), c.getSerializedSizeEstimate());
-			int version = ib.readInt();
-			assertEquals(version,ChunkImpl.PROTOCOL_VERSION);
-		} 
-		catch (IOException e)
-		{
-			e.printStackTrace();
-			fail("Should nor raise any exception"); 
-		}
-	}
+public class ChunkImplTest extends TestCase {
+  public void testVersion() {
+    ChunkBuilder cb = new ChunkBuilder();
+    cb.addRecord("foo".getBytes());
+    cb.addRecord("bar".getBytes());
+    cb.addRecord("baz".getBytes());
+    Chunk c = cb.getChunk();
+    DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate());
+    try {
+      c.write(ob);
+      DataInputBuffer ib = new DataInputBuffer();
+      ib.reset(ob.getData(), c.getSerializedSizeEstimate());
+      int version = ib.readInt();
+      assertEquals(version, ChunkImpl.PROTOCOL_VERSION);
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail("Should nor raise any exception");
+    }
+  }
 
-	public void testWrongVersion()
-	{
-		ChunkBuilder cb = new ChunkBuilder();
-		cb.addRecord("foo".getBytes());
-		cb.addRecord("bar".getBytes());
-		cb.addRecord("baz".getBytes());
-		Chunk c = cb.getChunk();
-		DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate());
-		try
-		{
-			c.write(ob);
-			DataInputBuffer ib = new DataInputBuffer();
-			ib.reset(ob.getData(), c.getSerializedSizeEstimate());
-			//change current chunkImpl version
-			ChunkImpl.PROTOCOL_VERSION = ChunkImpl.PROTOCOL_VERSION+1;
-			ChunkImpl.read(ib);
-			fail("Should have raised an IOexception"); 
-		} 
-		catch (IOException e)
-		{
-			// right behavior, do nothing
-		}
-	}
+  public void testWrongVersion() {
+    ChunkBuilder cb = new ChunkBuilder();
+    cb.addRecord("foo".getBytes());
+    cb.addRecord("bar".getBytes());
+    cb.addRecord("baz".getBytes());
+    Chunk c = cb.getChunk();
+    DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate());
+    try {
+      c.write(ob);
+      DataInputBuffer ib = new DataInputBuffer();
+      ib.reset(ob.getData(), c.getSerializedSizeEstimate());
+      // change current chunkImpl version
+      ChunkImpl.PROTOCOL_VERSION = ChunkImpl.PROTOCOL_VERSION + 1;
+      ChunkImpl.read(ib);
+      fail("Should have raised an IOexception");
+    } catch (IOException e) {
+      // right behavior, do nothing
+    }
+  }
 }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java Wed Mar 11 22:39:26 2009
@@ -17,13 +17,12 @@
  */
 package org.apache.hadoop.chukwa;
 
+
 import junit.framework.TestCase;
 
 public class TestChunkBuilder extends TestCase {
 
-
-  public void testChunkBuilder()
-  {
+  public void testChunkBuilder() {
     ChunkBuilder cb = new ChunkBuilder();
     cb.addRecord("foo".getBytes());
     cb.addRecord("bar".getBytes());

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java Wed Mar 11 22:39:26 2009
@@ -18,17 +18,20 @@
 
 package org.apache.hadoop.chukwa.datacollection;
 
+
 import java.io.*;
 import java.util.Random;
 
 public class TempFileUtil {
   public static File makeBinary(int length) throws IOException {
-    File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),"chukwaTest");
+    File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
+        "chukwaTest");
     FileOutputStream fos = new FileOutputStream(tmpOutput);
     Random r = new Random();
-    byte[] randomData = new byte[ length];
+    byte[] randomData = new byte[length];
     r.nextBytes(randomData);
-    randomData[ length-1] = '\n';//need data to end with \n since default tailer uses that
+    randomData[length - 1] = '\n';// need data to end with \n since default
+                                  // tailer uses that
     fos.write(randomData);
     fos.flush();
     fos.close();

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
+
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
 
 public class ChukwaTestAdaptor implements Adaptor {
@@ -27,11 +28,11 @@
   private String params = null;
   private long startOffset = 0l;
   private ChunkReceiver dest = null;
-  
+
   @Override
   public String getCurrentStatus() throws AdaptorException {
     // TODO Auto-generated method stub
-    return type+ " "+ params + " "+ startOffset;
+    return type + " " + params + " " + startOffset;
   }
 
   @Override
@@ -43,7 +44,7 @@
   @Override
   public void hardStop() throws AdaptorException {
     // TODO Auto-generated method stub
-    
+
   }
 
   @Override
@@ -60,14 +61,12 @@
     this.params = params;
     this.startOffset = offset;
     this.dest = dest;
-    System.out.println("adaptorId [" +adaptorId + "]");
-    System.out.println("type [" +type+ "]");
-    System.out.println("params [" +params+ "]");
-    System.out.println("startOffset [" +startOffset+ "]");
-    
-    
+    System.out.println("adaptorId [" + adaptorId + "]");
+    System.out.println("type [" + type + "]");
+    System.out.println("params [" + params + "]");
+    System.out.println("startOffset [" + startOffset + "]");
+
   }
-  
 
   public String getType() {
     return type;
@@ -108,5 +107,5 @@
   public void setDest(ChunkReceiver dest) {
     this.dest = dest;
   }
-  
+
 }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java Wed Mar 11 22:39:26 2009
@@ -17,29 +17,31 @@
  */
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
-import junit.framework.TestCase;
 
+import junit.framework.TestCase;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
 
 public class TestExecAdaptor extends TestCase {
-  
+
   ChunkCatcherConnector chunks;
+
   public TestExecAdaptor() {
     chunks = new ChunkCatcherConnector();
     chunks.start();
   }
-  
+
   public void testWithPs() throws ChukwaAgent.AlreadyRunningException {
     try {
-      ChukwaAgent  agent = new ChukwaAgent();
-      agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor ps ps aux 0");
-  
+      ChukwaAgent agent = new ChukwaAgent();
+      agent
+          .processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor ps ps aux 0");
+
       Chunk c = chunks.waitForAChunk();
       System.out.println(new String(c.getData()));
-    } catch(InterruptedException e) {
-      
+    } catch (InterruptedException e) {
+
     }
   }
 

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java Wed Mar 11 22:39:26 2009
@@ -1,12 +1,11 @@
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
-
 import junit.framework.TestCase;
-
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
@@ -47,7 +46,8 @@
   }
 
   private File makeTestFile(String name, int size) throws IOException {
-    File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"), name);
+    File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
+        name);
     FileOutputStream fos = new FileOutputStream(tmpOutput);
 
     PrintWriter pw = new PrintWriter(fos);

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java Wed Mar 11 22:39:26 2009
@@ -1,13 +1,12 @@
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
-
 import junit.framework.Assert;
 import junit.framework.TestCase;
-
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
@@ -77,8 +76,8 @@
 
       FileTailingAdaptor.GRACEFUL_PERIOD = 30 * 1000;
       long adaptorId = agent
-          .processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped MyType 0 " 
-              + logFile +" 0");
+          .processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped MyType 0 "
+              + logFile + " 0");
 
       assertTrue(adaptorId != -1);
 

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java Wed Mar 11 22:39:26 2009
@@ -1,16 +1,15 @@
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
-
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
-
 import junit.framework.Assert;
 import junit.framework.TestCase;