Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC
svn commit: r752666 [14/16] - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/
src/java/org/apache/hadoop/chukwa/database/
src/java/org/apache/hadoop/chukwa/datacollection/
src/java/org/apache/hadoo...
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/ExecPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/ExecPlugin.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/ExecPlugin.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/ExecPlugin.java Wed Mar 11 22:39:26 2009
@@ -18,133 +18,116 @@
package org.apache.hadoop.chukwa.inputtools.plugin;
+
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-
import org.json.JSONObject;
-public abstract class ExecPlugin implements IPlugin
-{
- public final int statusOK = 100;
- public final int statusKO = -100;
-
- Process process;
-
- public ExecPlugin()
- {
-
- }
-
- public void stop() {
- process.destroy();
- }
-
- public int waitFor() throws InterruptedException {
- return process.waitFor();
- }
-
- public abstract String getCmde();
-
- public JSONObject postProcess(JSONObject execResult)
- {
- return execResult;
- }
-
- public JSONObject execute()
- {
- JSONObject result = new JSONObject();
- try
- {
- result.put("timestamp", System.currentTimeMillis());
-
- Runtime runtime = Runtime.getRuntime();
- process = runtime.exec(getCmde());
-// ProcessBuilder builder = new ProcessBuilder(cmde);
-// Process process = builder.start();
-
-
-
-
- OutputReader stdOut = new OutputReader(process,Output.stdOut);
- stdOut.start();
- OutputReader stdErr = new OutputReader(process,Output.stdErr);
- stdErr.start();
- int exitValue =process.waitFor();
- stdOut.join();
- stdErr.join();
- result.put("exitValue", exitValue);
- result.put("stdout", stdOut.output.toString());
- result.put("stderr", stdErr.output.toString());
- result.put("status", statusOK);
- }
- catch (Throwable e)
- {
- try
- {
- result.put("status", statusKO);
- result.put("errorLog", e.getMessage());
- }
- catch(Exception e1) { e1.printStackTrace();}
- e.printStackTrace();
- }
+public abstract class ExecPlugin implements IPlugin {
+ public final int statusOK = 100;
+ public final int statusKO = -100;
+
+ Process process;
+
+ public ExecPlugin() {
+
+ }
+
+ public void stop() {
+ process.destroy();
+ }
+
+ public int waitFor() throws InterruptedException {
+ return process.waitFor();
+ }
+
+ public abstract String getCmde();
+
+ public JSONObject postProcess(JSONObject execResult) {
+ return execResult;
+ }
+
+ public JSONObject execute() {
+ JSONObject result = new JSONObject();
+ try {
+ result.put("timestamp", System.currentTimeMillis());
+
+ Runtime runtime = Runtime.getRuntime();
+ process = runtime.exec(getCmde());
+ // ProcessBuilder builder = new ProcessBuilder(cmde);
+ // Process process = builder.start();
+
+ OutputReader stdOut = new OutputReader(process, Output.stdOut);
+ stdOut.start();
+ OutputReader stdErr = new OutputReader(process, Output.stdErr);
+ stdErr.start();
+ int exitValue = process.waitFor();
+ stdOut.join();
+ stdErr.join();
+ result.put("exitValue", exitValue);
+ result.put("stdout", stdOut.output.toString());
+ result.put("stderr", stdErr.output.toString());
+ result.put("status", statusOK);
+ } catch (Throwable e) {
+ try {
+ result.put("status", statusKO);
+ result.put("errorLog", e.getMessage());
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ }
+ e.printStackTrace();
+ }
- return postProcess(result);
- }
+ return postProcess(result);
+ }
}
-enum Output{stdOut,stdErr};
-
-class OutputReader extends Thread
-{
- private Process process = null;
- private Output outputType = null;
- public StringBuilder output = new StringBuilder();
- public boolean isOk = true;
-
-
- public OutputReader(Process process,Output outputType)
- {
- this.process = process;
- this.outputType = outputType;
- }
- public void run()
- {
- try
- {
- String line = null;
- InputStream is = null;
- switch(this.outputType)
- {
- case stdOut:
- is = process.getInputStream();
- break;
- case stdErr:
- is = process.getErrorStream();
- break;
-
- }
-
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
- while ((line = br.readLine()) != null)
- {
- //System.out.println("========>>>>>>>["+line+"]");
- output.append(line).append("\n");
- }
- br.close();
- }
- catch (IOException e)
- {
- isOk = false;
- e.printStackTrace();
- }
- catch (Throwable e)
- {
- isOk = false;
- e.printStackTrace();
- }
- }
+enum Output {
+ stdOut, stdErr
+};
+
+
+class OutputReader extends Thread {
+ private Process process = null;
+ private Output outputType = null;
+ public StringBuilder output = new StringBuilder();
+ public boolean isOk = true;
+
+ public OutputReader(Process process, Output outputType) {
+ this.process = process;
+ this.outputType = outputType;
+ }
+
+ public void run() {
+ try {
+ String line = null;
+ InputStream is = null;
+ switch (this.outputType) {
+ case stdOut:
+ is = process.getInputStream();
+ break;
+ case stdErr:
+ is = process.getErrorStream();
+ break;
+
+ }
+
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+ while ((line = br.readLine()) != null) {
+ // System.out.println("========>>>>>>>["+line+"]");
+ output.append(line).append("\n");
+ }
+ br.close();
+ } catch (IOException e) {
+ isOk = false;
+ e.printStackTrace();
+ } catch (Throwable e) {
+ isOk = false;
+ e.printStackTrace();
+ }
+ }
}
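
For orientation: after this change a concrete plugin only supplies the command
line via getCmde(); execute() runs it, drains stdout and stderr on the two
OutputReader threads, and returns everything in a JSONObject. A minimal sketch
(the "uptime" command and class name are illustrative, not part of the tree):

    import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
    import org.json.JSONObject;

    // Hypothetical subclass: runs "uptime" and prints the resulting JSON.
    public class UptimePlugin extends ExecPlugin {
      @Override
      public String getCmde() {
        return "uptime"; // assumed to be on the PATH
      }

      public static void main(String[] args) {
        JSONObject result = new UptimePlugin().execute();
        // "status" is statusOK (100) on success, statusKO (-100) on failure;
        // "stdout", "stderr", "exitValue" and "timestamp" are also set.
        System.out.println(result);
      }
    }
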
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/IPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/IPlugin.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/IPlugin.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/IPlugin.java Wed Mar 11 22:39:26 2009
@@ -18,9 +18,9 @@
package org.apache.hadoop.chukwa.inputtools.plugin;
+
import org.json.JSONObject;
-public interface IPlugin
-{
- JSONObject execute();
+public interface IPlugin {
+ JSONObject execute();
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java Wed Mar 11 22:39:26 2009
@@ -1,10 +1,10 @@
package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
+
import java.io.IOException;
import java.sql.SQLException;
import java.util.Timer;
import java.util.TimerTask;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
@@ -14,54 +14,58 @@
import org.json.JSONObject;
public class Exec extends TimerTask {
- private static Log log = LogFactory.getLog(Exec.class);
- private String cmde = null;
- private static PidFile pFile = null;
- private Timer timer = null;
- private IPlugin plugin = null;
-
- public Exec(String[] cmds) {
- StringBuffer c = new StringBuffer();
- for(String cmd : cmds) {
- c.append(cmd);
- c.append(" ");
- }
- cmde = c.toString();
- plugin = new ExecHelper(cmds);
- }
- public void run() {
- try {
- JSONObject result = plugin.execute();
- if (result.getInt("status") < 0) {
- System.out.println("Error");
- log.warn("[ChukwaError]:"+ Exec.class + ", " + result.getString("stderr"));
- System.exit(-1);
- } else {
- log.info(result.get("stdout"));
- }
- } catch(JSONException e) {
- log.error("Exec output unparsable:"+this.cmde);
- }
- }
- public String getCmde() {
- return cmde;
- }
-
- public static void main(String[] args) {
- pFile=new PidFile(System.getProperty("RECORD_TYPE")+"-data-loader");
- Runtime.getRuntime().addShutdownHook(pFile);
- int period = 60;
- try {
- if(System.getProperty("PERIOD")!=null) {
- period = Integer.parseInt(System.getProperty("PERIOD"));
- }
- } catch(NumberFormatException ex) {
- ex.printStackTrace();
- System.out.println("Usage: java -DPERIOD=nn -DRECORD_TYPE=recordType Exec [cmd]");
- System.out.println("PERIOD should be numeric format of seconds.");
- System.exit(0);
- }
- Timer timer = new Timer();
- timer.schedule(new Exec(args),0, period*1000);
- }
+ private static Log log = LogFactory.getLog(Exec.class);
+ private String cmde = null;
+ private static PidFile pFile = null;
+ private Timer timer = null;
+ private IPlugin plugin = null;
+
+ public Exec(String[] cmds) {
+ StringBuffer c = new StringBuffer();
+ for (String cmd : cmds) {
+ c.append(cmd);
+ c.append(" ");
+ }
+ cmde = c.toString();
+ plugin = new ExecHelper(cmds);
+ }
+
+ public void run() {
+ try {
+ JSONObject result = plugin.execute();
+ if (result.getInt("status") < 0) {
+ System.out.println("Error");
+ log.warn("[ChukwaError]:" + Exec.class + ", "
+ + result.getString("stderr"));
+ System.exit(-1);
+ } else {
+ log.info(result.get("stdout"));
+ }
+ } catch (JSONException e) {
+ log.error("Exec output unparsable:" + this.cmde);
+ }
+ }
+
+ public String getCmde() {
+ return cmde;
+ }
+
+ public static void main(String[] args) {
+ pFile = new PidFile(System.getProperty("RECORD_TYPE") + "-data-loader");
+ Runtime.getRuntime().addShutdownHook(pFile);
+ int period = 60;
+ try {
+ if (System.getProperty("PERIOD") != null) {
+ period = Integer.parseInt(System.getProperty("PERIOD"));
+ }
+ } catch (NumberFormatException ex) {
+ ex.printStackTrace();
+ System.out
+ .println("Usage: java -DPERIOD=nn -DRECORD_TYPE=recordType Exec [cmd]");
+ System.out.println("PERIOD should be numeric format of seconds.");
+ System.exit(0);
+ }
+ Timer timer = new Timer();
+ timer.schedule(new Exec(args), 0, period * 1000);
+ }
}
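
Usage sketch: Exec.main reads RECORD_TYPE and PERIOD from system properties,
registers the pid file as a shutdown hook, and schedules itself on a Timer so
the wrapped command is re-run every PERIOD seconds. The record type and
command below are placeholders:

    // Hypothetical driver, equivalent to:
    //   java -DRECORD_TYPE=Df -DPERIOD=60 ...inputtools.plugin.metrics.Exec df -k
    System.setProperty("RECORD_TYPE", "Df");
    System.setProperty("PERIOD", "60");
    Exec.main(new String[] { "df", "-k" }); // blocks; the Timer keeps running
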
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java Wed Mar 11 22:39:26 2009
@@ -1,10 +1,10 @@
package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
+
import java.io.IOException;
import java.sql.SQLException;
import java.util.Timer;
import java.util.TimerTask;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
@@ -14,21 +14,21 @@
import org.json.JSONObject;
public class ExecHelper extends ExecPlugin {
- private static Log log = LogFactory.getLog(ExecHelper.class);
- private String cmde = null;
- private static PidFile pFile = null;
- private Timer timer = null;
-
- public ExecHelper(String[] cmds) {
- StringBuffer c = new StringBuffer();
- for(String cmd : cmds) {
- c.append(cmd);
- c.append(" ");
- }
- cmde = c.toString();
- }
-
- public String getCmde() {
- return cmde;
- }
+ private static Log log = LogFactory.getLog(ExecHelper.class);
+ private String cmde = null;
+ private static PidFile pFile = null;
+ private Timer timer = null;
+
+ public ExecHelper(String[] cmds) {
+ StringBuffer c = new StringBuffer();
+ for (String cmd : cmds) {
+ c.append(cmd);
+ c.append(" ");
+ }
+ cmde = c.toString();
+ }
+
+ public String getCmde() {
+ return cmde;
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/nodeactivity/NodeActivityPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/nodeactivity/NodeActivityPlugin.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/nodeactivity/NodeActivityPlugin.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/nodeactivity/NodeActivityPlugin.java Wed Mar 11 22:39:26 2009
@@ -18,105 +18,85 @@
package org.apache.hadoop.chukwa.inputtools.plugin.nodeactivity;
+
import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
import org.apache.hadoop.chukwa.inputtools.plugin.IPlugin;
import org.json.JSONObject;
-public class NodeActivityPlugin extends ExecPlugin
-{
- private String cmde = null;
- private DataConfig dataConfig = null;
-
- public NodeActivityPlugin()
- {
- dataConfig = new DataConfig();
- cmde = dataConfig.get("mdl.plugin.NodeActivityPlugin.cmde");
- }
-
- @Override
- public String getCmde()
- {
- return cmde;
- }
-
- @Override
- public JSONObject postProcess(JSONObject execResult)
- {
- try
- {
- if (execResult.getInt("status") < 0)
- {
- return execResult;
- }
-
- String res = execResult.getString("stdout");
-
- String[] tab = res.split("\n");
- int totalFreeNode = 0;
- int totalUsedNode = 0;
- int totalDownNode = 0;
-
- for(int i=0;i<tab.length;i++)
- {
- if (tab[i].indexOf("state =") <0)
- {
- tab[i] = null;
- continue;
- }
-
- String[] line = tab[i].split("state =");
- tab[i] = null;
-
- if (line[1].trim().equals("free"))
- {
- totalFreeNode ++;
- }
- else if (line[1].trim().equals("job-exclusive"))
- {
- totalUsedNode ++;
- }
- else
- {
- totalDownNode ++;
- }
- }
-
-
- execResult.put("totalFreeNode", totalFreeNode);
- execResult.put("totalUsedNode", totalUsedNode);
- execResult.put("totalDownNode", totalDownNode);
- execResult.put("source", "NodeActivity");
-
- execResult.put("status", 100);
-
- } catch (Throwable e)
- {
- try
- {
- execResult.put("source", "NodeActivity");
- execResult.put("status", -100);
- execResult.put("errorLog",e.getMessage());
- }
- catch(Exception e1) { e1.printStackTrace();}
- e.printStackTrace();
-
- }
-
- return execResult;
- }
-
- public static void main(String[] args)
- {
- IPlugin plugin = new NodeActivityPlugin();
- JSONObject result = plugin.execute();
- System.out.print("Result: " + result);
-
-
-
-
- }
-
+public class NodeActivityPlugin extends ExecPlugin {
+ private String cmde = null;
+ private DataConfig dataConfig = null;
+
+ public NodeActivityPlugin() {
+ dataConfig = new DataConfig();
+ cmde = dataConfig.get("mdl.plugin.NodeActivityPlugin.cmde");
+ }
+
+ @Override
+ public String getCmde() {
+ return cmde;
+ }
+
+ @Override
+ public JSONObject postProcess(JSONObject execResult) {
+ try {
+ if (execResult.getInt("status") < 0) {
+ return execResult;
+ }
+
+ String res = execResult.getString("stdout");
+
+ String[] tab = res.split("\n");
+ int totalFreeNode = 0;
+ int totalUsedNode = 0;
+ int totalDownNode = 0;
+
+ for (int i = 0; i < tab.length; i++) {
+ if (tab[i].indexOf("state =") < 0) {
+ tab[i] = null;
+ continue;
+ }
+
+ String[] line = tab[i].split("state =");
+ tab[i] = null;
+
+ if (line[1].trim().equals("free")) {
+ totalFreeNode++;
+ } else if (line[1].trim().equals("job-exclusive")) {
+ totalUsedNode++;
+ } else {
+ totalDownNode++;
+ }
+ }
+
+ execResult.put("totalFreeNode", totalFreeNode);
+ execResult.put("totalUsedNode", totalUsedNode);
+ execResult.put("totalDownNode", totalDownNode);
+ execResult.put("source", "NodeActivity");
+
+ execResult.put("status", 100);
+
+ } catch (Throwable e) {
+ try {
+ execResult.put("source", "NodeActivity");
+ execResult.put("status", -100);
+ execResult.put("errorLog", e.getMessage());
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ }
+ e.printStackTrace();
+
+ }
+
+ return execResult;
+ }
+
+ public static void main(String[] args) {
+ IPlugin plugin = new NodeActivityPlugin();
+ JSONObject result = plugin.execute();
+ System.out.print("Result: " + result);
+ }
}
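
postProcess assumes the configured command emits pbsnodes-style output with
one "state = ..." line per node. An illustrative input (not from this commit):

    node01
         state = free
    node02
         state = job-exclusive
    node03
         state = down

would yield totalFreeNode=1, totalUsedNode=1 and totalDownNode=1; any state
other than "free" or "job-exclusive" is counted as down.
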
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/pbsnode/PbsNodePlugin.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/pbsnode/PbsNodePlugin.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/pbsnode/PbsNodePlugin.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/plugin/pbsnode/PbsNodePlugin.java Wed Mar 11 22:39:26 2009
@@ -1,5 +1,6 @@
package org.apache.hadoop.chukwa.inputtools.plugin.pbsnode;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
@@ -8,38 +9,32 @@
import org.json.JSONException;
import org.json.JSONObject;
-public class PbsNodePlugin extends ExecPlugin
-{
- private static Log log = LogFactory.getLog(PbsNodePlugin.class);
- private String cmde = null;
- private DataConfig dataConfig = null;
-
- public PbsNodePlugin()
- {
- dataConfig = new DataConfig();
- cmde = dataConfig.get("chukwa.inputtools.plugin.pbsNode.cmde");
- }
-
- @Override
- public String getCmde()
- {
- return cmde;
- }
-
- public static void main(String[] args) throws JSONException
- {
- IPlugin plugin = new PbsNodePlugin();
- JSONObject result = plugin.execute();
- System.out.print("Result: " + result);
-
- if (result.getInt("status") < 0)
- {
- System.out.println("Error");
- log.warn("[ChukwaError]:"+ PbsNodePlugin.class + ", " + result.getString("stderr"));
- }
- else
- {
- log.info(result.get("stdout"));
- }
- }
+public class PbsNodePlugin extends ExecPlugin {
+ private static Log log = LogFactory.getLog(PbsNodePlugin.class);
+ private String cmde = null;
+ private DataConfig dataConfig = null;
+
+ public PbsNodePlugin() {
+ dataConfig = new DataConfig();
+ cmde = dataConfig.get("chukwa.inputtools.plugin.pbsNode.cmde");
+ }
+
+ @Override
+ public String getCmde() {
+ return cmde;
+ }
+
+ public static void main(String[] args) throws JSONException {
+ IPlugin plugin = new PbsNodePlugin();
+ JSONObject result = plugin.execute();
+ System.out.print("Result: " + result);
+
+ if (result.getInt("status") < 0) {
+ System.out.println("Error");
+ log.warn("[ChukwaError]:" + PbsNodePlugin.class + ", "
+ + result.getString("stderr"));
+ } else {
+ log.info(result.get("stdout"));
+ }
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java Wed Mar 11 22:39:26 2009
@@ -18,61 +18,61 @@
package org.apache.hadoop.chukwa.util;
+
import java.io.*;
import java.util.*;
public class ClusterConfig {
- public static HashMap<String, String> clusterMap = new HashMap<String, String>();
- private String path=System.getenv("CHUKWA_CONF_DIR")+File.separator;
- static public String getContents(File aFile) {
- //...checks on aFile are elided
- StringBuffer contents = new StringBuffer();
-
- try {
- //use buffering, reading one line at a time
- //FileReader always assumes default encoding is OK!
- BufferedReader input = new BufferedReader(new FileReader(aFile));
- try {
- String line = null; //not declared within while loop
- /*
- * readLine is a bit quirky :
- * it returns the content of a line MINUS the newline.
- * it returns null only for the END of the stream.
- * it returns an empty String if two newlines appear in a row.
- */
- while (( line = input.readLine()) != null){
- contents.append(line);
- contents.append(System.getProperty("line.separator"));
- }
- } finally {
- input.close();
- }
- }
- catch (IOException ex){
- ex.printStackTrace();
- }
+ public static HashMap<String, String> clusterMap = new HashMap<String, String>();
+ private String path = System.getenv("CHUKWA_CONF_DIR") + File.separator;
- return contents.toString();
- }
-
- public ClusterConfig() {
- File cc = new File(path+"jdbc.conf");
- String buffer = getContents(cc);
- String[] lines = buffer.split("\n");
- for(String line: lines) {
- String[] data = line.split("=",2);
- clusterMap.put(data[0],data[1]);
+ static public String getContents(File aFile) {
+ // ...checks on aFile are elided
+ StringBuffer contents = new StringBuffer();
+
+ try {
+ // use buffering, reading one line at a time
+ // FileReader always assumes default encoding is OK!
+ BufferedReader input = new BufferedReader(new FileReader(aFile));
+ try {
+ String line = null; // not declared within while loop
+ /*
+ * readLine is a bit quirky : it returns the content of a line MINUS the
+ * newline. it returns null only for the END of the stream. it returns
+ * an empty String if two newlines appear in a row.
+ */
+ while ((line = input.readLine()) != null) {
+ contents.append(line);
+ contents.append(System.getProperty("line.separator"));
}
+ } finally {
+ input.close();
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
}
- public String getURL(String cluster) {
- String url = clusterMap.get(cluster);
- return url;
+ return contents.toString();
+ }
+
+ public ClusterConfig() {
+ File cc = new File(path + "jdbc.conf");
+ String buffer = getContents(cc);
+ String[] lines = buffer.split("\n");
+ for (String line : lines) {
+ String[] data = line.split("=", 2);
+ clusterMap.put(data[0], data[1]);
}
+ }
- public Iterator<String> getClusters() {
- Set<String> keys = clusterMap.keySet();
- Iterator<String> i = keys.iterator();
- return i;
- }
+ public String getURL(String cluster) {
+ String url = clusterMap.get(cluster);
+ return url;
+ }
+
+ public Iterator<String> getClusters() {
+ Set<String> keys = clusterMap.keySet();
+ Iterator<String> i = keys.iterator();
+ return i;
+ }
}
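
ClusterConfig splits each line of $CHUKWA_CONF_DIR/jdbc.conf on the first '=',
so the file is a flat map from cluster name to JDBC URL. A hypothetical entry
(host and credentials are placeholders):

    demo=jdbc:mysql://db.example.com/chukwa?user=chukwa&password=secret

after which getURL("demo") returns everything to the right of the '='.
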
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Wed Mar 11 22:39:26 2009
@@ -18,70 +18,72 @@
package org.apache.hadoop.chukwa.util;
-import java.util.Random;
+import java.util.Random;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.*;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
-public class ConstRateAdaptor extends Thread implements Adaptor {
+public class ConstRateAdaptor extends Thread implements Adaptor {
+ private static final int SLEEP_VARIANCE = 200;
+ private static final int MIN_SLEEP = 300;
- private static final int SLEEP_VARIANCE = 200;
- private static final int MIN_SLEEP = 300;
-
private String type;
private long offset;
private int bytesPerSec;
private ChunkReceiver dest;
private long adaptorID;
-
+
private volatile boolean stopping = false;
+
public String getCurrentStatus() throws AdaptorException {
- return type.trim() + " " + bytesPerSec + " " + offset;
+ return type.trim() + " " + bytesPerSec + " " + offset;
}
- public void start(long adaptor, String type, String bytesPerSecParam, long offset, ChunkReceiver dest) throws AdaptorException
- {
- try{
+ public void start(long adaptor, String type, String bytesPerSecParam,
+ long offset, ChunkReceiver dest) throws AdaptorException {
+ try {
bytesPerSec = Integer.parseInt(bytesPerSecParam.trim());
- } catch(NumberFormatException e) {
- throw new AdaptorException("bad argument to const rate adaptor: [" + bytesPerSecParam + "]");
+ } catch (NumberFormatException e) {
+ throw new AdaptorException("bad argument to const rate adaptor: ["
+ + bytesPerSecParam + "]");
}
this.adaptorID = adaptor;
this.offset = offset;
this.type = type;
this.dest = dest;
this.setName("ConstRate Adaptor_" + type);
- super.start(); //this is a Thread.start
+ super.start(); // this is a Thread.start
}
-
+
public String getStreamName() {
return this.type;
}
-
- public void run()
- {
+
+ public void run() {
Random r = new Random();
- try{
- while(!stopping) {
- int MSToSleep = r.nextInt(SLEEP_VARIANCE) + MIN_SLEEP; //between 1 and 3 secs
- //FIXME: I think there's still a risk of integer overflow here
+ try {
+ while (!stopping) {
+        // between MIN_SLEEP and MIN_SLEEP + SLEEP_VARIANCE ms (300-500 ms)
+        int MSToSleep = r.nextInt(SLEEP_VARIANCE) + MIN_SLEEP;
+ // FIXME: I think there's still a risk of integer overflow here
int arraySize = (int) (MSToSleep * (long) bytesPerSec / 1000L);
- byte[] data = new byte[ arraySize];
+ byte[] data = new byte[arraySize];
r.nextBytes(data);
offset += data.length;
- ChunkImpl evt = new ChunkImpl(type,"random data source", offset, data , this);
+ ChunkImpl evt = new ChunkImpl(type, "random data source", offset, data,
+ this);
dest.add(evt);
-
+
Thread.sleep(MSToSleep);
- } //end while
- } catch(InterruptedException ie)
- {} //abort silently
+ } // end while
+ } catch (InterruptedException ie) {
+ } // abort silently
}
-
+
public String toString() {
return "const rate " + type;
}
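
The chunk-size arithmetic in run(), worked through with assumed numbers; the
cast to long keeps the intermediate product from overflowing an int:

    int bytesPerSec = 1000000; // assumed configured rate
    int msToSleep = 400;       // drawn from [MIN_SLEEP, MIN_SLEEP + SLEEP_VARIANCE)
    int arraySize = (int) (msToSleep * (long) bytesPerSec / 1000L);
    // arraySize == 400000, so averaged over a second the adaptor emits
    // roughly bytesPerSec bytes whatever sleep interval is drawn.
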
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java Wed Mar 11 22:39:26 2009
@@ -18,166 +18,172 @@
package org.apache.hadoop.chukwa.util;
+
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Connection;
import java.sql.Statement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
public class DatabaseWriter {
- private static Log log = LogFactory.getLog(DatabaseWriter.class);
- private Connection conn = null;
- private Statement stmt = null;
- private ResultSet rs = null;
-
- public DatabaseWriter(String host, String user, String password) {
- DataConfig mdlConfig = new DataConfig();
- String jdbc_url = "jdbc:mysql://"+host+"/";
- if(user!=null) {
- jdbc_url = jdbc_url + "?user=" + user;
- if(password!=null) {
- jdbc_url = jdbc_url + "&password=" + password;
- }
- }
- try {
- // The newInstance() call is a work around for some
- // broken Java implementations
- Class.forName("com.mysql.jdbc.Driver").newInstance();
- } catch (Exception ex) {
- // handle the error
- log.error(ex,ex);
- }
- try {
- conn = DriverManager.getConnection(jdbc_url);
- log.debug("Initialized JDBC URL: "+jdbc_url);
- } catch (SQLException ex) {
- log.error(ex,ex);
- }
- }
+ private static Log log = LogFactory.getLog(DatabaseWriter.class);
+ private Connection conn = null;
+ private Statement stmt = null;
+ private ResultSet rs = null;
+
+ public DatabaseWriter(String host, String user, String password) {
+ DataConfig mdlConfig = new DataConfig();
+ String jdbc_url = "jdbc:mysql://" + host + "/";
+ if (user != null) {
+ jdbc_url = jdbc_url + "?user=" + user;
+ if (password != null) {
+ jdbc_url = jdbc_url + "&password=" + password;
+ }
+ }
+ try {
+ // The newInstance() call is a work around for some
+ // broken Java implementations
+ Class.forName("com.mysql.jdbc.Driver").newInstance();
+ } catch (Exception ex) {
+ // handle the error
+ log.error(ex, ex);
+ }
+ try {
+ conn = DriverManager.getConnection(jdbc_url);
+ log.debug("Initialized JDBC URL: " + jdbc_url);
+ } catch (SQLException ex) {
+ log.error(ex, ex);
+ }
+ }
+
+ public DatabaseWriter(String cluster) {
+ ClusterConfig cc = new ClusterConfig();
+ String jdbc_url = cc.getURL(cluster);
+ try {
+ // The newInstance() call is a work around for some
+ // broken Java implementations
+ Class.forName("com.mysql.jdbc.Driver").newInstance();
+ } catch (Exception ex) {
+ // handle the error
+ log.error(ex, ex);
+ }
+ try {
+ conn = DriverManager.getConnection(jdbc_url);
+ log.debug("Initialized JDBC URL: " + jdbc_url);
+ } catch (SQLException ex) {
+ log.error(ex, ex);
+ }
+ }
+
+ public DatabaseWriter() {
+ DataConfig mdlConfig = new DataConfig();
+ String jdbc_url = "jdbc:mysql://" + mdlConfig.get("jdbc.host") + "/"
+ + mdlConfig.get("jdbc.db");
+ if (mdlConfig.get("jdbc.user") != null) {
+ jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
+ if (mdlConfig.get("jdbc.password") != null) {
+ jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
+ }
+ }
+ try {
+ // The newInstance() call is a work around for some
+ // broken Java implementations
+ Class.forName("com.mysql.jdbc.Driver").newInstance();
+ } catch (Exception ex) {
+ // handle the error
+ log.error(ex, ex);
+ }
+ try {
+ conn = DriverManager.getConnection(jdbc_url);
+ log.debug("Initialized JDBC URL: " + jdbc_url);
+ } catch (SQLException ex) {
+ log.error(ex, ex);
+ }
+ }
+
+ public void execute(String query) {
+ try {
+ stmt = conn.createStatement();
+ stmt.execute(query);
+ } catch (SQLException ex) {
+ // handle any errors
+ log.error(ex, ex);
+ log.error("SQL Statement:" + query);
+ log.error("SQLException: " + ex.getMessage());
+ log.error("SQLState: " + ex.getSQLState());
+ log.error("VendorError: " + ex.getErrorCode());
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException sqlEx) {
+ // ignore
+ }
+ stmt = null;
+ }
+ }
+ }
+
+ public Connection getConnection() {
+ return conn;
+ }
+
+ public ResultSet query(String query) throws SQLException {
+ try {
+ stmt = conn.createStatement();
+ rs = stmt.executeQuery(query);
+ } catch (SQLException ex) {
+ // handle any errors
+ log.debug(ex, ex);
+ log.debug("SQL Statement:" + query);
+ log.debug("SQLException: " + ex.getMessage());
+ log.debug("SQLState: " + ex.getSQLState());
+ log.debug("VendorError: " + ex.getErrorCode());
+ throw ex;
+ } finally {
+ }
+ return rs;
+ }
+
+ public void close() {
+ // it is a good idea to release
+ // resources in a finally{} block
+ // in reverse-order of their creation
+ // if they are no-longer needed
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException sqlEx) {
+ // ignore
+ }
+ rs = null;
+ }
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException sqlEx) {
+ // ignore
+ }
+ stmt = null;
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sqlEx) {
+ // ignore
+ }
+ conn = null;
+ }
+ }
+
+ public static String formatTimeStamp(long timestamp) {
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ String format = formatter.format(timestamp);
- public DatabaseWriter(String cluster) {
- ClusterConfig cc = new ClusterConfig();
- String jdbc_url = cc.getURL(cluster);
- try {
- // The newInstance() call is a work around for some
- // broken Java implementations
- Class.forName("com.mysql.jdbc.Driver").newInstance();
- } catch (Exception ex) {
- // handle the error
- log.error(ex,ex);
- }
- try {
- conn = DriverManager.getConnection(jdbc_url);
- log.debug("Initialized JDBC URL: "+jdbc_url);
- } catch (SQLException ex) {
- log.error(ex,ex);
- }
- }
-
- public DatabaseWriter() {
- DataConfig mdlConfig = new DataConfig();
- String jdbc_url = "jdbc:mysql://"+mdlConfig.get("jdbc.host")+"/"+mdlConfig.get("jdbc.db");
- if(mdlConfig.get("jdbc.user")!=null) {
- jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
- if(mdlConfig.get("jdbc.password")!=null) {
- jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
- }
- }
- try {
- // The newInstance() call is a work around for some
- // broken Java implementations
- Class.forName("com.mysql.jdbc.Driver").newInstance();
- } catch (Exception ex) {
- // handle the error
- log.error(ex,ex);
- }
- try {
- conn = DriverManager.getConnection(jdbc_url);
- log.debug("Initialized JDBC URL: "+jdbc_url);
- } catch (SQLException ex) {
- log.error(ex,ex);
- }
- }
- public void execute(String query) {
- try {
- stmt = conn.createStatement();
- stmt.execute(query);
- } catch (SQLException ex) {
- // handle any errors
- log.error(ex, ex);
- log.error("SQL Statement:" + query);
- log.error("SQLException: " + ex.getMessage());
- log.error("SQLState: " + ex.getSQLState());
- log.error("VendorError: " + ex.getErrorCode());
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException sqlEx) {
- // ignore
- }
- stmt = null;
- }
- }
- }
- public Connection getConnection() {
- return conn;
- }
- public ResultSet query(String query) throws SQLException {
- try {
- stmt = conn.createStatement();
- rs = stmt.executeQuery(query);
- } catch (SQLException ex) {
- // handle any errors
- log.debug(ex, ex);
- log.debug("SQL Statement:" + query);
- log.debug("SQLException: " + ex.getMessage());
- log.debug("SQLState: " + ex.getSQLState());
- log.debug("VendorError: " + ex.getErrorCode());
- throw ex;
- } finally {
- }
- return rs;
- }
- public void close() {
- // it is a good idea to release
- // resources in a finally{} block
- // in reverse-order of their creation
- // if they are no-longer needed
- if (rs != null) {
- try {
- rs.close();
- } catch (SQLException sqlEx) {
- // ignore
- }
- rs = null;
- }
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException sqlEx) {
- // ignore
- }
- stmt = null;
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException sqlEx) {
- // ignore
- }
- conn = null;
- }
- }
- public static String formatTimeStamp(long timestamp) {
- SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String format = formatter.format(timestamp);
-
- return format;
- }
+ return format;
+ }
}
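
A typical call sequence for DatabaseWriter, sketched under the assumption that
"demo" is a cluster key present in jdbc.conf:

    import java.sql.ResultSet;
    import java.sql.SQLException;

    DatabaseWriter db = new DatabaseWriter("demo"); // hypothetical cluster name
    try {
      ResultSet rs = db.query("SELECT 1");
      while (rs.next()) {
        System.out.println(rs.getInt(1));
      }
    } catch (SQLException e) {
      e.printStackTrace();
    } finally {
      db.close(); // releases ResultSet, Statement and Connection in reverse order
    }
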
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpArchive.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpArchive.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpArchive.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpArchive.java Wed Mar 11 22:39:26 2009
@@ -1,9 +1,9 @@
package org.apache.hadoop.chukwa.util;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
@@ -11,51 +11,43 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
-public class DumpArchive
-{
+public class DumpArchive {
- /**
- * @param args
- * @throws URISyntaxException
- * @throws IOException
- */
- public static void main(String[] args) throws IOException, URISyntaxException
- {
- System.out.println("Input file:" + args[0]);
-
- ChukwaConfiguration conf = new ChukwaConfiguration();
- String fsName = conf.get("writer.hdfs.filesystem");
- FileSystem fs = FileSystem.get(new URI(fsName), conf);
-
- SequenceFile.Reader r =
- new SequenceFile.Reader(fs,new Path(args[0]), conf);
-
- ChukwaArchiveKey key = new ChukwaArchiveKey();
- ChunkImpl chunk = ChunkImpl.getBlankChunk();
- try
- {
- while (r.next(key, chunk))
- {
- System.out.println("\nTimePartition: " + key.getTimePartition());
- System.out.println("DataType: " + key.getDataType());
- System.out.println("StreamName: " + key.getStreamName());
- System.out.println("SeqId: " + key.getSeqId());
- System.out.println("\t\t =============== ");
-
- System.out.println("Cluster : " + chunk.getTags());
- System.out.println("DataType : " + chunk.getDataType());
- System.out.println("Source : " + chunk.getSource());
- System.out.println("Application : " + chunk.getApplication());
- System.out.println("SeqID : " + chunk.getSeqID());
- System.out.println("Data : " + new String(chunk.getData()));
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
-
+ /**
+ * @param args
+ * @throws URISyntaxException
+ * @throws IOException
+ */
+ public static void main(String[] args) throws IOException, URISyntaxException {
+ System.out.println("Input file:" + args[0]);
+
+ ChukwaConfiguration conf = new ChukwaConfiguration();
+ String fsName = conf.get("writer.hdfs.filesystem");
+ FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+ SequenceFile.Reader r = new SequenceFile.Reader(fs, new Path(args[0]), conf);
+
+ ChukwaArchiveKey key = new ChukwaArchiveKey();
+ ChunkImpl chunk = ChunkImpl.getBlankChunk();
+ try {
+ while (r.next(key, chunk)) {
+ System.out.println("\nTimePartition: " + key.getTimePartition());
+ System.out.println("DataType: " + key.getDataType());
+ System.out.println("StreamName: " + key.getStreamName());
+ System.out.println("SeqId: " + key.getSeqId());
+ System.out.println("\t\t =============== ");
+
+ System.out.println("Cluster : " + chunk.getTags());
+ System.out.println("DataType : " + chunk.getDataType());
+ System.out.println("Source : " + chunk.getSource());
+ System.out.println("Application : " + chunk.getApplication());
+ System.out.println("SeqID : " + chunk.getSeqID());
+ System.out.println("Data : " + new String(chunk.getData()));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
- }
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpDataType.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpDataType.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpDataType.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpDataType.java Wed Mar 11 22:39:26 2009
@@ -1,9 +1,9 @@
package org.apache.hadoop.chukwa.util;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
@@ -11,49 +11,40 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
-public class DumpDataType
-{
+public class DumpDataType {
- /**
- * @param args
- * @throws URISyntaxException
- * @throws IOException
- */
- public static void main(String[] args) throws IOException, URISyntaxException
- {
- System.err.println("Input file:" + args[0]);
- System.err.println("DataType:" + args[1]);
- System.err.println("Source:" + args[2]);
-
- ChukwaConfiguration conf = new ChukwaConfiguration();
- String fsName = conf.get("writer.hdfs.filesystem");
- FileSystem fs = FileSystem.get(new URI(fsName), conf);
-
- SequenceFile.Reader r =
- new SequenceFile.Reader(fs,new Path(args[0]), conf);
-
- ChukwaArchiveKey key = new ChukwaArchiveKey();
- ChunkImpl chunk = ChunkImpl.getBlankChunk();
- try
- {
- while (r.next(key, chunk))
- {
- if (args[1].equalsIgnoreCase(chunk.getDataType()))
- {
- if (args[2].equalsIgnoreCase("ALL") || args[2].equalsIgnoreCase(chunk.getSource()))
- {
- System.out.print(new String(chunk.getData()));
- }
- }
-
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
-
+ /**
+ * @param args
+ * @throws URISyntaxException
+ * @throws IOException
+ */
+ public static void main(String[] args) throws IOException, URISyntaxException {
+ System.err.println("Input file:" + args[0]);
+ System.err.println("DataType:" + args[1]);
+ System.err.println("Source:" + args[2]);
+
+ ChukwaConfiguration conf = new ChukwaConfiguration();
+ String fsName = conf.get("writer.hdfs.filesystem");
+ FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+ SequenceFile.Reader r = new SequenceFile.Reader(fs, new Path(args[0]), conf);
+
+ ChukwaArchiveKey key = new ChukwaArchiveKey();
+ ChunkImpl chunk = ChunkImpl.getBlankChunk();
+ try {
+ while (r.next(key, chunk)) {
+ if (args[1].equalsIgnoreCase(chunk.getDataType())) {
+ if (args[2].equalsIgnoreCase("ALL")
+ || args[2].equalsIgnoreCase(chunk.getSource())) {
+ System.out.print(new String(chunk.getData()));
+ }
+ }
+
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
- }
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpRecord.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpRecord.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpRecord.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpRecord.java Wed Mar 11 22:39:26 2009
@@ -1,9 +1,9 @@
package org.apache.hadoop.chukwa.util;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -11,52 +11,42 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
-public class DumpRecord
-{
-
- /**
- * @param args
- * @throws URISyntaxException
- * @throws IOException
- */
- public static void main(String[] args) throws IOException, URISyntaxException
- {
- System.out.println("Input file:" + args[0]);
-
- ChukwaConfiguration conf = new ChukwaConfiguration();
- String fsName = conf.get("writer.hdfs.filesystem");
- FileSystem fs = FileSystem.get(new URI(fsName), conf);
-
- SequenceFile.Reader r =
- new SequenceFile.Reader(fs,new Path(args[0]), conf);
-
- ChukwaRecordKey key = new ChukwaRecordKey();
- ChukwaRecord record = new ChukwaRecord();
- try
- {
- while (r.next(key, record))
- {
- System.out.println("\t ===== KEY ===== ");
-
- System.out.println("DataType: " + key.getReduceType());
- System.out.println("\nKey: " + key.getKey());
- System.out.println("\t ===== Value =====");
-
- String[] fields = record.getFields();
- System.out.println("Timestamp : " + record.getTime());
- for (String field: fields)
- {
- System.out.println("[" +field +"] :" + record.getValue(field));
- }
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
-
+public class DumpRecord {
+ /**
+ * @param args
+ * @throws URISyntaxException
+ * @throws IOException
+ */
+ public static void main(String[] args) throws IOException, URISyntaxException {
+ System.out.println("Input file:" + args[0]);
+
+ ChukwaConfiguration conf = new ChukwaConfiguration();
+ String fsName = conf.get("writer.hdfs.filesystem");
+ FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+ SequenceFile.Reader r = new SequenceFile.Reader(fs, new Path(args[0]), conf);
+
+ ChukwaRecordKey key = new ChukwaRecordKey();
+ ChukwaRecord record = new ChukwaRecord();
+ try {
+ while (r.next(key, record)) {
+ System.out.println("\t ===== KEY ===== ");
+
+ System.out.println("DataType: " + key.getReduceType());
+ System.out.println("\nKey: " + key.getKey());
+ System.out.println("\t ===== Value =====");
+
+ String[] fields = record.getFields();
+ System.out.println("Timestamp : " + record.getTime());
+ for (String field : fields) {
+ System.out.println("[" + field + "] :" + record.getValue(field));
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
- }
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ExceptionUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ExceptionUtil.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ExceptionUtil.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ExceptionUtil.java Wed Mar 11 22:39:26 2009
@@ -18,15 +18,16 @@
package org.apache.hadoop.chukwa.util;
+
import java.io.PrintWriter;
import java.io.StringWriter;
public class ExceptionUtil {
- public static String getStackTrace(Throwable t) {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- t.printStackTrace(pw);
- pw.flush();
- return sw.toString();
- }
+ public static String getStackTrace(Throwable t) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ t.printStackTrace(pw);
+ pw.flush();
+ return sw.toString();
+ }
}
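
getStackTrace is a convenience for log APIs that take a single String; a small
illustrative use (the exception here is a stand-in for real work):

    try {
      throw new IllegalStateException("boom");
    } catch (Exception e) {
      // a log.warn(...) call would work the same way
      System.err.println(ExceptionUtil.getStackTrace(e));
    }
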
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java Wed Mar 11 22:39:26 2009
@@ -18,60 +18,59 @@
package org.apache.hadoop.chukwa.util;
-import java.util.Random;
+import java.util.Random;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.*;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
-public class MaxRateSender extends Thread implements Adaptor {
-
+public class MaxRateSender extends Thread implements Adaptor {
public static final int BUFFER_SIZE = 60 * 1024;
public static final String ADAPTOR_NAME = "MaxRateSender";
-
+
private volatile boolean stopping = false;
private long offset;
private String type;
ChunkReceiver dest;
private long adaptorID;
-
+
public String getCurrentStatus() throws AdaptorException {
return "";
}
- public void start(long adaptor, String type, String status, long offset, ChunkReceiver dest) throws AdaptorException
- {
+ public void start(long adaptor, String type, String status, long offset,
+ ChunkReceiver dest) throws AdaptorException {
this.setName("MaxRateSender adaptor");
this.adaptorID = adaptor;
this.offset = offset;
this.type = type;
this.dest = dest;
- super.start(); //this is a Thread.start
+ super.start(); // this is a Thread.start
}
-
+
public String getStreamName() {
- return type;
+ return type;
}
-
- public void run()
- {
+
+ public void run() {
Random r = new Random();
-
- try{
- while(!stopping) {
- byte[] data = new byte[ BUFFER_SIZE];
+
+ try {
+ while (!stopping) {
+ byte[] data = new byte[BUFFER_SIZE];
r.nextBytes(data);
offset += data.length;
- ChunkImpl evt = new ChunkImpl(type, "random data source", offset, data, this);
+ ChunkImpl evt = new ChunkImpl(type, "random data source", offset, data,
+ this);
dest.add(evt);
-
+
}
- } catch(InterruptedException ie)
- {}
+ } catch (InterruptedException ie) {
+ }
}
-
+
public String toString() {
return ADAPTOR_NAME;
}
@@ -80,11 +79,10 @@
stopping = true;
return offset;
}
-
+
public void hardStop() throws AdaptorException {
stopping = true;
}
-
@Override
public String getType() {
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/PidFile.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/PidFile.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/PidFile.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/PidFile.java Wed Mar 11 22:39:26 2009
@@ -18,102 +18,101 @@
package org.apache.hadoop.chukwa.util;
+
import java.io.*;
import java.lang.management.ManagementFactory;
import java.nio.channels.*;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
public class PidFile extends Thread {
-
- String name;
- private static Log log = LogFactory.getLog(PidFile.class);
- private static FileLock lock = null;
- private static FileOutputStream pidFileOutput = null;
-
- public PidFile(String name){
- this.name=name;
- try {
- init();
- } catch(IOException ex) {
- clean();
- System.exit(-1);
- }
- }
-
- public void init() throws IOException{
- String pidLong=ManagementFactory.getRuntimeMXBean().getName();
- String[] items=pidLong.split("@");
- String pid=items[0];
- String chukwaPath=System.getProperty("CHUKWA_HOME");
- StringBuffer pidFilesb=new StringBuffer();
- String pidDir = System.getenv("CHUKWA_PID_DIR");
- if (pidDir == null) {
- pidDir = chukwaPath+File.separator+"var"+File.separator+"run";
- }
- pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid");
- try{
- File existsFile = new File(pidDir);
- if(!existsFile.exists()) {
- boolean success = (new File(pidDir)).mkdirs();
- if(!success) {
- throw(new IOException());
- }
- }
- File pidFile= new File(pidFilesb.toString());
-
- pidFileOutput= new FileOutputStream(pidFile);
- pidFileOutput.write(pid.getBytes());
- pidFileOutput.flush();
- FileChannel channel = pidFileOutput.getChannel();
- PidFile.lock = channel.tryLock();
- if(PidFile.lock!=null) {
- log.debug("Initlization succeeded...");
- } else {
- throw(new IOException());
- }
- }catch (IOException ex){
- System.out.println("Initializaiton failed: can not write pid file.");
- log.error("Initialization failed...");
- log.error(ex.getMessage());
- System.exit(-1);
- throw ex;
-
- }
-
- }
-
- public void clean(){
- String chukwaPath=System.getenv("CHUKWA_HOME");
- StringBuffer pidFilesb=new StringBuffer();
- String pidDir = System.getenv("CHUKWA_PID_DIR");
- if (pidDir == null) {
- pidDir = chukwaPath+File.separator+"var"+File.separator+"run";
- }
- pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid");
- String pidFileName=pidFilesb.toString();
-
- File pidFile=new File(pidFileName);
- if (!pidFile.exists()) {
- log.error("Delete pid file, No such file or directory: "+pidFileName);
- } else {
- try {
- lock.release();
- pidFileOutput.close();
- } catch(IOException e) {
- log.error("Unable to release file lock: "+pidFileName);
- }
- }
- boolean result=pidFile.delete();
- if (!result){
- log.error("Delete pid file failed, "+pidFileName);
+ String name;
+ private static Log log = LogFactory.getLog(PidFile.class);
+ private static FileLock lock = null;
+ private static FileOutputStream pidFileOutput = null;
+
+ public PidFile(String name) {
+ this.name = name;
+ try {
+ init();
+ } catch (IOException ex) {
+ clean();
+ System.exit(-1);
+ }
+ }
+
+ public void init() throws IOException {
+ String pidLong = ManagementFactory.getRuntimeMXBean().getName();
+ String[] items = pidLong.split("@");
+ String pid = items[0];
+ String chukwaPath = System.getProperty("CHUKWA_HOME");
+ StringBuffer pidFilesb = new StringBuffer();
+ String pidDir = System.getenv("CHUKWA_PID_DIR");
+ if (pidDir == null) {
+ pidDir = chukwaPath + File.separator + "var" + File.separator + "run";
+ }
+ pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid");
+ try {
+ File existsFile = new File(pidDir);
+ if (!existsFile.exists()) {
+ boolean success = (new File(pidDir)).mkdirs();
+ if (!success) {
+ throw (new IOException());
}
- }
+ }
+ File pidFile = new File(pidFilesb.toString());
- public void run() {
- clean();
- }
+ pidFileOutput = new FileOutputStream(pidFile);
+ pidFileOutput.write(pid.getBytes());
+ pidFileOutput.flush();
+ FileChannel channel = pidFileOutput.getChannel();
+ PidFile.lock = channel.tryLock();
+ if (PidFile.lock != null) {
+ log.debug("Initlization succeeded...");
+ } else {
+ throw (new IOException());
+ }
+ } catch (IOException ex) {
+ System.out.println("Initializaiton failed: can not write pid file.");
+ log.error("Initialization failed...");
+ log.error(ex.getMessage());
+ System.exit(-1);
+ throw ex;
+
+ }
+
+ }
+
+ public void clean() {
+ String chukwaPath = System.getenv("CHUKWA_HOME");
+ StringBuffer pidFilesb = new StringBuffer();
+ String pidDir = System.getenv("CHUKWA_PID_DIR");
+ if (pidDir == null) {
+ pidDir = chukwaPath + File.separator + "var" + File.separator + "run";
+ }
+ pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid");
+ String pidFileName = pidFilesb.toString();
+
+ File pidFile = new File(pidFileName);
+ if (!pidFile.exists()) {
+ log.error("Delete pid file, No such file or directory: " + pidFileName);
+ } else {
+ try {
+ lock.release();
+ pidFileOutput.close();
+ } catch (IOException e) {
+ log.error("Unable to release file lock: " + pidFileName);
+ }
+ }
+
+ boolean result = pidFile.delete();
+ if (!result) {
+ log.error("Delete pid file failed, " + pidFileName);
+ }
+ }
+
+ public void run() {
+ clean();
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/RecordConstants.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/RecordConstants.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/RecordConstants.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/RecordConstants.java Wed Mar 11 22:39:26 2009
@@ -18,47 +18,90 @@
package org.apache.hadoop.chukwa.util;
-public class RecordConstants
-{
- static final char[] CTRL_A = {'\u0001'};
- static final char[] CTRL_B = {'\u0002'};
- static final char[] CTRL_C = {'\u0003'};
- static final char[] CTRL_D = {'\u0004'};
- //public static final String FIELD_SEPARATOR = new String(CTRL_A);
- public static final String DEFAULT_FIELD_SEPARATOR = "-#-";
- public static final String DEFAULT_RECORD_SEPARATOR = "\n";
- public static final String RECORD_SEPARATOR_ESCAPE_SEQ = new String (CTRL_D);// may want this to be very obscure, e.g. new String(CTRL_B) + new String (CTRL_C) + new String (CTRL_D)
-
- /**
- * Insert the default chukwa escape sequence in <code>record</code> before all occurances of
- * <code>recordSeparator</code> <i>except</i> the final one if the final record separator occurs
- * at the end of the <code>record</code>
- * @param recordSeparator The record separator that we are escaping. This is chunk source application specific
- * @param record The string representing the entire record, including the final record delimiter
- * @return The string with appropriate <code>recordSeparator</code>s escaped
- */
- public static String escapeAllButLastRecordSeparator(String recordSeparator,String record){
- String escapedRecord = "";
- if (record.endsWith(recordSeparator)){
- escapedRecord = record.substring(0,record.length()-recordSeparator.length()).replaceAll(recordSeparator, RECORD_SEPARATOR_ESCAPE_SEQ + recordSeparator) + recordSeparator;
- }
- return escapedRecord;
- }
-
+
+public class RecordConstants {
+ static final char[] CTRL_A = { '\u0001' };
+ static final char[] CTRL_B = { '\u0002' };
+ static final char[] CTRL_C = { '\u0003' };
+ static final char[] CTRL_D = { '\u0004' };
+ // public static final String FIELD_SEPARATOR = new String(CTRL_A);
+ public static final String DEFAULT_FIELD_SEPARATOR = "-#-";
+ public static final String DEFAULT_RECORD_SEPARATOR = "\n";
+  // may want this to be very obscure, e.g.
+  // new String(CTRL_B) + new String(CTRL_C) + new String(CTRL_D)
+  public static final String RECORD_SEPARATOR_ESCAPE_SEQ = new String(CTRL_D);
+
/**
- * Insert the default chukwa escape sequence in <code>record</code> before all occurances of
- * <code>recordSeparator</code>. This is assuming that you are not passing the final record
- * separator in with the <code>record</code>, because it would be escaped too.
- * @param recordSeparator The record separator that we are escaping. This is chunk source application specific
- * @param record The string representing the entire record, including the final record delimiter
+ * Insert the default chukwa escape sequence in <code>record</code> before all
+   * occurrences of <code>recordSeparator</code> <i>except</i> the final one if
+ * the final record separator occurs at the end of the <code>record</code>
+ *
+ * @param recordSeparator The record separator that we are escaping. This is
+ * chunk source application specific
+ * @param record The string representing the entire record, including the
+ * final record delimiter
+ * @return The string with appropriate <code>recordSeparator</code>s escaped
+ */
+ public static String escapeAllButLastRecordSeparator(String recordSeparator,
+ String record) {
+ String escapedRecord = "";
+ if (record.endsWith(recordSeparator)) {
+ escapedRecord = record.substring(0,
+ record.length() - recordSeparator.length()).replaceAll(
+ recordSeparator, RECORD_SEPARATOR_ESCAPE_SEQ + recordSeparator)
+ + recordSeparator;
+ }
+ return escapedRecord;
+ }
+
+ /**
+ * Insert the default chukwa escape sequence in <code>record</code> before all
+   * occurrences of <code>recordSeparator</code>. This assumes that you are
+ * not passing the final record separator in with the <code>record</code>,
+ * because it would be escaped too.
+ *
+ * @param recordSeparator The record separator that we are escaping. This is
+ * chunk source application specific
+   * @param record The string representing the entire record, excluding the
+   *          final record delimiter
* @return The string with all <code>recordSeparator</code>s escaped
*/
- public static String escapeAllRecordSeparators(String recordSeparator,String record){
- return record.replaceAll(recordSeparator, RECORD_SEPARATOR_ESCAPE_SEQ + recordSeparator);
- }
-
- public static String recoverRecordSeparators(String recordSeparator, String record){
- return record.replaceAll(RECORD_SEPARATOR_ESCAPE_SEQ+recordSeparator, recordSeparator);
+ public static String escapeAllRecordSeparators(String recordSeparator,
+ String record) {
+ return record.replaceAll(recordSeparator, RECORD_SEPARATOR_ESCAPE_SEQ
+ + recordSeparator);
+ }
+
+ public static String recoverRecordSeparators(String recordSeparator,
+ String record) {
+ return record.replaceAll(RECORD_SEPARATOR_ESCAPE_SEQ + recordSeparator,
+ recordSeparator);
}
-
+
}
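
The escape helpers above are easiest to follow as a round trip: escape every
embedded separator, ship the record, then recover the separators on the other
side. A minimal sketch, assuming the default newline separator (the demo class
name is invented; the RecordConstants calls are the ones committed above):

    import org.apache.hadoop.chukwa.util.RecordConstants;

    public class RecordEscapeDemo {
      public static void main(String[] args) {
        String sep = RecordConstants.DEFAULT_RECORD_SEPARATOR; // "\n"
        // Two logical records; the trailing separator marks end-of-record.
        String record = "line one\nline two\n";
        // Escapes the embedded separator but leaves the trailing one intact.
        String escaped =
            RecordConstants.escapeAllButLastRecordSeparator(sep, record);
        // Strips the CTRL-D escape sequence back out.
        String recovered =
            RecordConstants.recoverRecordSeparators(sep, escaped);
        System.out.println(record.equals(recovered)); // prints true
      }
    }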
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java Wed Mar 11 22:39:26 2009
@@ -18,8 +18,8 @@
package org.apache.hadoop.mapred;
-import java.util.HashMap;
+import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
@@ -30,75 +30,85 @@
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
import org.apache.hadoop.fs.Path;
-public class ChukwaJobTrackerInstrumentation extends org.apache.hadoop.mapred.JobTrackerInstrumentation {
+public class ChukwaJobTrackerInstrumentation extends
+ org.apache.hadoop.mapred.JobTrackerInstrumentation {
- protected final JobTracker tracker;
- private static ChukwaAgentController chukwaClient = null;
- private static Log log = LogFactory.getLog(JobTrackerInstrumentation.class);
- private static HashMap<JobID, Long> jobConfs = null;
- private static HashMap<JobID, Long> jobHistories = null;
-
- public ChukwaJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
- super(jt,conf);
- tracker = jt;
- if(chukwaClient==null) {
- chukwaClient = new ChukwaAgentController();
- }
- if(jobConfs==null) {
- jobConfs = new HashMap<JobID, Long>();
- }
- if(jobHistories==null) {
- jobHistories = new HashMap<JobID, Long>();
- }
- }
-
- public void launchMap(TaskAttemptID taskAttemptID) {
-
- }
-
- public void completeMap(TaskAttemptID taskAttemptID) {
-
- }
-
- public void launchReduce(TaskAttemptID taskAttemptID) {
-
- }
-
- public void completeReduce(TaskAttemptID taskAttemptID) {
-
- }
-
- public void submitJob(JobConf conf, JobID id) {
- String chukwaJobConf = tracker.getLocalJobFilePath(id);
- try {
- String jobFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
- Path jobHistoryPath = JobHistory.JobInfo.getJobHistoryLogLocation(jobFileName);
- String jobConfPath = JobHistory.JobInfo.getLocalJobFilePath(id);
- long adaptorID = chukwaClient.add("org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped", "JobConf", "0 "+jobConfPath, 0);
- jobConfs.put(id, adaptorID);
- if(jobHistoryPath.toString().matches("^hdfs://")) {
- adaptorID = chukwaClient.add("org.apache.hadoop.chukwa.datacollection.adaptor.HDFSAdaptor", "JobHistory", "0 "+jobHistoryPath.toString(), 0);
- } else {
- adaptorID = chukwaClient.add("org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped", "JobHistory", "0 "+jobHistoryPath.toString().substring(5), 0);
- }
- jobHistories.put(id, adaptorID);
- } catch(Exception ex) {
-
- }
+ protected final JobTracker tracker;
+ private static ChukwaAgentController chukwaClient = null;
+ private static Log log = LogFactory.getLog(JobTrackerInstrumentation.class);
+ private static HashMap<JobID, Long> jobConfs = null;
+ private static HashMap<JobID, Long> jobHistories = null;
+
+ public ChukwaJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
+ super(jt, conf);
+ tracker = jt;
+ if (chukwaClient == null) {
+ chukwaClient = new ChukwaAgentController();
+ }
+ if (jobConfs == null) {
+ jobConfs = new HashMap<JobID, Long>();
+ }
+ if (jobHistories == null) {
+ jobHistories = new HashMap<JobID, Long>();
+ }
+ }
+
+ public void launchMap(TaskAttemptID taskAttemptID) {
+
+ }
+
+ public void completeMap(TaskAttemptID taskAttemptID) {
+
+ }
+
+ public void launchReduce(TaskAttemptID taskAttemptID) {
+
+ }
+
+ public void completeReduce(TaskAttemptID taskAttemptID) {
+
+ }
+
+ public void submitJob(JobConf conf, JobID id) {
+ String chukwaJobConf = tracker.getLocalJobFilePath(id);
+ try {
+ String jobFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+ Path jobHistoryPath = JobHistory.JobInfo
+ .getJobHistoryLogLocation(jobFileName);
+ String jobConfPath = JobHistory.JobInfo.getLocalJobFilePath(id);
+ long adaptorID = chukwaClient
+ .add(
+ "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
+ "JobConf", "0 " + jobConfPath, 0);
+ jobConfs.put(id, adaptorID);
+      if (jobHistoryPath.toString().startsWith("hdfs://")) {
+ adaptorID = chukwaClient.add(
+ "org.apache.hadoop.chukwa.datacollection.adaptor.HDFSAdaptor",
+ "JobHistory", "0 " + jobHistoryPath.toString(), 0);
+ } else {
+ adaptorID = chukwaClient
+ .add(
+ "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
+ "JobHistory", "0 " + jobHistoryPath.toString().substring(5), 0);
}
+ jobHistories.put(id, adaptorID);
+    } catch (Exception ex) {
+      log.warn("could not add adaptors for job: " + id.toString(), ex);
+    }
+ }
- public void completeJob(JobConf conf, JobID id) {
- try {
- if (jobHistories.containsKey(id)) {
- chukwaClient.remove(jobHistories.get(id));
- }
- if (jobConfs.containsKey(id)) {
- chukwaClient.remove(jobConfs.get(id));
- }
- } catch(Throwable e) {
- log.warn("could not remove adaptor for this job: " + id.toString(),e);
- e.printStackTrace();
- }
- }
+ public void completeJob(JobConf conf, JobID id) {
+ try {
+ if (jobHistories.containsKey(id)) {
+ chukwaClient.remove(jobHistories.get(id));
+ }
+ if (jobConfs.containsKey(id)) {
+ chukwaClient.remove(jobConfs.get(id));
+ }
+ } catch (Throwable e) {
+ log.warn("could not remove adaptor for this job: " + id.toString(), e);
+ e.printStackTrace();
+ }
+ }
}
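
The submitJob() hook above wires the job's conf and history files into the
local Chukwa agent as tailing adaptors, and completeJob() detaches them. The
same pattern in isolation, as a minimal sketch (the path and data-type tag are
illustrative; the add()/remove() signatures are those used in this diff):

    ChukwaAgentController client = new ChukwaAgentController();
    // File-tailer params are "<offset> <file>"; offset 0 tails from the start.
    long adaptorID = client.add(
        "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
        "JobConf",             // data type stamped on the emitted chunks
        "0 /tmp/job_0001.xml", // illustrative local path
        0);
    // ... job runs ...
    client.remove(adaptorID); // stop tailing once the job completes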
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/ChunkImplTest.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/ChunkImplTest.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/ChunkImplTest.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/ChunkImplTest.java Wed Mar 11 22:39:26 2009
@@ -1,58 +1,48 @@
package org.apache.hadoop.chukwa;
-import java.io.IOException;
+import java.io.IOException;
import junit.framework.TestCase;
-
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
-public class ChunkImplTest extends TestCase
-{
- public void testVersion()
- {
- ChunkBuilder cb = new ChunkBuilder();
- cb.addRecord("foo".getBytes());
- cb.addRecord("bar".getBytes());
- cb.addRecord("baz".getBytes());
- Chunk c = cb.getChunk();
- DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate());
- try
- {
- c.write(ob);
- DataInputBuffer ib = new DataInputBuffer();
- ib.reset(ob.getData(), c.getSerializedSizeEstimate());
- int version = ib.readInt();
- assertEquals(version,ChunkImpl.PROTOCOL_VERSION);
- }
- catch (IOException e)
- {
- e.printStackTrace();
- fail("Should nor raise any exception");
- }
- }
+public class ChunkImplTest extends TestCase {
+ public void testVersion() {
+ ChunkBuilder cb = new ChunkBuilder();
+ cb.addRecord("foo".getBytes());
+ cb.addRecord("bar".getBytes());
+ cb.addRecord("baz".getBytes());
+ Chunk c = cb.getChunk();
+ DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate());
+ try {
+ c.write(ob);
+ DataInputBuffer ib = new DataInputBuffer();
+ ib.reset(ob.getData(), c.getSerializedSizeEstimate());
+ int version = ib.readInt();
+ assertEquals(version, ChunkImpl.PROTOCOL_VERSION);
+ } catch (IOException e) {
+ e.printStackTrace();
+      fail("Should not raise any exception");
+ }
+ }
- public void testWrongVersion()
- {
- ChunkBuilder cb = new ChunkBuilder();
- cb.addRecord("foo".getBytes());
- cb.addRecord("bar".getBytes());
- cb.addRecord("baz".getBytes());
- Chunk c = cb.getChunk();
- DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate());
- try
- {
- c.write(ob);
- DataInputBuffer ib = new DataInputBuffer();
- ib.reset(ob.getData(), c.getSerializedSizeEstimate());
- //change current chunkImpl version
- ChunkImpl.PROTOCOL_VERSION = ChunkImpl.PROTOCOL_VERSION+1;
- ChunkImpl.read(ib);
- fail("Should have raised an IOexception");
- }
- catch (IOException e)
- {
- // right behavior, do nothing
- }
- }
+ public void testWrongVersion() {
+ ChunkBuilder cb = new ChunkBuilder();
+ cb.addRecord("foo".getBytes());
+ cb.addRecord("bar".getBytes());
+ cb.addRecord("baz".getBytes());
+ Chunk c = cb.getChunk();
+ DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate());
+ try {
+ c.write(ob);
+ DataInputBuffer ib = new DataInputBuffer();
+ ib.reset(ob.getData(), c.getSerializedSizeEstimate());
+ // change current chunkImpl version
+ ChunkImpl.PROTOCOL_VERSION = ChunkImpl.PROTOCOL_VERSION + 1;
+ ChunkImpl.read(ib);
+      fail("Should have raised an IOException");
+ } catch (IOException e) {
+ // right behavior, do nothing
+ }
+ }
}
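
testWrongVersion() above depends on ChunkImpl.read() checking the leading
version int before deserializing anything else. The shape of that guard, as a
sketch of the pattern rather than the actual ChunkImpl code:

    int version = in.readInt();
    if (version != PROTOCOL_VERSION) {
      // Refuse to parse data written by an incompatible serializer.
      throw new IOException("Protocol version mismatch: expected "
          + PROTOCOL_VERSION + ", got " + version);
    }
    // ... deserialize the remaining chunk fields ...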
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java Wed Mar 11 22:39:26 2009
@@ -17,13 +17,12 @@
*/
package org.apache.hadoop.chukwa;
+
import junit.framework.TestCase;
public class TestChunkBuilder extends TestCase {
-
- public void testChunkBuilder()
- {
+ public void testChunkBuilder() {
ChunkBuilder cb = new ChunkBuilder();
cb.addRecord("foo".getBytes());
cb.addRecord("bar".getBytes());
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java Wed Mar 11 22:39:26 2009
@@ -18,17 +18,20 @@
package org.apache.hadoop.chukwa.datacollection;
+
import java.io.*;
import java.util.Random;
public class TempFileUtil {
public static File makeBinary(int length) throws IOException {
- File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),"chukwaTest");
+ File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
+ "chukwaTest");
FileOutputStream fos = new FileOutputStream(tmpOutput);
Random r = new Random();
- byte[] randomData = new byte[ length];
+ byte[] randomData = new byte[length];
r.nextBytes(randomData);
- randomData[ length-1] = '\n';//need data to end with \n since default tailer uses that
+    randomData[length - 1] = '\n'; // data must end with \n since the default tailer uses that
fos.write(randomData);
fos.flush();
fos.close();
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.adaptor;
+
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
public class ChukwaTestAdaptor implements Adaptor {
@@ -27,11 +28,11 @@
private String params = null;
private long startOffset = 0l;
private ChunkReceiver dest = null;
-
+
@Override
public String getCurrentStatus() throws AdaptorException {
// TODO Auto-generated method stub
- return type+ " "+ params + " "+ startOffset;
+ return type + " " + params + " " + startOffset;
}
@Override
@@ -43,7 +44,7 @@
@Override
public void hardStop() throws AdaptorException {
// TODO Auto-generated method stub
-
+
}
@Override
@@ -60,14 +61,12 @@
this.params = params;
this.startOffset = offset;
this.dest = dest;
- System.out.println("adaptorId [" +adaptorId + "]");
- System.out.println("type [" +type+ "]");
- System.out.println("params [" +params+ "]");
- System.out.println("startOffset [" +startOffset+ "]");
-
-
+ System.out.println("adaptorId [" + adaptorId + "]");
+ System.out.println("type [" + type + "]");
+ System.out.println("params [" + params + "]");
+ System.out.println("startOffset [" + startOffset + "]");
+
}
-
public String getType() {
return type;
@@ -108,5 +107,5 @@
public void setDest(ChunkReceiver dest) {
this.dest = dest;
}
-
+
}
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java Wed Mar 11 22:39:26 2009
@@ -17,29 +17,31 @@
*/
package org.apache.hadoop.chukwa.datacollection.adaptor;
-import junit.framework.TestCase;
+import junit.framework.TestCase;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
public class TestExecAdaptor extends TestCase {
-
+
ChunkCatcherConnector chunks;
+
public TestExecAdaptor() {
chunks = new ChunkCatcherConnector();
chunks.start();
}
-
+
public void testWithPs() throws ChukwaAgent.AlreadyRunningException {
try {
- ChukwaAgent agent = new ChukwaAgent();
- agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor ps ps aux 0");
-
+ ChukwaAgent agent = new ChukwaAgent();
+ agent
+ .processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor ps ps aux 0");
+
Chunk c = chunks.waitForAChunk();
System.out.println(new String(c.getData()));
- } catch(InterruptedException e) {
-
+ } catch (InterruptedException e) {
+
}
}
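
The processCommand string in testWithPs() packs four fields into one line.
Pulling it apart (field meanings inferred from the add commands used across
this commit's tests):

    // Format: add <adaptor class> <datatype> <adaptor params> <initial offset>
    String cmd = "add org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor"
        + " ps"     // datatype for the emitted chunks
        + " ps aux" // ExecAdaptor params: the command line to execute
        + " 0";     // initial offset
    agent.processCommand(cmd);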
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java Wed Mar 11 22:39:26 2009
@@ -1,12 +1,11 @@
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
-
import junit.framework.TestCase;
-
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
@@ -47,7 +46,8 @@
}
private File makeTestFile(String name, int size) throws IOException {
- File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"), name);
+ File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
+ name);
FileOutputStream fos = new FileOutputStream(tmpOutput);
PrintWriter pw = new PrintWriter(fos);
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java Wed Mar 11 22:39:26 2009
@@ -1,13 +1,12 @@
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
-
import junit.framework.Assert;
import junit.framework.TestCase;
-
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
@@ -77,8 +76,8 @@
FileTailingAdaptor.GRACEFUL_PERIOD = 30 * 1000;
long adaptorId = agent
- .processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped MyType 0 "
- + logFile +" 0");
+ .processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped MyType 0 "
+ + logFile + " 0");
assertTrue(adaptorId != -1);
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java Wed Mar 11 22:39:26 2009
@@ -1,16 +1,15 @@
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
-
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
-
import junit.framework.Assert;
import junit.framework.TestCase;