You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC
svn commit: r752666 [9/16] - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/
src/java/org/apache/hadoop/chukwa/database/
src/java/org/apache/hadoop/chukwa/datacollection/
src/java/org/apache/hadoop...
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java Wed Mar 11 22:39:26 2009
@@ -18,152 +18,145 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class Sar extends AbstractProcessor
-{
- static Logger log = Logger.getLogger(Sar.class);
- public final String recordType = this.getClass().getName();
-
- private static String regex="([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
- private static Pattern p = null;
-
- private Matcher matcher = null;
- private SimpleDateFormat sdf = null;
-
- public Sar()
- {
- //TODO move that to config
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
- p = Pattern.compile(regex);
- }
-
- @Override
- protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- throws Throwable
- {
-
- log.debug("Sar record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
- int i = 0;
-
-// String logLevel = null;
-// String className = null;
-
- matcher=p.matcher(recordEntry);
- while (matcher.find())
- {
- log.debug("Sar Processor Matches");
-
- try
- {
- Date d = sdf.parse( matcher.group(1).trim());
-
-// logLevel = matcher.group(2);
-// className = matcher.group(3);
-
- //TODO create a more specific key structure
- // part of ChukwaArchiveKey + record index if needed
- key.setKey("" + d.getTime());
-
- String[] lines = recordEntry.split("\n");
-
-
- String[] headers = null;
- while(i < (lines.length-1) && lines[i+1].indexOf("Average:")<0) {
- // Skip to the average lines
- log.debug("skip:"+lines[i]);
- i++;
- }
- while (i < lines.length)
- {
- ChukwaRecord record = null;
- if(lines[i].equals("")) {
- i++;
- headers = parseHeader(lines[i]);
- i++;
- }
- String data[] = parseData(lines[i]);
-
- //FIXME please validate this
- if(headers[1].equals("IFACE") && headers[2].equals("rxpck/s")) {
- log.debug("Matched Sar-Network");
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
- } else if(headers[1].equals("IFACE") && headers[2].equals("rxerr/s")) {
- log.debug("Matched Sar-Network");
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
- } else if(headers[1].equals("kbmemfree")) {
- log.debug("Matched Sar-Memory");
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
- } else if(headers[1].equals("totsck")) {
- log.debug("Matched Sar-NetworkSockets");
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
- } else if(headers[1].equals("runq-sz")) {
- log.debug("Matched Sar-LoadAverage");
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
- } else {
- log.debug("No match:"+headers[1]+" "+headers[2]);
- }
- if(record!=null) {
- int j=0;
-
- log.debug("Data Length: " + data.length);
- while(j<data.length) {
- log.debug("header:"+headers[j]+" data:"+data[j]);
- if(!headers[j].equals("Average:")) {
- record.add(headers[j],data[j]);
- }
- j++;
- }
-
- output.collect(key, record);
- }
- i++;
- }
- // End of parsing
- } catch (Exception e)
- {
- e.printStackTrace();
- throw e;
- }
- }
- }
-
- public String[] parseHeader(String header) {
- String[] headers = header.split("\\s+");
- return headers;
- }
-
- public String[] parseData(String dataLine) {
- String[] data = dataLine.split("\\s+");
- return data;
- }
-
- public String getDataType() {
- return recordType;
- }
+public class Sar extends AbstractProcessor {
+ static Logger log = Logger.getLogger(Sar.class);
+ public final String recordType = this.getClass().getName();
+
+ private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
+ private static Pattern p = null;
+
+ private Matcher matcher = null;
+ private SimpleDateFormat sdf = null;
+
+ public Sar() {
+ // TODO move that to config
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+ p = Pattern.compile(regex);
+ }
+
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+
+ log.debug("Sar record: [" + recordEntry + "] type[" + chunk.getDataType()
+ + "]");
+ int i = 0;
+
+ // String logLevel = null;
+ // String className = null;
+
+ matcher = p.matcher(recordEntry);
+ while (matcher.find()) {
+ log.debug("Sar Processor Matches");
+
+ try {
+ Date d = sdf.parse(matcher.group(1).trim());
+
+ // logLevel = matcher.group(2);
+ // className = matcher.group(3);
+
+ // TODO create a more specific key structure
+ // part of ChukwaArchiveKey + record index if needed
+ key.setKey("" + d.getTime());
+
+ String[] lines = recordEntry.split("\n");
+
+ String[] headers = null;
+ while (i < (lines.length - 1) && lines[i + 1].indexOf("Average:") < 0) {
+ // Skip to the average lines
+ log.debug("skip:" + lines[i]);
+ i++;
+ }
+ while (i < lines.length) {
+ ChukwaRecord record = null;
+ if (lines[i].equals("")) {
+ i++;
+ headers = parseHeader(lines[i]);
+ i++;
+ }
+ String data[] = parseData(lines[i]);
+
+ // FIXME please validate this
+ if (headers[1].equals("IFACE") && headers[2].equals("rxpck/s")) {
+ log.debug("Matched Sar-Network");
+
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+ } else if (headers[1].equals("IFACE") && headers[2].equals("rxerr/s")) {
+ log.debug("Matched Sar-Network");
+
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+ } else if (headers[1].equals("kbmemfree")) {
+ log.debug("Matched Sar-Memory");
+
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+ } else if (headers[1].equals("totsck")) {
+ log.debug("Matched Sar-NetworkSockets");
+
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+ } else if (headers[1].equals("runq-sz")) {
+ log.debug("Matched Sar-LoadAverage");
+
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+ } else {
+ log.debug("No match:" + headers[1] + " " + headers[2]);
+ }
+ if (record != null) {
+ int j = 0;
+
+ log.debug("Data Length: " + data.length);
+ while (j < data.length) {
+ log.debug("header:" + headers[j] + " data:" + data[j]);
+ if (!headers[j].equals("Average:")) {
+ record.add(headers[j], data[j]);
+ }
+ j++;
+ }
+
+ output.collect(key, record);
+ }
+ i++;
+ }
+ // End of parsing
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ }
+
+ public String[] parseHeader(String header) {
+ String[] headers = header.split("\\s+");
+ return headers;
+ }
+
+ public String[] parseData(String dataLine) {
+ String[] data = dataLine.split("\\s+");
+ return data;
+ }
+
+ public String getDataType() {
+ return recordType;
+ }
}
\ No newline at end of file
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java Wed Mar 11 22:39:26 2009
@@ -18,74 +18,64 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class SysLog extends AbstractProcessor
-{
+public class SysLog extends AbstractProcessor {
+
+ static Logger log = Logger.getLogger(SysLog.class);
+ private SimpleDateFormat sdf = null;
- static Logger log = Logger.getLogger(SysLog.class);
- private SimpleDateFormat sdf = null;
+ public SysLog() {
+ sdf = new SimpleDateFormat("MMM d HH:mm:ss");
+ }
- public SysLog()
- {
- sdf = new SimpleDateFormat("MMM d HH:mm:ss");
- }
-
-
@Override
protected void parse(String recordEntry,
OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
- throws Throwable
- {
- try
- {
- String dStr = recordEntry.substring(0, 15);
- int start = 15;
- int idx = recordEntry.indexOf(' ', start);
- start = idx + 1;
- idx = recordEntry.indexOf(' ', start);
- String body = recordEntry.substring(idx + 1);
- body.replaceAll("\n", "");
-
- Calendar convertDate = Calendar.getInstance();
- Date d = sdf.parse(dStr);
- int year = convertDate.get(Calendar.YEAR);
- convertDate.setTime(d);
- convertDate.set(Calendar.YEAR, year);
-
- ChukwaRecord record = new ChukwaRecord();
- buildGenericRecord(record,recordEntry,convertDate.getTime().getTime(),"SysLog");
- output.collect(key, record);
- }
- catch (ParseException e)
- {
- e.printStackTrace();
- log.warn("Wrong format in SysLog [" + recordEntry + "]", e);
- throw e;
- }
- catch (IOException e)
- {
- e.printStackTrace();
- log.warn("Unable to collect output in SysLog [" + recordEntry + "]", e);
- throw e;
- }
+ throws Throwable {
+ try {
+ String dStr = recordEntry.substring(0, 15);
+ int start = 15;
+ int idx = recordEntry.indexOf(' ', start);
+ start = idx + 1;
+ idx = recordEntry.indexOf(' ', start);
+ String body = recordEntry.substring(idx + 1);
+ body.replaceAll("\n", "");
+
+ Calendar convertDate = Calendar.getInstance();
+ Date d = sdf.parse(dStr);
+ int year = convertDate.get(Calendar.YEAR);
+ convertDate.setTime(d);
+ convertDate.set(Calendar.YEAR, year);
+
+ ChukwaRecord record = new ChukwaRecord();
+ buildGenericRecord(record, recordEntry, convertDate.getTime().getTime(),
+ "SysLog");
+ output.collect(key, record);
+ } catch (ParseException e) {
+ e.printStackTrace();
+ log.warn("Wrong format in SysLog [" + recordEntry + "]", e);
+ throw e;
+ } catch (IOException e) {
+ e.printStackTrace();
+ log.warn("Unable to collect output in SysLog [" + recordEntry + "]", e);
+ throw e;
+ }
}
-
- public String getDataType()
- {
- return SysLog.class.getName();
- }
+ public String getDataType() {
+ return SysLog.class.getName();
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java Wed Mar 11 22:39:26 2009
@@ -18,148 +18,146 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class Top extends AbstractProcessor
-{
- static Logger log = Logger.getLogger(Top.class);
- public final String recordType = this.getClass().getName();
-
- private static String regex="([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): ";
- private static Pattern p = null;
-
- private Matcher matcher = null;
- private SimpleDateFormat sdf = null;
-
- public Top()
- {
- //TODO move that to config
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
- p = Pattern.compile(regex);
- }
-
- @Override
- protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- throws Throwable
- {
-
- log.debug("Top record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
-
-
- matcher=p.matcher(recordEntry);
- while (matcher.find())
- {
- log.debug("Top Processor Matches");
-
- try
- {
- Date d = sdf.parse( matcher.group(1).trim());
-
- ChukwaRecord record = new ChukwaRecord();
- String[] lines = recordEntry.split("\n");
- int i = 0;
- if(lines.length<2) {
- return;
- }
- String summaryString = "";
- while(!lines[i].equals("")) {
- summaryString = summaryString + lines[i] + "\n";
- i++;
- }
- i++;
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- parseSummary(record,summaryString);
- this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
- output.collect(key, record);
-
- StringBuffer buffer = new StringBuffer();
- //FIXME please validate this
- while (i < lines.length) {
- record = null;
- buffer.append(lines[i]+"\n");
- i++;
-
- }
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record, buffer.toString(), d.getTime(), "Top");
- //Output Top info to database
- output.collect(key, record);
-
- // End of parsing
- } catch (Exception e)
- {
- e.printStackTrace();
- throw e;
- }
- }
- }
-
- public void parseSummary(ChukwaRecord record,String header) {
- HashMap<String, Object> keyValues = new HashMap<String, Object>();
- String[] headers = header.split("\n");
- Pattern p = Pattern.compile("top - (.*?) up (.*?),\\s+(\\d+) users");
- Matcher matcher = p.matcher(headers[0]);
- if(matcher.find()) {
- record.add("uptime",matcher.group(2));
- record.add("users",matcher.group(3));
- }
- p = Pattern.compile("Tasks:\\s+(\\d+) total,\\s+(\\d+) running,\\s+(\\d+) sleeping,\\s+(\\d+) stopped,\\s+(\\d+) zombie");
- matcher = p.matcher(headers[1]);
- if(matcher.find()) {
- record.add("tasks_total",matcher.group(1));
- record.add("tasks_running",matcher.group(2));
- record.add("tasks_sleeping",matcher.group(3));
- record.add("tasks_stopped",matcher.group(4));
- record.add("tasks_zombie",matcher.group(5));
- }
- p = Pattern.compile("Cpu\\(s\\):\\s*(.*?)%\\s*us,\\s*(.*?)%\\s*sy,\\s*(.*?)%\\s*ni,\\s*(.*?)%\\s*id,\\s*(.*?)%\\s*wa,\\s*(.*?)%\\s*hi,\\s*(.*?)%\\s*si");
- matcher = p.matcher(headers[2]);
- if(matcher.find()) {
- record.add("cpu_user%",matcher.group(1));
- record.add("cpu_sys%",matcher.group(2));
- record.add("cpu_nice%",matcher.group(3));
- record.add("cpu_wait%",matcher.group(4));
- record.add("cpu_hi%",matcher.group(5));
- record.add("cpu_si%",matcher.group(6));
- }
- p = Pattern.compile("Mem:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k buffers");
- matcher = p.matcher(headers[3]);
- if(matcher.find()) {
- record.add("mem_total",matcher.group(1));
- record.add("mem_used",matcher.group(2));
- record.add("mem_free",matcher.group(3));
- record.add("mem_buffers",matcher.group(4));
- }
- p = Pattern.compile("Swap:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k cached");
- matcher = p.matcher(headers[4]);
- if(matcher.find()) {
- record.add("swap_total",matcher.group(1));
- record.add("swap_used",matcher.group(2));
- record.add("swap_free",matcher.group(3));
- record.add("swap_cached",matcher.group(4));
- }
- Iterator<String> ki = keyValues.keySet().iterator();
- while(ki.hasNext()) {
- String key = ki.next();
- log.debug(key+":"+keyValues.get(key));
- }
- }
-
- public String getDataType() {
- return recordType;
- }
+public class Top extends AbstractProcessor {
+ static Logger log = Logger.getLogger(Top.class);
+ public final String recordType = this.getClass().getName();
+
+ private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): ";
+ private static Pattern p = null;
+
+ private Matcher matcher = null;
+ private SimpleDateFormat sdf = null;
+
+ public Top() {
+ // TODO move that to config
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+ p = Pattern.compile(regex);
+ }
+
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+
+ log.debug("Top record: [" + recordEntry + "] type[" + chunk.getDataType()
+ + "]");
+
+ matcher = p.matcher(recordEntry);
+ while (matcher.find()) {
+ log.debug("Top Processor Matches");
+
+ try {
+ Date d = sdf.parse(matcher.group(1).trim());
+
+ ChukwaRecord record = new ChukwaRecord();
+ String[] lines = recordEntry.split("\n");
+ int i = 0;
+ if (lines.length < 2) {
+ return;
+ }
+ String summaryString = "";
+ while (!lines[i].equals("")) {
+ summaryString = summaryString + lines[i] + "\n";
+ i++;
+ }
+ i++;
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ parseSummary(record, summaryString);
+ this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+ output.collect(key, record);
+
+ StringBuffer buffer = new StringBuffer();
+ // FIXME please validate this
+ while (i < lines.length) {
+ record = null;
+ buffer.append(lines[i] + "\n");
+ i++;
+
+ }
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ this.buildGenericRecord(record, buffer.toString(), d.getTime(), "Top");
+ // Output Top info to database
+ output.collect(key, record);
+
+ // End of parsing
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ }
+
+ public void parseSummary(ChukwaRecord record, String header) {
+ HashMap<String, Object> keyValues = new HashMap<String, Object>();
+ String[] headers = header.split("\n");
+ Pattern p = Pattern.compile("top - (.*?) up (.*?),\\s+(\\d+) users");
+ Matcher matcher = p.matcher(headers[0]);
+ if (matcher.find()) {
+ record.add("uptime", matcher.group(2));
+ record.add("users", matcher.group(3));
+ }
+ p = Pattern
+ .compile("Tasks:\\s+(\\d+) total,\\s+(\\d+) running,\\s+(\\d+) sleeping,\\s+(\\d+) stopped,\\s+(\\d+) zombie");
+ matcher = p.matcher(headers[1]);
+ if (matcher.find()) {
+ record.add("tasks_total", matcher.group(1));
+ record.add("tasks_running", matcher.group(2));
+ record.add("tasks_sleeping", matcher.group(3));
+ record.add("tasks_stopped", matcher.group(4));
+ record.add("tasks_zombie", matcher.group(5));
+ }
+ p = Pattern
+ .compile("Cpu\\(s\\):\\s*(.*?)%\\s*us,\\s*(.*?)%\\s*sy,\\s*(.*?)%\\s*ni,\\s*(.*?)%\\s*id,\\s*(.*?)%\\s*wa,\\s*(.*?)%\\s*hi,\\s*(.*?)%\\s*si");
+ matcher = p.matcher(headers[2]);
+ if (matcher.find()) {
+ record.add("cpu_user%", matcher.group(1));
+ record.add("cpu_sys%", matcher.group(2));
+ record.add("cpu_nice%", matcher.group(3));
+ record.add("cpu_wait%", matcher.group(4));
+ record.add("cpu_hi%", matcher.group(5));
+ record.add("cpu_si%", matcher.group(6));
+ }
+ p = Pattern
+ .compile("Mem:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k buffers");
+ matcher = p.matcher(headers[3]);
+ if (matcher.find()) {
+ record.add("mem_total", matcher.group(1));
+ record.add("mem_used", matcher.group(2));
+ record.add("mem_free", matcher.group(3));
+ record.add("mem_buffers", matcher.group(4));
+ }
+ p = Pattern
+ .compile("Swap:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k cached");
+ matcher = p.matcher(headers[4]);
+ if (matcher.find()) {
+ record.add("swap_total", matcher.group(1));
+ record.add("swap_used", matcher.group(2));
+ record.add("swap_free", matcher.group(3));
+ record.add("swap_cached", matcher.group(4));
+ }
+ Iterator<String> ki = keyValues.keySet().iterator();
+ while (ki.hasNext()) {
+ String key = ki.next();
+ log.debug(key + ":" + keyValues.get(key));
+ }
+ }
+
+ public String getDataType() {
+ return recordType;
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Torque.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Torque.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Torque.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Torque.java Wed Mar 11 22:39:26 2009
@@ -18,89 +18,76 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class Torque extends AbstractProcessor
-{
+public class Torque extends AbstractProcessor {
+
+ static Logger log = Logger.getLogger(Torque.class);
+ private SimpleDateFormat sdf = null;
- static Logger log = Logger.getLogger(Torque.class);
- private SimpleDateFormat sdf = null;
+ public Torque() {
+ // TODO move that to config
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ }
- public Torque()
- {
- //TODO move that to config
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
- }
-
-
@Override
protected void parse(String recordEntry,
OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
- throws Throwable
- {
- try
- {
- String dStr = recordEntry.substring(0, 23);
- int start = 24;
- int idx = recordEntry.indexOf(' ', start);
- start = idx + 1;
- idx = recordEntry.indexOf(' ', start);
- String body = recordEntry.substring(idx + 1);
- body.replaceAll("\n", "");
- Date d = sdf.parse(dStr);
- String[] kvpairs = body.split(", ");
-
-
- ChukwaRecord record = new ChukwaRecord();
- String kvpair = null;
- String[] halves = null;
- boolean containRecord=false;
- for(int i = 0 ; i < kvpairs.length; ++i)
- {
- kvpair = kvpairs[i];
- if(kvpair.indexOf("=")>=0) {
- halves = kvpair.split("=");
- record.add(halves[0], halves[1]);
- containRecord=true;
- }
- }
- if(record.containsField("Machine")) {
- buildGenericRecord(record,null, d.getTime(), "HodMachine");
- } else {
- buildGenericRecord(record,null, d.getTime(), "HodJob");
- }
- if(containRecord) {
- output.collect(key, record);
- }
- }
- catch (ParseException e)
- {
- e.printStackTrace();
- log.warn("Wrong format in Torque [" + recordEntry + "]", e);
- throw e;
- }
- catch (IOException e)
- {
- e.printStackTrace();
- log.warn("Unable to collect output in Torque [" + recordEntry + "]", e);
- throw e;
- }
+ throws Throwable {
+ try {
+ String dStr = recordEntry.substring(0, 23);
+ int start = 24;
+ int idx = recordEntry.indexOf(' ', start);
+ start = idx + 1;
+ idx = recordEntry.indexOf(' ', start);
+ String body = recordEntry.substring(idx + 1);
+ body.replaceAll("\n", "");
+ Date d = sdf.parse(dStr);
+ String[] kvpairs = body.split(", ");
+
+ ChukwaRecord record = new ChukwaRecord();
+ String kvpair = null;
+ String[] halves = null;
+ boolean containRecord = false;
+ for (int i = 0; i < kvpairs.length; ++i) {
+ kvpair = kvpairs[i];
+ if (kvpair.indexOf("=") >= 0) {
+ halves = kvpair.split("=");
+ record.add(halves[0], halves[1]);
+ containRecord = true;
+ }
+ }
+ if (record.containsField("Machine")) {
+ buildGenericRecord(record, null, d.getTime(), "HodMachine");
+ } else {
+ buildGenericRecord(record, null, d.getTime(), "HodJob");
+ }
+ if (containRecord) {
+ output.collect(key, record);
+ }
+ } catch (ParseException e) {
+ e.printStackTrace();
+ log.warn("Wrong format in Torque [" + recordEntry + "]", e);
+ throw e;
+ } catch (IOException e) {
+ e.printStackTrace();
+ log.warn("Unable to collect output in Torque [" + recordEntry + "]", e);
+ throw e;
+ }
}
-
- public String getDataType()
- {
- return Torque.class.getName();
- }
+ public String getDataType() {
+ return Torque.class.getName();
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java Wed Mar 11 22:39:26 2009
@@ -1,56 +1,48 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class TsProcessor extends AbstractProcessor
-{
- static Logger log = Logger.getLogger(TsProcessor.class);
- private SimpleDateFormat sdf = null;
-
- public TsProcessor()
- {
- //TODO move that to config
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
- }
-
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- throws Throwable
- {
- try
- {
- String dStr = recordEntry.substring(0, 23);
- Date d = sdf.parse(dStr);
- ChukwaRecord record = new ChukwaRecord();
- this.buildGenericRecord(record, recordEntry, d.getTime(), chunk.getDataType());
- output.collect(key, record);
- }
- catch (ParseException e)
- {
- log.warn("Unable to parse the date in DefaultProcessor ["
- + recordEntry + "]", e);
- e.printStackTrace();
- throw e;
- }
- catch (IOException e)
- {
- log.warn("Unable to collect output in DefaultProcessor ["
- + recordEntry + "]", e);
- e.printStackTrace();
- throw e;
- }
-
- }
+public class TsProcessor extends AbstractProcessor {
+ static Logger log = Logger.getLogger(TsProcessor.class);
+ private SimpleDateFormat sdf = null;
+
+ public TsProcessor() {
+ // TODO move that to config
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ }
+
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+ try {
+ String dStr = recordEntry.substring(0, 23);
+ Date d = sdf.parse(dStr);
+ ChukwaRecord record = new ChukwaRecord();
+ this.buildGenericRecord(record, recordEntry, d.getTime(), chunk
+ .getDataType());
+ output.collect(key, record);
+ } catch (ParseException e) {
+ log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry
+ + "]", e);
+ e.printStackTrace();
+ throw e;
+ } catch (IOException e) {
+ log.warn("Unable to collect output in DefaultProcessor [" + recordEntry
+ + "]", e);
+ e.printStackTrace();
+ throw e;
+ }
+
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java Wed Mar 11 22:39:26 2009
@@ -18,30 +18,27 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-public class UnknownRecordTypeException extends Exception
-{
- /**
+public class UnknownRecordTypeException extends Exception {
+
+ /**
*
*/
- private static final long serialVersionUID = 8925135975093252279L;
+ private static final long serialVersionUID = 8925135975093252279L;
- public UnknownRecordTypeException()
- {}
+ public UnknownRecordTypeException() {
+ }
- public UnknownRecordTypeException(String message)
- {
- super(message);
- }
-
- public UnknownRecordTypeException(Throwable cause)
- {
- super(cause);
- }
-
- public UnknownRecordTypeException(String message, Throwable cause)
- {
- super(message, cause);
- }
+ public UnknownRecordTypeException(String message) {
+ super(message);
+ }
+
+ public UnknownRecordTypeException(Throwable cause) {
+ super(cause);
+ }
+
+ public UnknownRecordTypeException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatch.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatch.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatch.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatch.java Wed Mar 11 22:39:26 2009
@@ -17,11 +17,11 @@
*/
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
@@ -30,104 +30,92 @@
import org.json.JSONException;
import org.json.JSONObject;
-public class YWatch extends AbstractProcessor
-{
- static Logger log = Logger.getLogger(YWatch.class);
-
- private static final String ywatchType = "YWatch";
-
- private static String regex= null;
-
- private static Pattern p = null;
-
- private Matcher matcher = null;
-
- public YWatch()
- {
- //TODO move that to config
- regex="([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
- p = Pattern.compile(regex);
- matcher = p.matcher("-");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- throws Throwable
- {
- if (log.isDebugEnabled())
- {
- log.debug("YWatchProcessor record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
- }
-
- matcher.reset(recordEntry);
- if (matcher.matches())
- {
- log.info("YWatchProcessor Matches");
-
- try
- {
- String body = matcher.group(4);
-
- try
- {
- JSONObject json = new JSONObject(body);
-
- String poller = json.getString("poller");
- String host = json.getString("host");
- String metricName = json.getString("metricName");
-
- // Data
- JSONObject jsonData = json.getJSONObject("data").getJSONObject("data");
-
- String jsonTs = null;
- long ts = Long.parseLong(jsonTs);
-
- String jsonValue = null;
- Iterator<String> it = jsonData.keys();
-
- ChukwaRecord record = null;
-
- while(it.hasNext())
- {
- jsonTs = it.next();
- jsonValue = jsonData.getString(jsonTs);
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record, null, ts, "Ywatch");
- record.add("poller", poller);
- record.add("host", host);
- record.add("metricName", metricName);
- record.add("value", jsonValue);
- output.collect(key, record);
- log.info("YWatchProcessor output 1 metric");
- }
-
- }
- catch (IOException e)
- {
- log.warn("Unable to collect output in YWatchProcessor [" + recordEntry + "]", e);
- e.printStackTrace();
- }
- catch (JSONException e)
- {
- e.printStackTrace();
- log.warn("Wrong format in YWatchProcessor [" + recordEntry + "]", e);
- }
-
- }
- catch(Exception e)
- {
- e.printStackTrace();
- throw e;
- }
- }
- }
-
- public String getDataType()
- {
- return YWatch.ywatchType;
- }
+public class YWatch extends AbstractProcessor {
+ static Logger log = Logger.getLogger(YWatch.class);
+
+ private static final String ywatchType = "YWatch";
+
+ private static String regex = null;
+
+ private static Pattern p = null;
+
+ private Matcher matcher = null;
+
+ public YWatch() {
+ // TODO move that to config
+ regex = "([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
+ p = Pattern.compile(regex);
+ matcher = p.matcher("-");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+ if (log.isDebugEnabled()) {
+ log.debug("YWatchProcessor record: [" + recordEntry + "] type["
+ + chunk.getDataType() + "]");
+ }
+
+ matcher.reset(recordEntry);
+ if (matcher.matches()) {
+ log.info("YWatchProcessor Matches");
+
+ try {
+ String body = matcher.group(4);
+
+ try {
+ JSONObject json = new JSONObject(body);
+
+ String poller = json.getString("poller");
+ String host = json.getString("host");
+ String metricName = json.getString("metricName");
+
+ // Data
+ JSONObject jsonData = json.getJSONObject("data")
+ .getJSONObject("data");
+
+ String jsonTs = null;
+ long ts = Long.parseLong(jsonTs);
+
+ String jsonValue = null;
+ Iterator<String> it = jsonData.keys();
+
+ ChukwaRecord record = null;
+
+ while (it.hasNext()) {
+ jsonTs = it.next();
+ jsonValue = jsonData.getString(jsonTs);
+
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ this.buildGenericRecord(record, null, ts, "Ywatch");
+ record.add("poller", poller);
+ record.add("host", host);
+ record.add("metricName", metricName);
+ record.add("value", jsonValue);
+ output.collect(key, record);
+ log.info("YWatchProcessor output 1 metric");
+ }
+
+ } catch (IOException e) {
+ log.warn("Unable to collect output in YWatchProcessor ["
+ + recordEntry + "]", e);
+ e.printStackTrace();
+ } catch (JSONException e) {
+ e.printStackTrace();
+ log.warn("Wrong format in YWatchProcessor [" + recordEntry + "]", e);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ }
+
+ public String getDataType() {
+ return YWatch.ywatchType;
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java Wed Mar 11 22:39:26 2009
@@ -17,31 +17,27 @@
*/
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-public class YwatchInvalidEntry extends Exception
-{
- /**
+public class YwatchInvalidEntry extends Exception {
+
+ /**
*
*/
- private static final long serialVersionUID = 7074989443687516732L;
+ private static final long serialVersionUID = 7074989443687516732L;
+
+ public YwatchInvalidEntry() {
+ }
- public YwatchInvalidEntry()
- {
- }
-
- public YwatchInvalidEntry(String message)
- {
- super(message);
- }
-
- public YwatchInvalidEntry(Throwable cause)
- {
- super(cause);
- }
-
- public YwatchInvalidEntry(String message, Throwable cause)
- {
- super(message, cause);
- }
+ public YwatchInvalidEntry(String message) {
+ super(message);
+ }
+
+ public YwatchInvalidEntry(Throwable cause) {
+ super(cause);
+ }
+
+ public YwatchInvalidEntry(String message, Throwable cause) {
+ super(message, cause);
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/DuplicateReduceProcessorException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/DuplicateReduceProcessorException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/DuplicateReduceProcessorException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/DuplicateReduceProcessorException.java Wed Mar 11 22:39:26 2009
@@ -18,31 +18,27 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
-public class DuplicateReduceProcessorException extends RuntimeException
-{
- /**
+public class DuplicateReduceProcessorException extends RuntimeException {
+
+ /**
*
*/
- private static final long serialVersionUID = 7396161798611603019L;
+ private static final long serialVersionUID = 7396161798611603019L;
+
+ public DuplicateReduceProcessorException() {
+ }
- public DuplicateReduceProcessorException()
- {
- }
-
- public DuplicateReduceProcessorException(String message)
- {
- super(message);
- }
-
- public DuplicateReduceProcessorException(Throwable cause)
- {
- super(cause);
- }
-
- public DuplicateReduceProcessorException(String message, Throwable cause)
- {
- super(message, cause);
- }
+ public DuplicateReduceProcessorException(String message) {
+ super(message);
+ }
+
+ public DuplicateReduceProcessorException(Throwable cause) {
+ super(cause);
+ }
+
+ public DuplicateReduceProcessorException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/IdentityReducer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/IdentityReducer.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/IdentityReducer.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/IdentityReducer.java Wed Mar 11 22:39:26 2009
@@ -1,39 +1,31 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
import java.io.IOException;
import java.util.Iterator;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-public class IdentityReducer implements ReduceProcessor
-{
+public class IdentityReducer implements ReduceProcessor {
- @Override
- public String getDataType()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- {
- while(values.hasNext())
- {
- try
- {
- output.collect(key, values.next());
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- }
+ @Override
+ public String getDataType() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+ while (values.hasNext()) {
+ try {
+ output.collect(key, values.next());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/JobLogHistoryReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/JobLogHistoryReduceProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/JobLogHistoryReduceProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/JobLogHistoryReduceProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,9 +18,9 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
import java.io.IOException;
import java.util.Iterator;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
@@ -28,71 +28,55 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class JobLogHistoryReduceProcessor implements ReduceProcessor
-{
- static Logger log = Logger.getLogger(JobLogHistoryReduceProcessor.class);
- @Override
- public String getDataType()
- {
- return this.getClass().getName();
- }
-
- @Override
- public void process(ChukwaRecordKey key,
- Iterator<ChukwaRecord> values,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- {
- try
- {
- String action = key.getKey();
- int count = 0;
-
- ChukwaRecord record = null;
- while(values.hasNext())
- {
- record = values.next();
- if (record.containsField("START_TIME"))
- {
- count++;
- }
- else
- {
- count --;
- }
- }
- ChukwaRecordKey newKey = new ChukwaRecordKey();
- newKey.setKey(""+record.getTime());
- newKey.setReduceType("MSSRGraph");
- ChukwaRecord newRecord = new ChukwaRecord();
- newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
- newRecord.setTime(record.getTime());
- newRecord.add("count", "" + count);
- newRecord.add("JOBID", record.getValue("JOBID"));
- if (action.indexOf("JobLogHist/Map/") >= 0)
- {
- newRecord.add("type", "MAP");
- }
- else if (action.indexOf("JobLogHist/SHUFFLE/") >= 0)
- {
- newRecord.add("type", "SHUFFLE");
- }
- else if (action.indexOf("JobLogHist/SORT/") >= 0)
- {
- newRecord.add("type", "SORT");
- }
- else if (action.indexOf("JobLogHist/REDUCE/") >= 0)
- {
- newRecord.add("type", "REDUCE");
- }
-
- output.collect(newKey, newRecord);
- } catch (IOException e)
- {
- log.warn("Unable to collect output in JobLogHistoryReduceProcessor [" + key + "]", e);
- e.printStackTrace();
- }
+public class JobLogHistoryReduceProcessor implements ReduceProcessor {
+ static Logger log = Logger.getLogger(JobLogHistoryReduceProcessor.class);
+
+ @Override
+ public String getDataType() {
+ return this.getClass().getName();
+ }
+
+ @Override
+ public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+ try {
+ String action = key.getKey();
+ int count = 0;
+
+ ChukwaRecord record = null;
+ while (values.hasNext()) {
+ record = values.next();
+ if (record.containsField("START_TIME")) {
+ count++;
+ } else {
+ count--;
+ }
+ }
+ ChukwaRecordKey newKey = new ChukwaRecordKey();
+ newKey.setKey("" + record.getTime());
+ newKey.setReduceType("MSSRGraph");
+ ChukwaRecord newRecord = new ChukwaRecord();
+ newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
+ newRecord.setTime(record.getTime());
+ newRecord.add("count", "" + count);
+ newRecord.add("JOBID", record.getValue("JOBID"));
+ if (action.indexOf("JobLogHist/Map/") >= 0) {
+ newRecord.add("type", "MAP");
+ } else if (action.indexOf("JobLogHist/SHUFFLE/") >= 0) {
+ newRecord.add("type", "SHUFFLE");
+ } else if (action.indexOf("JobLogHist/SORT/") >= 0) {
+ newRecord.add("type", "SORT");
+ } else if (action.indexOf("JobLogHist/REDUCE/") >= 0) {
+ newRecord.add("type", "REDUCE");
+ }
+
+ output.collect(newKey, newRecord);
+ } catch (IOException e) {
+ log.warn("Unable to collect output in JobLogHistoryReduceProcessor ["
+ + key + "]", e);
+ e.printStackTrace();
+ }
- }
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java Wed Mar 11 22:39:26 2009
@@ -1,9 +1,9 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
@@ -11,71 +11,64 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class MRJobReduceProcessor implements ReduceProcessor
-{
- static Logger log = Logger.getLogger(MRJobReduceProcessor.class);
- @Override
- public String getDataType()
- {
- return MRJobReduceProcessor.class.getName();
- }
-
- @Override
- public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- {
- try
- {
- HashMap<String, String> data = new HashMap<String, String>();
-
- ChukwaRecord record = null;
- String[] fields = null;
- while(values.hasNext())
- {
- record = values.next();
- fields = record.getFields();
- for(String field: fields)
- {
- data.put(field, record.getValue(field));
- }
- }
-
- //Extract initial time: SUBMIT_TIME
- long initTime = Long.parseLong(data.get("SUBMIT_TIME"));
-
- // Extract HodId
- // maybe use a regex to extract this and load it from configuration
- // JOBCONF="/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml"
- String jobConf = data.get("JOBCONF");
- int idx = jobConf.indexOf("mapredsystem/");
- idx += 13;
- int idx2 = jobConf.indexOf(".", idx);
- data.put("HodId", jobConf.substring(idx, idx2));
-
- ChukwaRecordKey newKey = new ChukwaRecordKey();
- newKey.setKey(""+initTime);
- newKey.setReduceType("MRJob");
-
- ChukwaRecord newRecord = new ChukwaRecord();
- newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
- newRecord.setTime(initTime);
- newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
- Iterator<String> it = data.keySet().iterator();
- while(it.hasNext())
- {
- String field = it.next();
- newRecord.add(field, data.get(field));
- }
-
- output.collect(newKey, newRecord);
- }
- catch (IOException e)
- {
- log.warn("Unable to collect output in JobLogHistoryReduceProcessor [" + key + "]", e);
- e.printStackTrace();
- }
+public class MRJobReduceProcessor implements ReduceProcessor {
+ static Logger log = Logger.getLogger(MRJobReduceProcessor.class);
+
+ @Override
+ public String getDataType() {
+ return MRJobReduceProcessor.class.getName();
+ }
+
+ @Override
+ public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+ try {
+ HashMap<String, String> data = new HashMap<String, String>();
+
+ ChukwaRecord record = null;
+ String[] fields = null;
+ while (values.hasNext()) {
+ record = values.next();
+ fields = record.getFields();
+ for (String field : fields) {
+ data.put(field, record.getValue(field));
+ }
+ }
+
+ // Extract initial time: SUBMIT_TIME
+ long initTime = Long.parseLong(data.get("SUBMIT_TIME"));
+
+ // Extract HodId
+ // maybe use a regex to extract this and load it from configuration
+ // JOBCONF=
+ // "/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml"
+ String jobConf = data.get("JOBCONF");
+ int idx = jobConf.indexOf("mapredsystem/");
+ idx += 13;
+ int idx2 = jobConf.indexOf(".", idx);
+ data.put("HodId", jobConf.substring(idx, idx2));
+
+ ChukwaRecordKey newKey = new ChukwaRecordKey();
+ newKey.setKey("" + initTime);
+ newKey.setReduceType("MRJob");
+
+ ChukwaRecord newRecord = new ChukwaRecord();
+ newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
+ newRecord.setTime(initTime);
+ newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
+ Iterator<String> it = data.keySet().iterator();
+ while (it.hasNext()) {
+ String field = it.next();
+ newRecord.add(field, data.get(field));
+ }
+
+ output.collect(newKey, newRecord);
+ } catch (IOException e) {
+ log.warn("Unable to collect output in JobLogHistoryReduceProcessor ["
+ + key + "]", e);
+ e.printStackTrace();
+ }
- }
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,17 +18,16 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
-import java.util.Iterator;
+import java.util.Iterator;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-public interface ReduceProcessor
-{
- public String getDataType();
- public void process(ChukwaRecordKey key,Iterator<ChukwaRecord> values,
- OutputCollector<ChukwaRecordKey,
- ChukwaRecord> output, Reporter reporter);
+public interface ReduceProcessor {
+ public String getDataType();
+
+ public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java Wed Mar 11 22:39:26 2009
@@ -18,75 +18,69 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
-import java.util.HashMap;
+import java.util.HashMap;
import org.apache.log4j.Logger;
+public class ReduceProcessorFactory {
+ static Logger log = Logger.getLogger(ReduceProcessorFactory.class);
-
-public class ReduceProcessorFactory
-{
- static Logger log = Logger.getLogger(ReduceProcessorFactory.class);
-
- // TODO
- // add new mapper package at the end.
- // We should have a more generic way to do this.
- // Ex: read from config
- // list of alias
- // and
- // alias -> processor class
-
- // ******** WARNING ********
- // If the ReduceProcessor is not there use Identity instead
-
-
- private static HashMap<String,ReduceProcessor > processors =
- new HashMap<String, ReduceProcessor>(); // registry
-
- private ReduceProcessorFactory()
- {}
-
- public static ReduceProcessor getProcessor(String reduceType)
- throws UnknownReduceTypeException
- {
- String path = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer."+reduceType;
- if (processors.containsKey(reduceType)) {
- return processors.get(reduceType);
- } else {
- ReduceProcessor processor = null;
- try {
- processor = (ReduceProcessor)Class.forName(path).getConstructor().newInstance();
- }
- catch(ClassNotFoundException e)
- {
- // ******** WARNING ********
- // If the ReduceProcessor is not there use Identity instead
- processor = getProcessor("IdentityReducer");
- register(reduceType,processor);
- return processor;
- }
- catch(Exception e) {
- throw new UnknownReduceTypeException("error constructing processor", e);
- }
-
- //TODO using a ThreadSafe/reuse flag to actually decide if we want
- // to reuse the same processor again and again
- register(reduceType,processor);
- return processor;
- }
- }
-
- /** Register a specific parser for a {@link ReduceProcessor}
- * implementation. */
- public static synchronized void register(String reduceType,
- ReduceProcessor processor)
- {
- log.info("register " + processor.getClass().getName() + " for this recordType :" + reduceType);
- if (processors.containsKey(reduceType))
- {
- throw new DuplicateReduceProcessorException("Duplicate processor for recordType:" + reduceType);
- }
- ReduceProcessorFactory.processors.put(reduceType, processor);
- }
+ // TODO
+ // add new mapper package at the end.
+ // We should have a more generic way to do this.
+ // Ex: read from config
+ // list of alias
+ // and
+ // alias -> processor class
+
+ // ******** WARNING ********
+ // If the ReduceProcessor is not there use Identity instead
+
+ private static HashMap<String, ReduceProcessor> processors = new HashMap<String, ReduceProcessor>(); // registry
+
+ private ReduceProcessorFactory() {
+ }
+
+ public static ReduceProcessor getProcessor(String reduceType)
+ throws UnknownReduceTypeException {
+ String path = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer."
+ + reduceType;
+ if (processors.containsKey(reduceType)) {
+ return processors.get(reduceType);
+ } else {
+ ReduceProcessor processor = null;
+ try {
+ processor = (ReduceProcessor) Class.forName(path).getConstructor()
+ .newInstance();
+ } catch (ClassNotFoundException e) {
+ // ******** WARNING ********
+ // If the ReduceProcessor is not there use Identity instead
+ processor = getProcessor("IdentityReducer");
+ register(reduceType, processor);
+ return processor;
+ } catch (Exception e) {
+ throw new UnknownReduceTypeException("error constructing processor", e);
+ }
+
+ // TODO using a ThreadSafe/reuse flag to actually decide if we want
+ // to reuse the same processor again and again
+ register(reduceType, processor);
+ return processor;
+ }
+ }
+
+ /**
+ * Register a specific parser for a {@link ReduceProcessor} implementation.
+ */
+ public static synchronized void register(String reduceType,
+ ReduceProcessor processor) {
+ log.info("register " + processor.getClass().getName()
+ + " for this recordType :" + reduceType);
+ if (processors.containsKey(reduceType)) {
+ throw new DuplicateReduceProcessorException(
+ "Duplicate processor for recordType:" + reduceType);
+ }
+ ReduceProcessorFactory.processors.put(reduceType, processor);
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java Wed Mar 11 22:39:26 2009
@@ -18,121 +18,137 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
import java.io.IOException;
import java.util.Iterator;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class SystemMetrics implements ReduceProcessor {
- static Logger log = Logger.getLogger(SystemMetrics.class);
- @Override
- public String getDataType() {
- return this.getClass().getName();
- }
-
- @Override
- public void process(ChukwaRecordKey key,
- Iterator<ChukwaRecord> values,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter) {
- try {
-
- ChukwaRecord record = null;
- ChukwaRecord newRecord = new ChukwaRecord();
-
- while(values.hasNext()) {
- record = values.next();
- newRecord.setTime(record.getTime());
-
- if(record.containsField("IFACE")) {
- if(record.containsField("rxpck/s")) {
- if (record.containsField("rxbyt/s") && record.containsField("txbyt/s")) {
- double netBusyPcnt=0, netRxByts=0, netTxByts=0, netSpeed=128000000.00;
- netRxByts=Double.parseDouble(record.getValue("rxbyt/s"));
- netTxByts=Double.parseDouble(record.getValue("txbyt/s"));
- netBusyPcnt=(netRxByts/netSpeed*100)+(netTxByts/netSpeed*100);
- record.add(record.getValue("IFACE")+"_busy_pcnt", "" + netBusyPcnt);
- record.add("csource", record.getValue("csource"));
- }
- record.add(record.getValue("IFACE")+".rxbyt/s", record.getValue("rxbyt/s"));
- record.add(record.getValue("IFACE")+".rxpck/s", record.getValue("rxpck/s"));
- record.add(record.getValue("IFACE")+".txbyt/s", record.getValue("txbyt/s"));
- record.add(record.getValue("IFACE")+".txpck/s", record.getValue("txpck/s"));
- record.removeValue("rxbyt/s");
- record.removeValue("rxpck/s");
- record.removeValue("txbyt/s");
- record.removeValue("txpck/s");
- }
- if(record.containsField("rxerr/s")) {
- record.add(record.getValue("IFACE")+".rxerr/s", record.getValue("rxerr/s"));
- record.add(record.getValue("IFACE")+".rxdrop/s", record.getValue("rxdrop/s"));
- record.add(record.getValue("IFACE")+".txerr/s", record.getValue("txerr/s"));
- record.add(record.getValue("IFACE")+".txdrop/s", record.getValue("txdrop/s"));
- record.removeValue("rxerr/s");
- record.removeValue("rxdrop/s");
- record.removeValue("txerr/s");
- record.removeValue("txdrop/s");
- }
- record.removeValue("IFACE");
- }
-
- if(record.containsField("Device:")) {
- record.add(record.getValue("Device:")+".r/s", record.getValue("r/s"));
- record.add(record.getValue("Device:")+".w/s", record.getValue("w/s"));
- record.add(record.getValue("Device:")+".rkB/s", record.getValue("rkB/s"));
- record.add(record.getValue("Device:")+".wkB/s", record.getValue("wkB/s"));
- record.add(record.getValue("Device:")+".%util", record.getValue("%util"));
- record.removeValue("r/s");
- record.removeValue("w/s");
- record.removeValue("rkB/s");
- record.removeValue("wkB/s");
- record.removeValue("%util");
- record.removeValue("Device:");
- }
-
- if (record.containsField("swap_free")) {
- float swapUsedPcnt=0, swapUsed=0, swapTotal=0;
- swapUsed=Long.parseLong(record.getValue("swap_used"));
- swapTotal=Long.parseLong(record.getValue("swap_total"));
- swapUsedPcnt=swapUsed/swapTotal*100;
- record.add("swap_used_pcnt", "" + swapUsedPcnt);
- record.add("csource", record.getValue("csource"));
- }
-
- if (record.containsField("mem_used")) {
- double memUsedPcnt=0, memTotal=0, memUsed=0;
- memTotal=Double.parseDouble(record.getValue("mem_total"));
- memUsed=Double.parseDouble(record.getValue("mem_used"));
- memUsedPcnt=memUsed/memTotal*100;
- record.add("mem_used_pcnt", "" + memUsedPcnt);
- record.add("csource", record.getValue("csource"));
- }
-
- if (record.containsField("mem_buffers")) {
- double memBuffersPcnt=0, memTotal=0, memBuffers=0;
- memTotal=Double.parseDouble(record.getValue("mem_total"));
- memBuffers=Double.parseDouble(record.getValue("mem_buffers"));
- memBuffersPcnt=memBuffers/memTotal*100;
- record.add("mem_buffers_pcnt", "" + memBuffersPcnt);
- record.add("csource", record.getValue("csource"));
- }
-
- // Copy over all fields
- String[] fields = record.getFields();
- for(String f: fields){
- newRecord.add(f, record.getValue(f));
- }
- }
- record.add("capp", "systemMetrics");
- output.collect(key, newRecord);
- } catch (IOException e) {
- log.warn("Unable to collect output in SystemMetricsReduceProcessor [" + key + "]", e);
- e.printStackTrace();
- }
+public class SystemMetrics implements ReduceProcessor {
+ static Logger log = Logger.getLogger(SystemMetrics.class);
+
+ @Override
+ public String getDataType() {
+ return this.getClass().getName();
+ }
+
+ @Override
+ public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+ try {
+
+ ChukwaRecord record = null;
+ ChukwaRecord newRecord = new ChukwaRecord();
+
+ while (values.hasNext()) {
+ record = values.next();
+ newRecord.setTime(record.getTime());
+
+ if (record.containsField("IFACE")) {
+ if (record.containsField("rxpck/s")) {
+ if (record.containsField("rxbyt/s")
+ && record.containsField("txbyt/s")) {
+ double netBusyPcnt = 0, netRxByts = 0, netTxByts = 0, netSpeed = 128000000.00;
+ netRxByts = Double.parseDouble(record.getValue("rxbyt/s"));
+ netTxByts = Double.parseDouble(record.getValue("txbyt/s"));
+ netBusyPcnt = (netRxByts / netSpeed * 100)
+ + (netTxByts / netSpeed * 100);
+ record.add(record.getValue("IFACE") + "_busy_pcnt", ""
+ + netBusyPcnt);
+ record.add("csource", record.getValue("csource"));
+ }
+ record.add(record.getValue("IFACE") + ".rxbyt/s", record
+ .getValue("rxbyt/s"));
+ record.add(record.getValue("IFACE") + ".rxpck/s", record
+ .getValue("rxpck/s"));
+ record.add(record.getValue("IFACE") + ".txbyt/s", record
+ .getValue("txbyt/s"));
+ record.add(record.getValue("IFACE") + ".txpck/s", record
+ .getValue("txpck/s"));
+ record.removeValue("rxbyt/s");
+ record.removeValue("rxpck/s");
+ record.removeValue("txbyt/s");
+ record.removeValue("txpck/s");
+ }
+ if (record.containsField("rxerr/s")) {
+ record.add(record.getValue("IFACE") + ".rxerr/s", record
+ .getValue("rxerr/s"));
+ record.add(record.getValue("IFACE") + ".rxdrop/s", record
+ .getValue("rxdrop/s"));
+ record.add(record.getValue("IFACE") + ".txerr/s", record
+ .getValue("txerr/s"));
+ record.add(record.getValue("IFACE") + ".txdrop/s", record
+ .getValue("txdrop/s"));
+ record.removeValue("rxerr/s");
+ record.removeValue("rxdrop/s");
+ record.removeValue("txerr/s");
+ record.removeValue("txdrop/s");
+ }
+ record.removeValue("IFACE");
+ }
+
+ if (record.containsField("Device:")) {
+ record.add(record.getValue("Device:") + ".r/s", record
+ .getValue("r/s"));
+ record.add(record.getValue("Device:") + ".w/s", record
+ .getValue("w/s"));
+ record.add(record.getValue("Device:") + ".rkB/s", record
+ .getValue("rkB/s"));
+ record.add(record.getValue("Device:") + ".wkB/s", record
+ .getValue("wkB/s"));
+ record.add(record.getValue("Device:") + ".%util", record
+ .getValue("%util"));
+ record.removeValue("r/s");
+ record.removeValue("w/s");
+ record.removeValue("rkB/s");
+ record.removeValue("wkB/s");
+ record.removeValue("%util");
+ record.removeValue("Device:");
+ }
+
+ if (record.containsField("swap_free")) {
+ float swapUsedPcnt = 0, swapUsed = 0, swapTotal = 0;
+ swapUsed = Long.parseLong(record.getValue("swap_used"));
+ swapTotal = Long.parseLong(record.getValue("swap_total"));
+ swapUsedPcnt = swapUsed / swapTotal * 100;
+ record.add("swap_used_pcnt", "" + swapUsedPcnt);
+ record.add("csource", record.getValue("csource"));
+ }
+
+ if (record.containsField("mem_used")) {
+ double memUsedPcnt = 0, memTotal = 0, memUsed = 0;
+ memTotal = Double.parseDouble(record.getValue("mem_total"));
+ memUsed = Double.parseDouble(record.getValue("mem_used"));
+ memUsedPcnt = memUsed / memTotal * 100;
+ record.add("mem_used_pcnt", "" + memUsedPcnt);
+ record.add("csource", record.getValue("csource"));
+ }
+
+ if (record.containsField("mem_buffers")) {
+ double memBuffersPcnt = 0, memTotal = 0, memBuffers = 0;
+ memTotal = Double.parseDouble(record.getValue("mem_total"));
+ memBuffers = Double.parseDouble(record.getValue("mem_buffers"));
+ memBuffersPcnt = memBuffers / memTotal * 100;
+ record.add("mem_buffers_pcnt", "" + memBuffersPcnt);
+ record.add("csource", record.getValue("csource"));
+ }
+
+ // Copy over all fields
+ String[] fields = record.getFields();
+ for (String f : fields) {
+ newRecord.add(f, record.getValue(f));
+ }
+ }
+ record.add("capp", "systemMetrics");
+ output.collect(key, newRecord);
+ } catch (IOException e) {
+ log.warn("Unable to collect output in SystemMetricsReduceProcessor ["
+ + key + "]", e);
+ e.printStackTrace();
+ }
- }
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java Wed Mar 11 22:39:26 2009
@@ -18,31 +18,27 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
-public class UnknownReduceTypeException extends Exception
-{
- /**
+public class UnknownReduceTypeException extends Exception {
+
+ /**
*
*/
- private static final long serialVersionUID = 5760553864088487836L;
+ private static final long serialVersionUID = 5760553864088487836L;
+
+ public UnknownReduceTypeException() {
+ }
- public UnknownReduceTypeException()
- {
- }
-
- public UnknownReduceTypeException(String message)
- {
- super(message);
- }
-
- public UnknownReduceTypeException(Throwable cause)
- {
- super(cause);
- }
-
- public UnknownReduceTypeException(String message, Throwable cause)
- {
- super(message, cause);
- }
+ public UnknownReduceTypeException(String message) {
+ super(message);
+ }
+
+ public UnknownReduceTypeException(Throwable cause) {
+ super(cause);
+ }
+
+ public UnknownReduceTypeException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java Wed Mar 11 22:39:26 2009
@@ -18,102 +18,83 @@
package org.apache.hadoop.chukwa.extraction.engine;
+
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-
import org.apache.hadoop.record.Buffer;
-public class ChukwaRecord extends ChukwaRecordJT
-implements Record
-{
- public ChukwaRecord()
- {}
-
-
-
- public void add(String key, String value)
- {
- synchronized(this)
- {
- if (this.mapFields == null)
- {
- this.mapFields = new TreeMap<String,org.apache.hadoop.record.Buffer>();
- }
- }
- this.mapFields.put(key, new Buffer(value.getBytes()));
- }
-
- public String[] getFields()
- {
- return this.mapFields.keySet().toArray(new String[0]);
- }
-
- public String getValue(String field)
- {
- if (this.mapFields.containsKey(field))
- {
- return new String(this.mapFields.get(field).get());
- }
- else
- {
- return null;
- }
- }
-
- public boolean containsField(String field)
- {
- return this.mapFields.containsKey(field);
- }
-
- public void removeValue(String field) {
- if(this.mapFields.containsKey(field)) {
- this.mapFields.remove(field);
- }
- }
-
- @Override
- public String toString()
- {
- Set <Map.Entry<String,Buffer>> f = this.mapFields.entrySet();
- Iterator <Map.Entry<String,Buffer>> it = f.iterator();
-
- Map.Entry<String,Buffer> entry = null;
- StringBuilder sb = new StringBuilder();
- sb.append("<event ");
- StringBuilder body = new StringBuilder();
-
- String key = null;
- String val = null;
- boolean hasBody = false;
- String bodyVal = null;
- while (it.hasNext())
- {
- entry = it.next();
- key = entry.getKey().intern();
- val = new String(entry.getValue().get());
- if (key.intern() == Record.bodyField.intern())
- {
- hasBody = true;
- bodyVal = val;
- }
- else
- {
- sb.append(key).append("=\"").append(val).append("\" ");
- body.append(key).append( " = ").append(val).append("<br>");
- }
-
-
- }
- if (hasBody)
- { sb.append(">").append(bodyVal);}
- else
- { sb.append(">").append(body);}
- sb.append("</event>");
-
- return sb.toString();
- }
+public class ChukwaRecord extends ChukwaRecordJT implements Record {
+ public ChukwaRecord() {
+ }
+
+ public void add(String key, String value) {
+ synchronized (this) {
+ if (this.mapFields == null) {
+ this.mapFields = new TreeMap<String, org.apache.hadoop.record.Buffer>();
+ }
+ }
+ this.mapFields.put(key, new Buffer(value.getBytes()));
+ }
+
+ public String[] getFields() {
+ return this.mapFields.keySet().toArray(new String[0]);
+ }
+
+ public String getValue(String field) {
+ if (this.mapFields.containsKey(field)) {
+ return new String(this.mapFields.get(field).get());
+ } else {
+ return null;
+ }
+ }
+
+ public boolean containsField(String field) {
+ return this.mapFields.containsKey(field);
+ }
+
+ public void removeValue(String field) {
+ if (this.mapFields.containsKey(field)) {
+ this.mapFields.remove(field);
+ }
+ }
+
+ @Override
+ public String toString() {
+ Set<Map.Entry<String, Buffer>> f = this.mapFields.entrySet();
+ Iterator<Map.Entry<String, Buffer>> it = f.iterator();
+
+ Map.Entry<String, Buffer> entry = null;
+ StringBuilder sb = new StringBuilder();
+ sb.append("<event ");
+ StringBuilder body = new StringBuilder();
+
+ String key = null;
+ String val = null;
+ boolean hasBody = false;
+ String bodyVal = null;
+ while (it.hasNext()) {
+ entry = it.next();
+ key = entry.getKey().intern();
+ val = new String(entry.getValue().get());
+ if (key.intern() == Record.bodyField.intern()) {
+ hasBody = true;
+ bodyVal = val;
+ } else {
+ sb.append(key).append("=\"").append(val).append("\" ");
+ body.append(key).append(" = ").append(val).append("<br>");
+ }
+
+ }
+ if (hasBody) {
+ sb.append(">").append(bodyVal);
+ } else {
+ sb.append(">").append(body);
+ }
+ sb.append("</event>");
+
+ return sb.toString();
+ }
-
}