Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC

svn commit: r752666 [9/16] - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoop...

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java Wed Mar 11 22:39:26 2009
@@ -18,152 +18,145 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class Sar extends AbstractProcessor
-{
-	static Logger log = Logger.getLogger(Sar.class);
-	public final String recordType = this.getClass().getName();
-
-	private static String regex="([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
-	private static Pattern p = null;
-	
-	private Matcher matcher = null;
-	private SimpleDateFormat sdf = null;
-
-	public Sar()
-	{
-		//TODO move that to config
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
-		p = Pattern.compile(regex);
-	}
-
-	@Override
-	protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	 throws Throwable
-	{
-		
-		log.debug("Sar record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
-		int i = 0;
-		
-//		String logLevel = null;
-//		String className = null;
-		
-		matcher=p.matcher(recordEntry);
-		while (matcher.find())
-		{
-			log.debug("Sar Processor Matches");
-			
-			try
-			{
-				Date d = sdf.parse( matcher.group(1).trim());
-				
-//				logLevel = matcher.group(2);
-//				className = matcher.group(3);
-				
-				//TODO create a more specific key structure
-				// part of ChukwaArchiveKey + record index if needed
-				key.setKey("" + d.getTime());
-				
-				String[] lines = recordEntry.split("\n");
-				
-				
-				String[] headers = null;
-				while(i < (lines.length-1) && lines[i+1].indexOf("Average:")<0) {
-					// Skip to the average lines
-					log.debug("skip:"+lines[i]);
-				    i++;
-				}
-				while (i < lines.length)
-				{
-					ChukwaRecord record = null;
-					if(lines[i].equals("")) {
-						i++;
-						headers = parseHeader(lines[i]);
-						i++;
-					}
-					String data[] = parseData(lines[i]);
-					
-					//FIXME please validate this
-					if(headers[1].equals("IFACE") && headers[2].equals("rxpck/s")) {
-						log.debug("Matched Sar-Network");
-						
-						record = new ChukwaRecord();
-						key = new ChukwaRecordKey();
-						this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
-					} else if(headers[1].equals("IFACE") && headers[2].equals("rxerr/s")) {
-						log.debug("Matched Sar-Network");
-						
-						record = new ChukwaRecord();
-						key = new ChukwaRecordKey();
-						this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
-					} else if(headers[1].equals("kbmemfree")) {
-						log.debug("Matched Sar-Memory");
-						
-						record = new ChukwaRecord();
-						key = new ChukwaRecordKey();
-						this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
-					} else if(headers[1].equals("totsck")) {
-						log.debug("Matched Sar-NetworkSockets");
-						
-						record = new ChukwaRecord();
-						key = new ChukwaRecordKey();
-						this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
-					} else if(headers[1].equals("runq-sz")) {
-						log.debug("Matched Sar-LoadAverage");
-						
-						record = new ChukwaRecord();
-						key = new ChukwaRecordKey();
-						this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
-					} else {
-						log.debug("No match:"+headers[1]+" "+headers[2]);
-					}
-					if(record!=null) {
-						int j=0;
-						
-						log.debug("Data Length: " + data.length);
-	                    while(j<data.length) {
-	                    	log.debug("header:"+headers[j]+" data:"+data[j]);
-	                    	if(!headers[j].equals("Average:")) {
-	                    		record.add(headers[j],data[j]);
-	                    	}
-						    j++;
-	                    }			
-	                   
-						output.collect(key, record);
-					}
-					i++;
-				}
-				// End of parsing
-			} catch (Exception e)
-			{
-				e.printStackTrace();
-				throw e;
-			}
-		}
-	}
-	
-	public String[] parseHeader(String header) {
-		String[] headers = header.split("\\s+");
-		return headers;
-	}
-
-	public String[] parseData(String dataLine) {
-		String[] data = dataLine.split("\\s+");
-		return data;
-	}
-
-	public String getDataType() {
-		return recordType;
-	}
+public class Sar extends AbstractProcessor {
+  static Logger log = Logger.getLogger(Sar.class);
+  public final String recordType = this.getClass().getName();
+
+  private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
+  private static Pattern p = null;
+
+  private Matcher matcher = null;
+  private SimpleDateFormat sdf = null;
+
+  public Sar() {
+    // TODO move that to config
+    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+    p = Pattern.compile(regex);
+  }
+
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+      throws Throwable {
+
+    log.debug("Sar record: [" + recordEntry + "] type[" + chunk.getDataType()
+        + "]");
+    int i = 0;
+
+    // String logLevel = null;
+    // String className = null;
+
+    matcher = p.matcher(recordEntry);
+    while (matcher.find()) {
+      log.debug("Sar Processor Matches");
+
+      try {
+        Date d = sdf.parse(matcher.group(1).trim());
+
+        // logLevel = matcher.group(2);
+        // className = matcher.group(3);
+
+        // TODO create a more specific key structure
+        // part of ChukwaArchiveKey + record index if needed
+        key.setKey("" + d.getTime());
+
+        String[] lines = recordEntry.split("\n");
+
+        String[] headers = null;
+        while (i < (lines.length - 1) && lines[i + 1].indexOf("Average:") < 0) {
+          // Skip to the average lines
+          log.debug("skip:" + lines[i]);
+          i++;
+        }
+        while (i < lines.length) {
+          ChukwaRecord record = null;
+          if (lines[i].equals("")) {
+            i++;
+            headers = parseHeader(lines[i]);
+            i++;
+          }
+          String data[] = parseData(lines[i]);
+
+          // FIXME please validate this
+          if (headers[1].equals("IFACE") && headers[2].equals("rxpck/s")) {
+            log.debug("Matched Sar-Network");
+
+            record = new ChukwaRecord();
+            key = new ChukwaRecordKey();
+            this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+          } else if (headers[1].equals("IFACE") && headers[2].equals("rxerr/s")) {
+            log.debug("Matched Sar-Network");
+
+            record = new ChukwaRecord();
+            key = new ChukwaRecordKey();
+            this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+          } else if (headers[1].equals("kbmemfree")) {
+            log.debug("Matched Sar-Memory");
+
+            record = new ChukwaRecord();
+            key = new ChukwaRecordKey();
+            this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+          } else if (headers[1].equals("totsck")) {
+            log.debug("Matched Sar-NetworkSockets");
+
+            record = new ChukwaRecord();
+            key = new ChukwaRecordKey();
+            this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+          } else if (headers[1].equals("runq-sz")) {
+            log.debug("Matched Sar-LoadAverage");
+
+            record = new ChukwaRecord();
+            key = new ChukwaRecordKey();
+            this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+          } else {
+            log.debug("No match:" + headers[1] + " " + headers[2]);
+          }
+          if (record != null) {
+            int j = 0;
+
+            log.debug("Data Length: " + data.length);
+            while (j < data.length) {
+              log.debug("header:" + headers[j] + " data:" + data[j]);
+              if (!headers[j].equals("Average:")) {
+                record.add(headers[j], data[j]);
+              }
+              j++;
+            }
+
+            output.collect(key, record);
+          }
+          i++;
+        }
+        // End of parsing
+      } catch (Exception e) {
+        e.printStackTrace();
+        throw e;
+      }
+    }
+  }
+
+  public String[] parseHeader(String header) {
+    String[] headers = header.split("\\s+");
+    return headers;
+  }
+
+  public String[] parseData(String dataLine) {
+    String[] data = dataLine.split("\\s+");
+    return data;
+  }
+
+  public String getDataType() {
+    return recordType;
+  }
 }
\ No newline at end of file
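
For readers skimming the diff: the Sar processor above pairs each whitespace-delimited header token with the value in the same column of the following "Average:" rows. A minimal standalone sketch of that pairing, using a hypothetical sar(1) sample (not taken from the commit):

    public class SarSplitDemo {
      public static void main(String[] args) {
        // sar emits a header row followed by "Average:" data rows;
        // both split cleanly on runs of whitespace, which is all that
        // parseHeader()/parseData() do.
        String header = "Average:        IFACE   rxpck/s   txpck/s";
        String data   = "Average:         eth0     12.03     10.42";
        String[] h = header.split("\\s+");
        String[] d = data.split("\\s+");
        // Pair header names with values, starting at index 1 to skip
        // the "Average:" label (Sar.parse() filters that token by name).
        for (int j = 1; j < Math.min(h.length, d.length); j++) {
          System.out.println(h[j] + " = " + d[j]);
        }
      }
    }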

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java Wed Mar 11 22:39:26 2009
@@ -18,74 +18,64 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class SysLog extends AbstractProcessor 
-{
+public class SysLog extends AbstractProcessor {
+
+  static Logger log = Logger.getLogger(SysLog.class);
+  private SimpleDateFormat sdf = null;
 
-	static Logger log = Logger.getLogger(SysLog.class);
-	private SimpleDateFormat sdf = null;
+  public SysLog() {
+    sdf = new SimpleDateFormat("MMM d HH:mm:ss");
+  }
 
-	public SysLog()
-	{
-		sdf = new SimpleDateFormat("MMM d HH:mm:ss");
-	}
-	
-	
   @Override
   protected void parse(String recordEntry,
       OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-  throws Throwable
-  {
-		try
-		{
-			String dStr = recordEntry.substring(0, 15);
-			int start = 15;
-			int idx = recordEntry.indexOf(' ', start);
-			start = idx + 1;
-			idx = recordEntry.indexOf(' ', start);
-			String body = recordEntry.substring(idx + 1);
-			body.replaceAll("\n", "");
-
-			Calendar convertDate = Calendar.getInstance();
-			Date d = sdf.parse(dStr);
-			int year = convertDate.get(Calendar.YEAR);
-			convertDate.setTime(d);
-			convertDate.set(Calendar.YEAR, year);
-			
-			ChukwaRecord record = new ChukwaRecord();
-			buildGenericRecord(record,recordEntry,convertDate.getTime().getTime(),"SysLog");
-			output.collect(key, record);
-		}
-		catch (ParseException e)
-		{
-			e.printStackTrace();
-			log.warn("Wrong format in SysLog [" + recordEntry + "]", e);
-			throw e;
-		}
-		catch (IOException e)
-		{
-			e.printStackTrace();
-			log.warn("Unable to collect output in SysLog [" + recordEntry + "]", e);
-			throw e;
-		}
+      throws Throwable {
+    try {
+      String dStr = recordEntry.substring(0, 15);
+      int start = 15;
+      int idx = recordEntry.indexOf(' ', start);
+      start = idx + 1;
+      idx = recordEntry.indexOf(' ', start);
+      String body = recordEntry.substring(idx + 1);
+      body.replaceAll("\n", "");
+
+      Calendar convertDate = Calendar.getInstance();
+      Date d = sdf.parse(dStr);
+      int year = convertDate.get(Calendar.YEAR);
+      convertDate.setTime(d);
+      convertDate.set(Calendar.YEAR, year);
+
+      ChukwaRecord record = new ChukwaRecord();
+      buildGenericRecord(record, recordEntry, convertDate.getTime().getTime(),
+          "SysLog");
+      output.collect(key, record);
+    } catch (ParseException e) {
+      e.printStackTrace();
+      log.warn("Wrong format in SysLog [" + recordEntry + "]", e);
+      throw e;
+    } catch (IOException e) {
+      e.printStackTrace();
+      log.warn("Unable to collect output in SysLog [" + recordEntry + "]", e);
+      throw e;
+    }
 
   }
 
-
-	public String getDataType()
-	{
-		return SysLog.class.getName();
-	}
+  public String getDataType() {
+    return SysLog.class.getName();
+  }
 
 }
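
The Calendar juggling in SysLog.parse() exists because classic syslog timestamps ("MMM d HH:mm:ss") carry no year, so the processor grafts the current year onto the parsed date. A self-contained sketch of the same trick (Locale.US is an added assumption, to keep month-name parsing stable; the input line is hypothetical):

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.Locale;

    public class SyslogYearDemo {
      public static void main(String[] args) throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat("MMM d HH:mm:ss", Locale.US);
        Date d = sdf.parse("Mar 11 22:39:26");   // parses with default year 1970
        Calendar cal = Calendar.getInstance();
        int year = cal.get(Calendar.YEAR);       // remember the current year
        cal.setTime(d);
        cal.set(Calendar.YEAR, year);            // re-apply it to the parsed date
        System.out.println(cal.getTime());
      }
    }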

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java Wed Mar 11 22:39:26 2009
@@ -18,148 +18,146 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class Top extends AbstractProcessor
-{
-	static Logger log = Logger.getLogger(Top.class);
-	public final String recordType = this.getClass().getName();
-
-	private static String regex="([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): ";
-	private static Pattern p = null;
-	
-	private Matcher matcher = null;
-	private SimpleDateFormat sdf = null;
-
-	public Top()
-	{
-		//TODO move that to config
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
-		p = Pattern.compile(regex);
-	}
-
-	@Override
-	protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	 throws Throwable
-	{
-		
-		log.debug("Top record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
-		
-		
-		matcher=p.matcher(recordEntry);
-		while (matcher.find())
-		{
-			log.debug("Top Processor Matches");
-			
-			try
-			{
-				Date d = sdf.parse( matcher.group(1).trim());
-
-				ChukwaRecord record = new ChukwaRecord();
-				String[] lines = recordEntry.split("\n");
-				int i = 0;
-				if(lines.length<2) {
-					return;
-				}
-				String summaryString = "";
-				while(!lines[i].equals("")) {
-					summaryString = summaryString + lines[i] + "\n";
-				    i++;
-				}
-				i++;
-				record = new ChukwaRecord();
-				key = new ChukwaRecordKey();
-				parseSummary(record,summaryString);
-				this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
-				output.collect(key, record);
-				
-				StringBuffer buffer = new StringBuffer();
-				//FIXME please validate this
-				while (i < lines.length) {
-					record = null;
-					buffer.append(lines[i]+"\n");
-					i++;
-					
-				}
-				record = new ChukwaRecord();
-				key = new ChukwaRecordKey();
-				this.buildGenericRecord(record, buffer.toString(), d.getTime(), "Top");
-				//Output Top info to database
-				output.collect(key, record);
-
-				// End of parsing
-			} catch (Exception e)
-			{
-				e.printStackTrace();
-				throw e;
-			}
-		}
-	}
-	
-	public void parseSummary(ChukwaRecord record,String header) {
-		HashMap<String, Object> keyValues = new HashMap<String, Object>();
-		String[] headers = header.split("\n");
-		Pattern p = Pattern.compile("top - (.*?) up (.*?),\\s+(\\d+) users");
-		Matcher matcher = p.matcher(headers[0]);
-		if(matcher.find()) {
-            record.add("uptime",matcher.group(2));
-            record.add("users",matcher.group(3));
-		}
-		p = Pattern.compile("Tasks:\\s+(\\d+) total,\\s+(\\d+) running,\\s+(\\d+) sleeping,\\s+(\\d+) stopped,\\s+(\\d+) zombie");
-		matcher = p.matcher(headers[1]);
-		if(matcher.find()) {
-			record.add("tasks_total",matcher.group(1));
-			record.add("tasks_running",matcher.group(2));
-			record.add("tasks_sleeping",matcher.group(3));
-			record.add("tasks_stopped",matcher.group(4));
-			record.add("tasks_zombie",matcher.group(5));
-		}
-		p = Pattern.compile("Cpu\\(s\\):\\s*(.*?)%\\s*us,\\s*(.*?)%\\s*sy,\\s*(.*?)%\\s*ni,\\s*(.*?)%\\s*id,\\s*(.*?)%\\s*wa,\\s*(.*?)%\\s*hi,\\s*(.*?)%\\s*si");
-		matcher = p.matcher(headers[2]);
-		if(matcher.find()) {
-			record.add("cpu_user%",matcher.group(1));
-			record.add("cpu_sys%",matcher.group(2));
-			record.add("cpu_nice%",matcher.group(3));
-			record.add("cpu_wait%",matcher.group(4));
-			record.add("cpu_hi%",matcher.group(5));
-			record.add("cpu_si%",matcher.group(6));
-		}
-		p = Pattern.compile("Mem:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k buffers");
-		matcher = p.matcher(headers[3]);
-		if(matcher.find()) {
-			record.add("mem_total",matcher.group(1));
-			record.add("mem_used",matcher.group(2));
-			record.add("mem_free",matcher.group(3));
-			record.add("mem_buffers",matcher.group(4));
-		}
-		p = Pattern.compile("Swap:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k cached");
-		matcher = p.matcher(headers[4]);
-		if(matcher.find()) {
-			record.add("swap_total",matcher.group(1));
-			record.add("swap_used",matcher.group(2));
-			record.add("swap_free",matcher.group(3));
-			record.add("swap_cached",matcher.group(4));
-		}
-		Iterator<String> ki = keyValues.keySet().iterator();
-		while(ki.hasNext()) {
-			String key = ki.next();
-			log.debug(key+":"+keyValues.get(key));
-		}
-	}
-
-	public String getDataType() {
-		return recordType;
-	}
+public class Top extends AbstractProcessor {
+  static Logger log = Logger.getLogger(Top.class);
+  public final String recordType = this.getClass().getName();
+
+  private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): ";
+  private static Pattern p = null;
+
+  private Matcher matcher = null;
+  private SimpleDateFormat sdf = null;
+
+  public Top() {
+    // TODO move that to config
+    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+    p = Pattern.compile(regex);
+  }
+
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+      throws Throwable {
+
+    log.debug("Top record: [" + recordEntry + "] type[" + chunk.getDataType()
+        + "]");
+
+    matcher = p.matcher(recordEntry);
+    while (matcher.find()) {
+      log.debug("Top Processor Matches");
+
+      try {
+        Date d = sdf.parse(matcher.group(1).trim());
+
+        ChukwaRecord record = new ChukwaRecord();
+        String[] lines = recordEntry.split("\n");
+        int i = 0;
+        if (lines.length < 2) {
+          return;
+        }
+        String summaryString = "";
+        while (!lines[i].equals("")) {
+          summaryString = summaryString + lines[i] + "\n";
+          i++;
+        }
+        i++;
+        record = new ChukwaRecord();
+        key = new ChukwaRecordKey();
+        parseSummary(record, summaryString);
+        this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+        output.collect(key, record);
+
+        StringBuffer buffer = new StringBuffer();
+        // FIXME please validate this
+        while (i < lines.length) {
+          record = null;
+          buffer.append(lines[i] + "\n");
+          i++;
+
+        }
+        record = new ChukwaRecord();
+        key = new ChukwaRecordKey();
+        this.buildGenericRecord(record, buffer.toString(), d.getTime(), "Top");
+        // Output Top info to database
+        output.collect(key, record);
+
+        // End of parsing
+      } catch (Exception e) {
+        e.printStackTrace();
+        throw e;
+      }
+    }
+  }
+
+  public void parseSummary(ChukwaRecord record, String header) {
+    HashMap<String, Object> keyValues = new HashMap<String, Object>();
+    String[] headers = header.split("\n");
+    Pattern p = Pattern.compile("top - (.*?) up (.*?),\\s+(\\d+) users");
+    Matcher matcher = p.matcher(headers[0]);
+    if (matcher.find()) {
+      record.add("uptime", matcher.group(2));
+      record.add("users", matcher.group(3));
+    }
+    p = Pattern
+        .compile("Tasks:\\s+(\\d+) total,\\s+(\\d+) running,\\s+(\\d+) sleeping,\\s+(\\d+) stopped,\\s+(\\d+) zombie");
+    matcher = p.matcher(headers[1]);
+    if (matcher.find()) {
+      record.add("tasks_total", matcher.group(1));
+      record.add("tasks_running", matcher.group(2));
+      record.add("tasks_sleeping", matcher.group(3));
+      record.add("tasks_stopped", matcher.group(4));
+      record.add("tasks_zombie", matcher.group(5));
+    }
+    p = Pattern
+        .compile("Cpu\\(s\\):\\s*(.*?)%\\s*us,\\s*(.*?)%\\s*sy,\\s*(.*?)%\\s*ni,\\s*(.*?)%\\s*id,\\s*(.*?)%\\s*wa,\\s*(.*?)%\\s*hi,\\s*(.*?)%\\s*si");
+    matcher = p.matcher(headers[2]);
+    if (matcher.find()) {
+      record.add("cpu_user%", matcher.group(1));
+      record.add("cpu_sys%", matcher.group(2));
+      record.add("cpu_nice%", matcher.group(3));
+      record.add("cpu_wait%", matcher.group(4));
+      record.add("cpu_hi%", matcher.group(5));
+      record.add("cpu_si%", matcher.group(6));
+    }
+    p = Pattern
+        .compile("Mem:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k buffers");
+    matcher = p.matcher(headers[3]);
+    if (matcher.find()) {
+      record.add("mem_total", matcher.group(1));
+      record.add("mem_used", matcher.group(2));
+      record.add("mem_free", matcher.group(3));
+      record.add("mem_buffers", matcher.group(4));
+    }
+    p = Pattern
+        .compile("Swap:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k cached");
+    matcher = p.matcher(headers[4]);
+    if (matcher.find()) {
+      record.add("swap_total", matcher.group(1));
+      record.add("swap_used", matcher.group(2));
+      record.add("swap_free", matcher.group(3));
+      record.add("swap_cached", matcher.group(4));
+    }
+    Iterator<String> ki = keyValues.keySet().iterator();
+    while (ki.hasNext()) {
+      String key = ki.next();
+      log.debug(key + ":" + keyValues.get(key));
+    }
+  }
+
+  public String getDataType() {
+    return recordType;
+  }
 }
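
Top.parseSummary() is entirely regex group extraction over the top(1) header block. A runnable sketch of one of those patterns against a hypothetical "Tasks:" line:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TopSummaryDemo {
      public static void main(String[] args) {
        // Same pattern the commit applies to headers[1].
        Pattern p = Pattern.compile(
            "Tasks:\\s+(\\d+) total,\\s+(\\d+) running,\\s+(\\d+) sleeping,"
            + "\\s+(\\d+) stopped,\\s+(\\d+) zombie");
        Matcher m = p.matcher(
            "Tasks: 120 total,   2 running, 118 sleeping,   0 stopped,   0 zombie");
        if (m.find()) {
          System.out.println("running=" + m.group(2) + " zombie=" + m.group(5));
        }
      }
    }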

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Torque.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Torque.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Torque.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Torque.java Wed Mar 11 22:39:26 2009
@@ -18,89 +18,76 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class Torque extends AbstractProcessor 
-{
+public class Torque extends AbstractProcessor {
+
+  static Logger log = Logger.getLogger(Torque.class);
+  private SimpleDateFormat sdf = null;
 
-	static Logger log = Logger.getLogger(Torque.class);
-	private SimpleDateFormat sdf = null;
+  public Torque() {
+    // TODO move that to config
+    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+  }
 
-	public Torque()
-	{
-		//TODO move that to config
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
-	}
-	
-	
   @Override
   protected void parse(String recordEntry,
       OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-  throws Throwable
-  {
-		try
-		{
-			String dStr = recordEntry.substring(0, 23);
-			int start = 24;
-			int idx = recordEntry.indexOf(' ', start);
-			start = idx + 1;
-			idx = recordEntry.indexOf(' ', start);
-			String body = recordEntry.substring(idx + 1);
-			body.replaceAll("\n", "");
-			Date d = sdf.parse(dStr);
-			String[] kvpairs = body.split(", ");
-
-			
-			ChukwaRecord record = new ChukwaRecord();
-			String kvpair =  null;
-			String[] halves = null;
-			boolean containRecord=false;
-		    for(int i = 0 ; i < kvpairs.length; ++i) 
-		    {
-		    	kvpair =  kvpairs[i];
-                if(kvpair.indexOf("=")>=0) {
-		    	  halves = kvpair.split("=");
-	    	      record.add(halves[0], halves[1]);
-	    	      containRecord=true;
-                }
-		    }
-		    if(record.containsField("Machine")) {
-		        buildGenericRecord(record,null, d.getTime(), "HodMachine");		    	
-		    } else {
-		        buildGenericRecord(record,null, d.getTime(), "HodJob");
-		    }
-		    if(containRecord) {
-			    output.collect(key, record);
-		    }
-		}
-		catch (ParseException e)
-		{
-			e.printStackTrace();
-			log.warn("Wrong format in Torque [" + recordEntry + "]", e);
-			throw e;
-		}
-		catch (IOException e)
-		{
-			e.printStackTrace();
-			log.warn("Unable to collect output in Torque [" + recordEntry + "]", e);
-			throw e;
-		}
+      throws Throwable {
+    try {
+      String dStr = recordEntry.substring(0, 23);
+      int start = 24;
+      int idx = recordEntry.indexOf(' ', start);
+      start = idx + 1;
+      idx = recordEntry.indexOf(' ', start);
+      String body = recordEntry.substring(idx + 1);
+      body.replaceAll("\n", "");
+      Date d = sdf.parse(dStr);
+      String[] kvpairs = body.split(", ");
+
+      ChukwaRecord record = new ChukwaRecord();
+      String kvpair = null;
+      String[] halves = null;
+      boolean containRecord = false;
+      for (int i = 0; i < kvpairs.length; ++i) {
+        kvpair = kvpairs[i];
+        if (kvpair.indexOf("=") >= 0) {
+          halves = kvpair.split("=");
+          record.add(halves[0], halves[1]);
+          containRecord = true;
+        }
+      }
+      if (record.containsField("Machine")) {
+        buildGenericRecord(record, null, d.getTime(), "HodMachine");
+      } else {
+        buildGenericRecord(record, null, d.getTime(), "HodJob");
+      }
+      if (containRecord) {
+        output.collect(key, record);
+      }
+    } catch (ParseException e) {
+      e.printStackTrace();
+      log.warn("Wrong format in Torque [" + recordEntry + "]", e);
+      throw e;
+    } catch (IOException e) {
+      e.printStackTrace();
+      log.warn("Unable to collect output in Torque [" + recordEntry + "]", e);
+      throw e;
+    }
 
   }
 
-
-	public String getDataType()
-	{
-		return Torque.class.getName();
-	}
+  public String getDataType() {
+    return Torque.class.getName();
+  }
 
 }
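
Torque log bodies are comma-separated key=value pairs, which parse() splits twice: once on ", " and once on "=". A tiny sketch with a hypothetical body (the field names are illustrative only):

    public class TorqueKvDemo {
      public static void main(String[] args) {
        String body = "Machine=node17.example.com, nodes=4, status=free";
        for (String kvpair : body.split(", ")) {
          // Only pairs containing "=" become record fields,
          // mirroring the containRecord guard above.
          if (kvpair.indexOf("=") >= 0) {
            String[] halves = kvpair.split("=");
            System.out.println(halves[0] + " -> " + halves[1]);
          }
        }
      }
    }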

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java Wed Mar 11 22:39:26 2009
@@ -1,56 +1,48 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class TsProcessor extends AbstractProcessor
-{
-	static Logger log = Logger.getLogger(TsProcessor.class);
-	private SimpleDateFormat sdf = null;
-	
-	public TsProcessor()
-	{
-		//TODO move that to config
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
-	}
-
-	@Override
-	protected void parse(String recordEntry,
-			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	 throws Throwable
-	{
-		try
-		{
-			String dStr = recordEntry.substring(0, 23);
-			Date d = sdf.parse(dStr);
-			ChukwaRecord record = new ChukwaRecord();
-			this.buildGenericRecord(record, recordEntry, d.getTime(), chunk.getDataType());
-			output.collect(key, record);
-		} 
-		catch (ParseException e)
-		{
-			log.warn("Unable to parse the date in DefaultProcessor ["
-					+ recordEntry + "]", e);
-			e.printStackTrace();
-			throw e;
-		}
-		catch (IOException e)
-		{
-			log.warn("Unable to collect output in DefaultProcessor ["
-					+ recordEntry + "]", e);
-			e.printStackTrace();
-			throw e;
-		}
-			
-	}
+public class TsProcessor extends AbstractProcessor {
+  static Logger log = Logger.getLogger(TsProcessor.class);
+  private SimpleDateFormat sdf = null;
+
+  public TsProcessor() {
+    // TODO move that to config
+    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+  }
+
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+      throws Throwable {
+    try {
+      String dStr = recordEntry.substring(0, 23);
+      Date d = sdf.parse(dStr);
+      ChukwaRecord record = new ChukwaRecord();
+      this.buildGenericRecord(record, recordEntry, d.getTime(), chunk
+          .getDataType());
+      output.collect(key, record);
+    } catch (ParseException e) {
+      log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry
+          + "]", e);
+      e.printStackTrace();
+      throw e;
+    } catch (IOException e) {
+      log.warn("Unable to collect output in DefaultProcessor [" + recordEntry
+          + "]", e);
+      e.printStackTrace();
+      throw e;
+    }
+
+  }
 
 }
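
TsProcessor assumes every record starts with a 23-character log4j-style timestamp ("yyyy-MM-dd HH:mm:ss,SSS"). A minimal sketch of that prefix parse on a hypothetical record:

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class TsPrefixDemo {
      public static void main(String[] args) throws ParseException {
        String recordEntry = "2009-03-11 22:39:26,123 INFO something happened";
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
        // The date pattern is exactly 23 characters wide, hence substring(0, 23).
        Date d = sdf.parse(recordEntry.substring(0, 23));
        System.out.println(d.getTime());
      }
    }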

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java Wed Mar 11 22:39:26 2009
@@ -18,30 +18,27 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-public class UnknownRecordTypeException extends Exception
-{
 
-	/**
+public class UnknownRecordTypeException extends Exception {
+
+  /**
 	 * 
 	 */
-	private static final long serialVersionUID = 8925135975093252279L;
+  private static final long serialVersionUID = 8925135975093252279L;
 
-	public UnknownRecordTypeException()
-	{}
+  public UnknownRecordTypeException() {
+  }
 
-	public UnknownRecordTypeException(String message)
-	{
-		super(message);
-	}
-
-	public UnknownRecordTypeException(Throwable cause)
-	{
-		super(cause);
-	}
-
-	public UnknownRecordTypeException(String message, Throwable cause)
-	{
-		super(message, cause);
-	}
+  public UnknownRecordTypeException(String message) {
+    super(message);
+  }
+
+  public UnknownRecordTypeException(Throwable cause) {
+    super(cause);
+  }
+
+  public UnknownRecordTypeException(String message, Throwable cause) {
+    super(message, cause);
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatch.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatch.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatch.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatch.java Wed Mar 11 22:39:26 2009
@@ -17,11 +17,11 @@
  */
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -30,104 +30,92 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
-public class YWatch extends AbstractProcessor
-{
-	static Logger log = Logger.getLogger(YWatch.class);
-
-	private static final String ywatchType = "YWatch";
-	
-	private static String regex= null;
-	
-	private static Pattern p = null;
-	
-	private Matcher matcher = null;
-
-	public YWatch()
-	{
-		//TODO move that to config
-		regex="([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
-		p = Pattern.compile(regex);
-		matcher = p.matcher("-");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	 throws Throwable
-	{
-		if (log.isDebugEnabled())
-		{
-			log.debug("YWatchProcessor record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
-		}
-		
-		matcher.reset(recordEntry);
-		if (matcher.matches())
-		{
-			log.info("YWatchProcessor Matches");
-			
-			try
-			{
-				String body = matcher.group(4);
-				
-				try
-				{
-					JSONObject json = new JSONObject(body);
-
-					String poller = json.getString("poller");
-					String host = json.getString("host");
-					String metricName = json.getString("metricName");
-					
-					// Data
-					JSONObject jsonData = json.getJSONObject("data").getJSONObject("data");
-		
-					String jsonTs = null;
-					long ts = Long.parseLong(jsonTs);
-					
-					String jsonValue = null;
-					Iterator<String> it = jsonData.keys();
-					
-					ChukwaRecord record = null;
-					
-					while(it.hasNext())
-					{
-						jsonTs = it.next();
-						jsonValue = jsonData.getString(jsonTs);
-						
-						record = new ChukwaRecord();
-						key = new ChukwaRecordKey();
-						this.buildGenericRecord(record, null, ts, "Ywatch");
-						record.add("poller", poller);
-						record.add("host", host);
-						record.add("metricName", metricName);
-						record.add("value", jsonValue);
-						output.collect(key, record);
-						log.info("YWatchProcessor output 1 metric");
-					}
-					
-				} 
-				catch (IOException e)
-				{
-					log.warn("Unable to collect output in YWatchProcessor [" + recordEntry + "]", e);
-					e.printStackTrace();
-				}
-				catch (JSONException e)
-				{
-					e.printStackTrace();
-					log.warn("Wrong format in YWatchProcessor [" + recordEntry + "]", e);
-				}
-				
-			}
-			catch(Exception e)
-			{
-				e.printStackTrace();
-				throw e;
-			}
-		}
-	}
-
-	public String getDataType()
-	{
-		return YWatch.ywatchType;
-	}
+public class YWatch extends AbstractProcessor {
+  static Logger log = Logger.getLogger(YWatch.class);
+
+  private static final String ywatchType = "YWatch";
+
+  private static String regex = null;
+
+  private static Pattern p = null;
+
+  private Matcher matcher = null;
+
+  public YWatch() {
+    // TODO move that to config
+    regex = "([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
+    p = Pattern.compile(regex);
+    matcher = p.matcher("-");
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+      throws Throwable {
+    if (log.isDebugEnabled()) {
+      log.debug("YWatchProcessor record: [" + recordEntry + "] type["
+          + chunk.getDataType() + "]");
+    }
+
+    matcher.reset(recordEntry);
+    if (matcher.matches()) {
+      log.info("YWatchProcessor Matches");
+
+      try {
+        String body = matcher.group(4);
+
+        try {
+          JSONObject json = new JSONObject(body);
+
+          String poller = json.getString("poller");
+          String host = json.getString("host");
+          String metricName = json.getString("metricName");
+
+          // Data
+          JSONObject jsonData = json.getJSONObject("data")
+              .getJSONObject("data");
+
+          String jsonTs = null;
+          long ts = Long.parseLong(jsonTs);
+
+          String jsonValue = null;
+          Iterator<String> it = jsonData.keys();
+
+          ChukwaRecord record = null;
+
+          while (it.hasNext()) {
+            jsonTs = it.next();
+            jsonValue = jsonData.getString(jsonTs);
+
+            record = new ChukwaRecord();
+            key = new ChukwaRecordKey();
+            this.buildGenericRecord(record, null, ts, "Ywatch");
+            record.add("poller", poller);
+            record.add("host", host);
+            record.add("metricName", metricName);
+            record.add("value", jsonValue);
+            output.collect(key, record);
+            log.info("YWatchProcessor output 1 metric");
+          }
+
+        } catch (IOException e) {
+          log.warn("Unable to collect output in YWatchProcessor ["
+              + recordEntry + "]", e);
+          e.printStackTrace();
+        } catch (JSONException e) {
+          e.printStackTrace();
+          log.warn("Wrong format in YWatchProcessor [" + recordEntry + "]", e);
+        }
+
+      } catch (Exception e) {
+        e.printStackTrace();
+        throw e;
+      }
+    }
+  }
+
+  public String getDataType() {
+    return YWatch.ywatchType;
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java Wed Mar 11 22:39:26 2009
@@ -17,31 +17,27 @@
  */
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-public class YwatchInvalidEntry extends Exception
-{
 
-	/**
+public class YwatchInvalidEntry extends Exception {
+
+  /**
 	 * 
 	 */
-	private static final long serialVersionUID = 7074989443687516732L;
+  private static final long serialVersionUID = 7074989443687516732L;
+
+  public YwatchInvalidEntry() {
+  }
 
-	public YwatchInvalidEntry()
-	{
-	}
-
-	public YwatchInvalidEntry(String message)
-	{
-		super(message);
-	}
-
-	public YwatchInvalidEntry(Throwable cause)
-	{
-		super(cause);
-	}
-
-	public YwatchInvalidEntry(String message, Throwable cause)
-	{
-		super(message, cause);
-	}
+  public YwatchInvalidEntry(String message) {
+    super(message);
+  }
+
+  public YwatchInvalidEntry(Throwable cause) {
+    super(cause);
+  }
+
+  public YwatchInvalidEntry(String message, Throwable cause) {
+    super(message, cause);
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/DuplicateReduceProcessorException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/DuplicateReduceProcessorException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/DuplicateReduceProcessorException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/DuplicateReduceProcessorException.java Wed Mar 11 22:39:26 2009
@@ -18,31 +18,27 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
 
-public class DuplicateReduceProcessorException  extends RuntimeException
-{
 
-	/**
+public class DuplicateReduceProcessorException extends RuntimeException {
+
+  /**
 	 * 
 	 */
-	private static final long serialVersionUID = 7396161798611603019L;
+  private static final long serialVersionUID = 7396161798611603019L;
+
+  public DuplicateReduceProcessorException() {
+  }
 
-	public DuplicateReduceProcessorException()
-	{
-	}
-
-	public DuplicateReduceProcessorException(String message)
-	{
-		super(message);
-	}
-
-	public DuplicateReduceProcessorException(Throwable cause)
-	{
-		super(cause);
-	}
-
-	public DuplicateReduceProcessorException(String message, Throwable cause)
-	{
-		super(message, cause);
-	}
+  public DuplicateReduceProcessorException(String message) {
+    super(message);
+  }
+
+  public DuplicateReduceProcessorException(Throwable cause) {
+    super(cause);
+  }
+
+  public DuplicateReduceProcessorException(String message, Throwable cause) {
+    super(message, cause);
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/IdentityReducer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/IdentityReducer.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/IdentityReducer.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/IdentityReducer.java Wed Mar 11 22:39:26 2009
@@ -1,39 +1,31 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
 
+
 import java.io.IOException;
 import java.util.Iterator;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
-public class IdentityReducer implements ReduceProcessor
-{
+public class IdentityReducer implements ReduceProcessor {
 
-	@Override
-	public String getDataType()
-	{
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
-			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	{
-		while(values.hasNext())
-		{
-			try
-			{
-				output.collect(key, values.next());
-			} 
-			catch (IOException e)
-			{
-				e.printStackTrace();
-			}
-		}
-	}
+  @Override
+  public String getDataType() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+    while (values.hasNext()) {
+      try {
+        output.collect(key, values.next());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/JobLogHistoryReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/JobLogHistoryReduceProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/JobLogHistoryReduceProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/JobLogHistoryReduceProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
 
+
 import java.io.IOException;
 import java.util.Iterator;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
@@ -28,71 +28,55 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class JobLogHistoryReduceProcessor implements ReduceProcessor
-{
-	static Logger log = Logger.getLogger(JobLogHistoryReduceProcessor.class);
-	@Override
-	public String getDataType()
-	{
-		return this.getClass().getName();
-	}
-
-	@Override
-	public void process(ChukwaRecordKey key, 
-						Iterator<ChukwaRecord> values,
-						OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-						Reporter reporter)
-	{
-		try
-		{
-			String action = key.getKey();
-			int count = 0;
-			
-			ChukwaRecord record = null;
-			while(values.hasNext())
-			{
-				record = values.next();
-				if (record.containsField("START_TIME"))
-				{
-					count++;
-				}
-				else
-				{
-					count --;
-				}
-			}
-			ChukwaRecordKey newKey = new ChukwaRecordKey();
-			newKey.setKey(""+record.getTime());
-			newKey.setReduceType("MSSRGraph");
-			ChukwaRecord newRecord = new ChukwaRecord();
-			newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
-			newRecord.setTime(record.getTime());
-			newRecord.add("count", "" + count);
-			newRecord.add("JOBID", record.getValue("JOBID"));
-			if (action.indexOf("JobLogHist/Map/") >= 0)
-			{
-				newRecord.add("type", "MAP");
-			}
-			else if (action.indexOf("JobLogHist/SHUFFLE/") >= 0)
-			{
-				newRecord.add("type", "SHUFFLE");
-			}
-			else if (action.indexOf("JobLogHist/SORT/") >= 0)
-			{
-				newRecord.add("type", "SORT");
-			}
-			else if (action.indexOf("JobLogHist/REDUCE/") >= 0)
-			{
-				newRecord.add("type", "REDUCE");
-			}
-				
-			output.collect(newKey, newRecord);
-		} catch (IOException e)
-		{
-			log.warn("Unable to collect output in JobLogHistoryReduceProcessor [" + key + "]", e);
-			e.printStackTrace();
-		}
+public class JobLogHistoryReduceProcessor implements ReduceProcessor {
+  static Logger log = Logger.getLogger(JobLogHistoryReduceProcessor.class);
+
+  @Override
+  public String getDataType() {
+    return this.getClass().getName();
+  }
+
+  @Override
+  public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+    try {
+      String action = key.getKey();
+      int count = 0;
+
+      ChukwaRecord record = null;
+      while (values.hasNext()) {
+        record = values.next();
+        if (record.containsField("START_TIME")) {
+          count++;
+        } else {
+          count--;
+        }
+      }
+      ChukwaRecordKey newKey = new ChukwaRecordKey();
+      newKey.setKey("" + record.getTime());
+      newKey.setReduceType("MSSRGraph");
+      ChukwaRecord newRecord = new ChukwaRecord();
+      newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
+      newRecord.setTime(record.getTime());
+      newRecord.add("count", "" + count);
+      newRecord.add("JOBID", record.getValue("JOBID"));
+      if (action.indexOf("JobLogHist/Map/") >= 0) {
+        newRecord.add("type", "MAP");
+      } else if (action.indexOf("JobLogHist/SHUFFLE/") >= 0) {
+        newRecord.add("type", "SHUFFLE");
+      } else if (action.indexOf("JobLogHist/SORT/") >= 0) {
+        newRecord.add("type", "SORT");
+      } else if (action.indexOf("JobLogHist/REDUCE/") >= 0) {
+        newRecord.add("type", "REDUCE");
+      }
+
+      output.collect(newKey, newRecord);
+    } catch (IOException e) {
+      log.warn("Unable to collect output in JobLogHistoryReduceProcessor ["
+          + key + "]", e);
+      e.printStackTrace();
+    }
 
-	}
+  }
 
 }
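
The reduce side above tags each output record by scanning the key for a phase marker. A standalone sketch of that dispatch with a hypothetical key:

    public class PhaseTypeDemo {
      public static void main(String[] args) {
        // Mirrors the action-string checks in
        // JobLogHistoryReduceProcessor.process().
        String action = "JobLogHist/SHUFFLE/attempt_200903112239_0001";
        String type = null;
        if (action.indexOf("JobLogHist/Map/") >= 0) {
          type = "MAP";
        } else if (action.indexOf("JobLogHist/SHUFFLE/") >= 0) {
          type = "SHUFFLE";
        } else if (action.indexOf("JobLogHist/SORT/") >= 0) {
          type = "SORT";
        } else if (action.indexOf("JobLogHist/REDUCE/") >= 0) {
          type = "REDUCE";
        }
        System.out.println(type);  // prints SHUFFLE
      }
    }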

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java Wed Mar 11 22:39:26 2009
@@ -1,9 +1,9 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
 
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
@@ -11,71 +11,64 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class MRJobReduceProcessor implements ReduceProcessor
-{
-	static Logger log = Logger.getLogger(MRJobReduceProcessor.class);
-	@Override
-	public String getDataType()
-	{
-		return MRJobReduceProcessor.class.getName();
-	}
-
-	@Override
-	public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
-			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	{
-		try
-		{
-			HashMap<String, String> data = new HashMap<String, String>();
-			
-			ChukwaRecord record = null;
-			String[] fields = null;
-			while(values.hasNext())
-			{
-				record = values.next();
-				fields = record.getFields();
-				for(String field: fields)
-				{
-					data.put(field, record.getValue(field));
-				}
-			}
-			
-			//Extract initial time: SUBMIT_TIME
-			long initTime = Long.parseLong(data.get("SUBMIT_TIME"));
-			
-			// Extract HodId
-			// maybe use a regex to extract this and load it from configuration
-			// JOBCONF="/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml"
-			String jobConf = data.get("JOBCONF");
-			int idx = jobConf.indexOf("mapredsystem/");
-			idx += 13;
-			int idx2 = jobConf.indexOf(".", idx);
-			data.put("HodId", jobConf.substring(idx, idx2)); 
-			
-			ChukwaRecordKey newKey = new ChukwaRecordKey();
-			newKey.setKey(""+initTime);
-			newKey.setReduceType("MRJob");
-			
-			ChukwaRecord newRecord = new ChukwaRecord();
-			newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
-			newRecord.setTime(initTime);
-			newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
-			Iterator<String> it = data.keySet().iterator();
-			while(it.hasNext())
-			{
-				String field = it.next();
-				newRecord.add(field, data.get(field));
-			}
-
-			output.collect(newKey, newRecord);
-		}
-		catch (IOException e)
-		{
-			log.warn("Unable to collect output in JobLogHistoryReduceProcessor [" + key + "]", e);
-			e.printStackTrace();
-		}
+public class MRJobReduceProcessor implements ReduceProcessor {
+  static Logger log = Logger.getLogger(MRJobReduceProcessor.class);
+
+  @Override
+  public String getDataType() {
+    return MRJobReduceProcessor.class.getName();
+  }
+
+  @Override
+  public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+    try {
+      HashMap<String, String> data = new HashMap<String, String>();
+
+      ChukwaRecord record = null;
+      String[] fields = null;
+      while (values.hasNext()) {
+        record = values.next();
+        fields = record.getFields();
+        for (String field : fields) {
+          data.put(field, record.getValue(field));
+        }
+      }
+
+      // Extract initial time: SUBMIT_TIME
+      long initTime = Long.parseLong(data.get("SUBMIT_TIME"));
+
+      // Extract HodId
+      // maybe use a regex to extract this and load it from configuration
+      // JOBCONF=
+      // "/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml"
+      String jobConf = data.get("JOBCONF");
+      int idx = jobConf.indexOf("mapredsystem/");
+      idx += 13;
+      int idx2 = jobConf.indexOf(".", idx);
+      data.put("HodId", jobConf.substring(idx, idx2));
+
+      ChukwaRecordKey newKey = new ChukwaRecordKey();
+      newKey.setKey("" + initTime);
+      newKey.setReduceType("MRJob");
+
+      ChukwaRecord newRecord = new ChukwaRecord();
+      newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
+      newRecord.setTime(initTime);
+      newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
+      Iterator<String> it = data.keySet().iterator();
+      while (it.hasNext()) {
+        String field = it.next();
+        newRecord.add(field, data.get(field));
+      }
+
+      output.collect(newKey, newRecord);
+    } catch (IOException e) {
+      log.warn("Unable to collect output in JobLogHistoryReduceProcessor ["
+          + key + "]", e);
+      e.printStackTrace();
+    }
 
-	}
+  }
 
 }
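
The code comment above suggests extracting the HodId with a regex rather than indexOf arithmetic. One possible pattern (an assumption, not the committed behavior), applied to the sample JOBCONF path from that comment:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class HodIdDemo {
      public static void main(String[] args) {
        String jobConf =
            "/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml";
        // Capture everything between "mapredsystem/" and the first dot,
        // which is what the substring arithmetic computes.
        Matcher m = Pattern.compile("mapredsystem/([^.]+)\\.").matcher(jobConf);
        if (m.find()) {
          System.out.println(m.group(1));  // prints 563976
        }
      }
    }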

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,17 +18,16 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
 
-import java.util.Iterator;
 
+import java.util.Iterator;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
-public interface ReduceProcessor
-{
-	public String getDataType();
-	public void process(ChukwaRecordKey key,Iterator<ChukwaRecord> values,
-						OutputCollector<ChukwaRecordKey, 
-						ChukwaRecord> output, Reporter reporter);
+public interface ReduceProcessor {
+  public String getDataType();
+
+  public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter);
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java Wed Mar 11 22:39:26 2009
@@ -18,75 +18,69 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
 
-import java.util.HashMap;
 
+import java.util.HashMap;
 import org.apache.log4j.Logger;
 
+public class ReduceProcessorFactory {
+  static Logger log = Logger.getLogger(ReduceProcessorFactory.class);
 
-
-public class ReduceProcessorFactory
-{
-	static Logger log = Logger.getLogger(ReduceProcessorFactory.class);
-	
-	// TODO
-	//	add new mapper package at the end.
-	//	We should have a more generic way to do this.
-	//	Ex: read from config
-	//	list of alias
-	//	and
-	//	alias -> processor class
-	
-	// ******** WARNING ********
-	// If the ReduceProcessor is not there use Identity instead
-	
-	
-	private static HashMap<String,ReduceProcessor > processors =
-	    new HashMap<String, ReduceProcessor>(); // registry
-		
-	private ReduceProcessorFactory()
-	{}
-	
-	public static ReduceProcessor getProcessor(String reduceType)
-	 throws UnknownReduceTypeException
-	{
-		String path = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer."+reduceType;
-		if (processors.containsKey(reduceType)) {
-			return processors.get(reduceType);
-		} else {
-			ReduceProcessor processor = null;
-			try {
-				processor = (ReduceProcessor)Class.forName(path).getConstructor().newInstance();
-			} 
-			catch(ClassNotFoundException e) 
-			{
-				// ******** WARNING ********
-				// If the ReduceProcessor is not there use Identity instead
-				processor = getProcessor("IdentityReducer");
-				register(reduceType,processor);
-				return processor;
-			} 
-			catch(Exception e) {
-			  throw new UnknownReduceTypeException("error constructing processor", e);
-			}
-			
-			//TODO using a ThreadSafe/reuse flag to actually decide if we want 
-			// to reuse the same processor again and again
-			register(reduceType,processor);
-			return processor;
-		}
-	}
-	
-	  /** Register a specific parser for a {@link ReduceProcessor}
-	   * implementation. */
-	  public static synchronized void register(String reduceType,
-	                                         ReduceProcessor processor) 
-	  {
-		  log.info("register " + processor.getClass().getName() + " for this recordType :" + reduceType);
-		  if (processors.containsKey(reduceType))
-			{
-			  throw new DuplicateReduceProcessorException("Duplicate processor for recordType:" + reduceType);
-			}
-		  ReduceProcessorFactory.processors.put(reduceType, processor);
-	  }
+  // TODO: add new reducer packages at the end.
+  // We should have a more generic way to do this,
+  // e.g. read a list of aliases and an alias -> processor class
+  // mapping from the configuration.
+
+  // ******** WARNING ********
+  // If the ReduceProcessor is not there use Identity instead
+
+  private static HashMap<String, ReduceProcessor> processors = new HashMap<String, ReduceProcessor>(); // registry
+
+  private ReduceProcessorFactory() {
+  }
+
+  public static ReduceProcessor getProcessor(String reduceType)
+      throws UnknownReduceTypeException {
+    String path = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer."
+        + reduceType;
+    if (processors.containsKey(reduceType)) {
+      return processors.get(reduceType);
+    } else {
+      ReduceProcessor processor = null;
+      try {
+        processor = (ReduceProcessor) Class.forName(path).getConstructor()
+            .newInstance();
+      } catch (ClassNotFoundException e) {
+        // ******** WARNING ********
+        // If the ReduceProcessor is not there use Identity instead
+        processor = getProcessor("IdentityReducer");
+        register(reduceType, processor);
+        return processor;
+      } catch (Exception e) {
+        throw new UnknownReduceTypeException("error constructing processor", e);
+      }
+
+      // TODO using a ThreadSafe/reuse flag to actually decide if we want
+      // to reuse the same processor again and again
+      register(reduceType, processor);
+      return processor;
+    }
+  }
+
+  /**
+   * Register a {@link ReduceProcessor} implementation under a specific
+   * reduce type.
+   */
+  public static synchronized void register(String reduceType,
+      ReduceProcessor processor) {
+    log.info("register " + processor.getClass().getName()
+        + " for this recordType: " + reduceType);
+    if (processors.containsKey(reduceType)) {
+      throw new DuplicateReduceProcessorException(
+          "Duplicate processor for recordType:" + reduceType);
+    }
+    ReduceProcessorFactory.processors.put(reduceType, processor);
+  }
 
 }
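
The factory resolves a reduce type by reflection against this package and
silently falls back to IdentityReducer when no matching class exists; a
minimal usage sketch (the FactoryDemo class name is hypothetical, and
IdentityReducer is assumed present in the package, as the fallback requires):

    import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessor;
    import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessorFactory;
    import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.UnknownReduceTypeException;

    public class FactoryDemo {
      public static void main(String[] args) throws UnknownReduceTypeException {
        // Resolves to ...processor.reducer.SystemMetrics via Class.forName.
        ReduceProcessor known = ReduceProcessorFactory.getProcessor("SystemMetrics");
        System.out.println(known.getClass().getName());

        // No such class in the package: falls back to IdentityReducer and
        // caches it under the unknown name instead of throwing.
        ReduceProcessor unknown = ReduceProcessorFactory.getProcessor("NoSuchType");
        System.out.println(unknown.getClass().getName());
      }
    }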

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java Wed Mar 11 22:39:26 2009
@@ -18,121 +18,137 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
 
+
 import java.io.IOException;
 import java.util.Iterator;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class SystemMetrics  implements ReduceProcessor {
-	static Logger log = Logger.getLogger(SystemMetrics.class);
-	@Override
-	public String getDataType() {
-		return this.getClass().getName();
-	}
-
-	@Override
-	public void process(ChukwaRecordKey key, 
-						Iterator<ChukwaRecord> values,
-						OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-						Reporter reporter) {
-		try {
-			
-			ChukwaRecord record = null;
-			ChukwaRecord newRecord = new ChukwaRecord();
-      
-			while(values.hasNext()) {
-				record = values.next();
-				newRecord.setTime(record.getTime());
-
-				if(record.containsField("IFACE")) {
-					if(record.containsField("rxpck/s")) {
-						if (record.containsField("rxbyt/s") && record.containsField("txbyt/s")) {
-							double netBusyPcnt=0, netRxByts=0, netTxByts=0, netSpeed=128000000.00;
-							netRxByts=Double.parseDouble(record.getValue("rxbyt/s"));
-							netTxByts=Double.parseDouble(record.getValue("txbyt/s"));
-							netBusyPcnt=(netRxByts/netSpeed*100)+(netTxByts/netSpeed*100);
-							record.add(record.getValue("IFACE")+"_busy_pcnt", "" + netBusyPcnt);
-							record.add("csource", record.getValue("csource"));
-						}						
-						record.add(record.getValue("IFACE")+".rxbyt/s", record.getValue("rxbyt/s"));
-						record.add(record.getValue("IFACE")+".rxpck/s", record.getValue("rxpck/s"));
-						record.add(record.getValue("IFACE")+".txbyt/s", record.getValue("txbyt/s"));
-						record.add(record.getValue("IFACE")+".txpck/s", record.getValue("txpck/s"));
-						record.removeValue("rxbyt/s");
-						record.removeValue("rxpck/s");
-						record.removeValue("txbyt/s");
-						record.removeValue("txpck/s");
-					}
-					if(record.containsField("rxerr/s")) {
-						record.add(record.getValue("IFACE")+".rxerr/s", record.getValue("rxerr/s"));
-						record.add(record.getValue("IFACE")+".rxdrop/s", record.getValue("rxdrop/s"));
-						record.add(record.getValue("IFACE")+".txerr/s", record.getValue("txerr/s"));
-						record.add(record.getValue("IFACE")+".txdrop/s", record.getValue("txdrop/s"));
-						record.removeValue("rxerr/s");						
-						record.removeValue("rxdrop/s");
-						record.removeValue("txerr/s");
-						record.removeValue("txdrop/s");
-					}
-					record.removeValue("IFACE");
-				}
-				
-				if(record.containsField("Device:")) {
-					record.add(record.getValue("Device:")+".r/s", record.getValue("r/s"));
-					record.add(record.getValue("Device:")+".w/s", record.getValue("w/s"));
-					record.add(record.getValue("Device:")+".rkB/s", record.getValue("rkB/s"));
-					record.add(record.getValue("Device:")+".wkB/s", record.getValue("wkB/s"));
-					record.add(record.getValue("Device:")+".%util", record.getValue("%util"));
-					record.removeValue("r/s");
-					record.removeValue("w/s");
-					record.removeValue("rkB/s");
-					record.removeValue("wkB/s");
-					record.removeValue("%util");
-					record.removeValue("Device:");
-				}
-				
-				if (record.containsField("swap_free")) {
-					float swapUsedPcnt=0, swapUsed=0, swapTotal=0;
-					swapUsed=Long.parseLong(record.getValue("swap_used"));
-					swapTotal=Long.parseLong(record.getValue("swap_total"));
-					swapUsedPcnt=swapUsed/swapTotal*100;
-					record.add("swap_used_pcnt", "" + swapUsedPcnt);
-					record.add("csource", record.getValue("csource"));
-				}
-				
-				if (record.containsField("mem_used")) {
-					double memUsedPcnt=0, memTotal=0, memUsed=0;
-					memTotal=Double.parseDouble(record.getValue("mem_total"));
-					memUsed=Double.parseDouble(record.getValue("mem_used"));
-					memUsedPcnt=memUsed/memTotal*100;
-					record.add("mem_used_pcnt", "" + memUsedPcnt);
-					record.add("csource", record.getValue("csource"));
-				}
-				
-				if (record.containsField("mem_buffers")) {
-					double memBuffersPcnt=0, memTotal=0, memBuffers=0;
-					memTotal=Double.parseDouble(record.getValue("mem_total"));
-					memBuffers=Double.parseDouble(record.getValue("mem_buffers"));
-					memBuffersPcnt=memBuffers/memTotal*100;
-					record.add("mem_buffers_pcnt", "" + memBuffersPcnt);
-					record.add("csource", record.getValue("csource"));
-				}
-						
-				// Copy over all fields
-				String[] fields = record.getFields();
-				for(String f: fields){
-				  newRecord.add(f, record.getValue(f));
-				}
-			}
-			record.add("capp", "systemMetrics");
-			output.collect(key, newRecord);   
-		} catch (IOException e) {
-			log.warn("Unable to collect output in SystemMetricsReduceProcessor [" + key + "]", e);
-			e.printStackTrace();
-		}
+public class SystemMetrics implements ReduceProcessor {
+  static Logger log = Logger.getLogger(SystemMetrics.class);
+
+  @Override
+  public String getDataType() {
+    return this.getClass().getName();
+  }
+
+  @Override
+  public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+    try {
+
+      ChukwaRecord record = null;
+      ChukwaRecord newRecord = new ChukwaRecord();
+
+      while (values.hasNext()) {
+        record = values.next();
+        newRecord.setTime(record.getTime());
+
+        if (record.containsField("IFACE")) {
+          if (record.containsField("rxpck/s")) {
+            if (record.containsField("rxbyt/s")
+                && record.containsField("txbyt/s")) {
+              double netBusyPcnt = 0, netRxByts = 0, netTxByts = 0, netSpeed = 128000000.00;
+              netRxByts = Double.parseDouble(record.getValue("rxbyt/s"));
+              netTxByts = Double.parseDouble(record.getValue("txbyt/s"));
+              netBusyPcnt = (netRxByts / netSpeed * 100)
+                  + (netTxByts / netSpeed * 100);
+              record.add(record.getValue("IFACE") + "_busy_pcnt", ""
+                  + netBusyPcnt);
+              record.add("csource", record.getValue("csource"));
+            }
+            record.add(record.getValue("IFACE") + ".rxbyt/s", record
+                .getValue("rxbyt/s"));
+            record.add(record.getValue("IFACE") + ".rxpck/s", record
+                .getValue("rxpck/s"));
+            record.add(record.getValue("IFACE") + ".txbyt/s", record
+                .getValue("txbyt/s"));
+            record.add(record.getValue("IFACE") + ".txpck/s", record
+                .getValue("txpck/s"));
+            record.removeValue("rxbyt/s");
+            record.removeValue("rxpck/s");
+            record.removeValue("txbyt/s");
+            record.removeValue("txpck/s");
+          }
+          if (record.containsField("rxerr/s")) {
+            record.add(record.getValue("IFACE") + ".rxerr/s", record
+                .getValue("rxerr/s"));
+            record.add(record.getValue("IFACE") + ".rxdrop/s", record
+                .getValue("rxdrop/s"));
+            record.add(record.getValue("IFACE") + ".txerr/s", record
+                .getValue("txerr/s"));
+            record.add(record.getValue("IFACE") + ".txdrop/s", record
+                .getValue("txdrop/s"));
+            record.removeValue("rxerr/s");
+            record.removeValue("rxdrop/s");
+            record.removeValue("txerr/s");
+            record.removeValue("txdrop/s");
+          }
+          record.removeValue("IFACE");
+        }
+
+        if (record.containsField("Device:")) {
+          record.add(record.getValue("Device:") + ".r/s", record
+              .getValue("r/s"));
+          record.add(record.getValue("Device:") + ".w/s", record
+              .getValue("w/s"));
+          record.add(record.getValue("Device:") + ".rkB/s", record
+              .getValue("rkB/s"));
+          record.add(record.getValue("Device:") + ".wkB/s", record
+              .getValue("wkB/s"));
+          record.add(record.getValue("Device:") + ".%util", record
+              .getValue("%util"));
+          record.removeValue("r/s");
+          record.removeValue("w/s");
+          record.removeValue("rkB/s");
+          record.removeValue("wkB/s");
+          record.removeValue("%util");
+          record.removeValue("Device:");
+        }
+
+        if (record.containsField("swap_free")) {
+          float swapUsedPcnt = 0, swapUsed = 0, swapTotal = 0;
+          swapUsed = Long.parseLong(record.getValue("swap_used"));
+          swapTotal = Long.parseLong(record.getValue("swap_total"));
+          swapUsedPcnt = swapUsed / swapTotal * 100;
+          record.add("swap_used_pcnt", "" + swapUsedPcnt);
+          record.add("csource", record.getValue("csource"));
+        }
+
+        if (record.containsField("mem_used")) {
+          double memUsedPcnt = 0, memTotal = 0, memUsed = 0;
+          memTotal = Double.parseDouble(record.getValue("mem_total"));
+          memUsed = Double.parseDouble(record.getValue("mem_used"));
+          memUsedPcnt = memUsed / memTotal * 100;
+          record.add("mem_used_pcnt", "" + memUsedPcnt);
+          record.add("csource", record.getValue("csource"));
+        }
+
+        if (record.containsField("mem_buffers")) {
+          double memBuffersPcnt = 0, memTotal = 0, memBuffers = 0;
+          memTotal = Double.parseDouble(record.getValue("mem_total"));
+          memBuffers = Double.parseDouble(record.getValue("mem_buffers"));
+          memBuffersPcnt = memBuffers / memTotal * 100;
+          record.add("mem_buffers_pcnt", "" + memBuffersPcnt);
+          record.add("csource", record.getValue("csource"));
+        }
+
+        // Copy over all fields
+        String[] fields = record.getFields();
+        for (String f : fields) {
+          newRecord.add(f, record.getValue(f));
+        }
+      }
+      newRecord.add("capp", "systemMetrics");
+      output.collect(key, newRecord);
+    } catch (IOException e) {
+      log.warn("Unable to collect output in SystemMetricsReduceProcessor ["
+          + key + "]", e);
+      e.printStackTrace();
+    }
 
-	}
+  }
 }
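
The network busy percentage above hard-codes netSpeed at 128,000,000
bytes/s (roughly a 1 Gbit link); a minimal worked sketch of the same
arithmetic, with hypothetical sample rates:

    public class NetBusyDemo {
      public static void main(String[] args) {
        double netSpeed = 128000000.00; // bytes/s, same constant as the reducer
        double rxByts = 32000000;       // hypothetical rxbyt/s sample
        double txByts = 16000000;       // hypothetical txbyt/s sample
        // rx and tx are each taken as a share of the link, then summed,
        // so a saturated full-duplex link would report 200%.
        double busyPcnt = (rxByts / netSpeed * 100) + (txByts / netSpeed * 100);
        System.out.println(busyPcnt); // 37.5
      }
    }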

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java Wed Mar 11 22:39:26 2009
@@ -18,31 +18,27 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
 
-public class UnknownReduceTypeException extends Exception
-{
 
-	/**
+public class UnknownReduceTypeException extends Exception {
+
+  /**
 	 * 
 	 */
-	private static final long serialVersionUID = 5760553864088487836L;
+  private static final long serialVersionUID = 5760553864088487836L;
+
+  public UnknownReduceTypeException() {
+  }
 
-	public UnknownReduceTypeException()
-	{
-	}
-
-	public UnknownReduceTypeException(String message)
-	{
-		super(message);
-	}
-
-	public UnknownReduceTypeException(Throwable cause)
-	{
-		super(cause);
-	}
-
-	public UnknownReduceTypeException(String message, Throwable cause)
-	{
-		super(message, cause);
-	}
+  public UnknownReduceTypeException(String message) {
+    super(message);
+  }
+
+  public UnknownReduceTypeException(Throwable cause) {
+    super(cause);
+  }
+
+  public UnknownReduceTypeException(String message, Throwable cause) {
+    super(message, cause);
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java Wed Mar 11 22:39:26 2009
@@ -18,102 +18,83 @@
 
 package org.apache.hadoop.chukwa.extraction.engine;
 
+
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-
 import org.apache.hadoop.record.Buffer;
 
-public class ChukwaRecord extends ChukwaRecordJT
-implements Record
-{
-	public ChukwaRecord()
-	{}
-	
-	
-	
-	public void add(String key, String value)
-	{
-		synchronized(this)
-		{
-			if (this.mapFields == null)
-			{
-				this.mapFields = new TreeMap<String,org.apache.hadoop.record.Buffer>();
-			}
-		}
-		this.mapFields.put(key, new Buffer(value.getBytes()));
-	}
-	
-	public String[] getFields()
-	{
-		return this.mapFields.keySet().toArray(new String[0]);
-	}
-
-	public String getValue(String field)
-	{
-		if (this.mapFields.containsKey(field))
-		{
-			return new String(this.mapFields.get(field).get());		
-		}
-		else
-		{
-			return null;
-		}
-	}
-
-	public boolean containsField(String field)
-	{
-		return this.mapFields.containsKey(field);
-	}
-
-	public void removeValue(String field) {
-		if(this.mapFields.containsKey(field)) {
-			this.mapFields.remove(field);
-		}
-	}
-	
-	@Override
-	public String toString()
-	{
-		Set <Map.Entry<String,Buffer>> f = this.mapFields.entrySet();
-		Iterator <Map.Entry<String,Buffer>> it = f.iterator();
-		
-		Map.Entry<String,Buffer> entry = null;
-		StringBuilder sb = new StringBuilder();
-		sb.append("<event  ");
-		StringBuilder body = new StringBuilder();
-		
-		String key = null;
-		String val = null;
-		boolean hasBody = false;
-		String bodyVal = null;
-		while (it.hasNext())
-		{
-			entry = it.next();
-			key = entry.getKey().intern();
-			val = new String(entry.getValue().get());
-			if (key.intern() == Record.bodyField.intern())
-			{
-				hasBody = true;
-				bodyVal = val;
-			}
-			else
-			{
-				sb.append(key).append("=\"").append(val).append("\" ");
-				body.append(key).append( " = ").append(val).append("<br>");
-			}
-			
-			
-		}
-		if (hasBody)	
-		{ sb.append(">").append(bodyVal);}
-		else
-		{ sb.append(">").append(body);}
-		sb.append("</event>");
-		
-		return sb.toString();
-	}
+public class ChukwaRecord extends ChukwaRecordJT implements Record {
+  public ChukwaRecord() {
+  }
+
+  public void add(String key, String value) {
+    synchronized (this) {
+      if (this.mapFields == null) {
+        this.mapFields = new TreeMap<String, org.apache.hadoop.record.Buffer>();
+      }
+    }
+    this.mapFields.put(key, new Buffer(value.getBytes()));
+  }
+
+  public String[] getFields() {
+    return this.mapFields.keySet().toArray(new String[0]);
+  }
+
+  public String getValue(String field) {
+    if (this.mapFields.containsKey(field)) {
+      return new String(this.mapFields.get(field).get());
+    } else {
+      return null;
+    }
+  }
+
+  public boolean containsField(String field) {
+    return this.mapFields.containsKey(field);
+  }
+
+  public void removeValue(String field) {
+    if (this.mapFields.containsKey(field)) {
+      this.mapFields.remove(field);
+    }
+  }
+
+  @Override
+  public String toString() {
+    Set<Map.Entry<String, Buffer>> f = this.mapFields.entrySet();
+    Iterator<Map.Entry<String, Buffer>> it = f.iterator();
+
+    Map.Entry<String, Buffer> entry = null;
+    StringBuilder sb = new StringBuilder();
+    sb.append("<event  ");
+    StringBuilder body = new StringBuilder();
+
+    String key = null;
+    String val = null;
+    boolean hasBody = false;
+    String bodyVal = null;
+    while (it.hasNext()) {
+      entry = it.next();
+      key = entry.getKey();
+      val = new String(entry.getValue().get());
+      if (key.equals(Record.bodyField)) {
+        hasBody = true;
+        bodyVal = val;
+      } else {
+        sb.append(key).append("=\"").append(val).append("\" ");
+        body.append(key).append(" = ").append(val).append("<br>");
+      }
+
+    }
+    if (hasBody) {
+      sb.append(">").append(bodyVal);
+    } else {
+      sb.append(">").append(body);
+    }
+    sb.append("</event>");
+
+    return sb.toString();
+  }
 
-	
 }
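
A minimal usage sketch of the record API above (add, getValue,
containsField, toString); setTime comes from the generated ChukwaRecordJT
superclass, and the field values are hypothetical:

    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
    import org.apache.hadoop.chukwa.extraction.engine.Record;

    public class RecordDemo {
      public static void main(String[] args) {
        ChukwaRecord rec = new ChukwaRecord();
        rec.setTime(System.currentTimeMillis());
        rec.add("csource", "host1.example.com");
        rec.add(Record.bodyField, "raw log line");

        System.out.println(rec.getValue("csource"));   // host1.example.com
        System.out.println(rec.containsField("capp")); // false
        // With a body field present, toString() renders the other fields
        // as attributes and the body as element text:
        // <event  csource="host1.example.com" >raw log line</event>
        System.out.println(rec);
      }
    }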