You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@chukwa.apache.org by ey...@apache.org on 2015/04/18 07:13:42 UTC

[2/4] chukwa git commit: CHUKWA-667. Optimize HBase metrics schema. (Eric Yang)

CHUKWA-667. Optimize HBase metrics schema.  (Eric Yang)


Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/7ae68398
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/7ae68398
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/7ae68398

Branch: refs/heads/master
Commit: 7ae68398626b33ac5fc06caf83573e992b766af0
Parents: fb022bf
Author: Eric Yang <ey...@apache.org>
Authored: Sun Apr 12 00:16:58 2015 -0700
Committer: Eric Yang <ey...@apache.org>
Committed: Sun Apr 12 00:16:58 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 conf/chukwa-demux-conf.xml                      |  11 +-
 conf/hbase.schema                               |  64 +--
 .../writer/hbase/HBaseWriter.java               | 141 +----
 .../writer/hbase/OutputCollector.java           |  73 ---
 .../datacollection/writer/hbase/Reporter.java   | 105 ++--
 .../chukwa/datastore/ChukwaHBaseStore.java      | 510 +++++++++----------
 .../demux/processor/mapper/DFInvalidRecord.java |  44 --
 .../extraction/demux/processor/mapper/Df.java   | 118 -----
 .../mapper/HadoopMetricsProcessor.java          |  17 +-
 .../demux/processor/mapper/Iostat.java          | 146 ------
 .../demux/processor/mapper/JobSummary.java      |   2 +-
 .../mapper/Log4JMetricsContextProcessor.java    |   2 +-
 .../demux/processor/mapper/PbsInvalidEntry.java |  44 --
 .../demux/processor/mapper/PbsNodes.java        | 198 -------
 .../extraction/demux/processor/mapper/Ps.java   | 144 ------
 .../extraction/demux/processor/mapper/Sar.java  | 172 -------
 .../extraction/demux/processor/mapper/Top.java  | 171 -------
 .../demux/processor/mapper/Torque.java          |  93 ----
 .../demux/processor/mapper/YWatch.java          | 121 -----
 .../processor/mapper/YwatchInvalidEntry.java    |  43 --
 .../chukwa/hicc/rest/HeatmapController.java     |  61 ++-
 .../chukwa/hicc/rest/MetricsController.java     | 112 +---
 .../inputtools/log4j/Log4JMetricsContext.java   |   2 +-
 src/main/web/hicc/jsp/graph_explorer.jsp        |  28 +-
 .../web/hicc/jsp/host_selector_dropdown.jsp     |   2 +-
 .../TestLog4JMetricsContextChukwaRecord.java    |  16 +-
 .../demux/processor/mapper/TestPsOutput.java    |  53 --
 28 files changed, 420 insertions(+), 2075 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 21b9f54..7acc0f8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    CHUKWA-667. Optimize HBase metrics schema.  (Eric Yang)
+
     CHUKWA-741. Updated to Hadoop 2.6.0 and HBase 1.0.0.  (Eric Yang)
 
     CHUKWA-740. Updated README file to be more current.  (Lewis John McGibbney via Eric Yang)

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/conf/chukwa-demux-conf.xml
----------------------------------------------------------------------
diff --git a/conf/chukwa-demux-conf.xml b/conf/chukwa-demux-conf.xml
index 62aa04f..151bfa5 100644
--- a/conf/chukwa-demux-conf.xml
+++ b/conf/chukwa-demux-conf.xml
@@ -211,13 +211,13 @@
 
    <property>
     <name>ChukwaMetrics</name>
-    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ChukwaMetricsProcessor</value>
+    <value>org.apache.hadoop.chukwa.extraction.hbase.ChukwaMetricsProcessor</value>
     <description></description>
    </property>
 
    <property>
     <name>SystemMetrics</name>
-    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SystemMetrics</value>
+    <value>org.apache.hadoop.chukwa.extraction.hbase.SystemMetrics</value>
     <description></description>
    </property>
 
@@ -269,13 +269,6 @@
         <description> Reducer class for Reduce Type MRJobReduceProcessor </description>
     </property>
 
-    <!-- Demux configs for both mapper and reducer -->
-    <property>
-        <name>SystemMetrics</name>
-        <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SystemMetrics,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.SystemMetrics</value>
-        <description> Reducer class for Reduce Type SystemMetrics </description>
-    </property>
-
     <property>
         <name>ClientTrace</name>
         <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ClientTraceProcessor,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ClientTrace</value>

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/conf/hbase.schema
----------------------------------------------------------------------
diff --git a/conf/hbase.schema b/conf/hbase.schema
index 7015725..5e32f90 100644
--- a/conf/hbase.schema
+++ b/conf/hbase.schema
@@ -1,61 +1,5 @@
-create "Hadoop",
-{NAME => "ClientTrace", VERSIONS => 65535},
-{NAME => "dfs_namenode", VERSIONS => 65535},
-{NAME => "dfs_FSNamesystem", VERSIONS => 65535},
-{NAME => "dfs_datanode", VERSIONS => 65535},
-{NAME => "mapred_jobtracker", VERSIONS => 65535},
-{NAME => "mapred_shuffleOutput", VERSIONS => 65535},
-{NAME => "mapred_tasktracker", VERSIONS => 65535},
-{NAME => "jvm_metrics", VERSIONS => 65535},
-{NAME => "mapred_Queue", VERSIONS => 65535},
-{NAME => "metricssystem_MetricsSystem", VERSIONS => 65535},
-{NAME => "rpc_rpc", VERSIONS => 65535},
-{NAME => "rpcdetailed_rpcdetailed", VERSIONS => 65535},
-{NAME => "ugi_ugi", VERSIONS => 65535}
-create "HadoopLog", 
-{NAME => "NameNode", VERSIONS => 65535},
-{NAME => "Audit", VERSIONS => 65535}
-create "Jobs",
-{NAME => "summary" }
-create "SystemMetrics", 
-{NAME => "cpu", VERSIONS => 65535},
-{NAME => "system", VERSIONS => 65535},
-{NAME => "disk", VERSIONS => 65535},
-{NAME => "memory", VERSIONS => 65535},
-{NAME => "swap", VERSIONS => 65535},
-{NAME => "network", VERSIONS => 65535},
-{NAME => "tags", VERSIONS => 65535}
-create "ClusterSummary", 
-{NAME=> "cpu", VERSIONS => 65535},
-{NAME => "system", VERSIONS => 65535},
-{NAME => "disk", VERSIONS => 65535},
-{NAME => "memory", VERSIONS => 65535},
-{NAME => "network", VERSIONS => 65535},
-{NAME => "swap", VERSIONS => 65535},
-{NAME => "hdfs", VERSIONS => 65535},
-{NAME => "mapreduce", VERSIONS => 65535}
+create "chukwa_meta",
+{NAME=>"k"}
 create "chukwa", 
-{NAME=>"chukwaAgent_chunkQueue", VERSIONS => 65535},
-{NAME => "chukwaAgent_metrics", VERSIONS => 65535},
-{NAME => "chukwaAgent_httpSender", VERSIONS => 65535}
-create "HBase",
-{NAME => "master", VERSIONS => 65535},
-{NAME => "regionserver", VERSIONS => 65535}
-create "Namenode",
-{NAME => "summary", VERSIONS => 65535},
-{NAME => "hdfs", VERSIONS => 65535},
-{NAME => "rpc", VERSIONS => 65535},
-{NAME => "jvm", VERSIONS => 65535}
-create "Zookeeper",
-{NAME => "zk", VERSIONS => 65535}
-create "JobTracker",
-{NAME => "jt", VERSIONS => 65535},
-{NAME => "jvm", VERSIONS => 65535},
-{NAME => "rpc", VERSIONS => 65535}
-create "Datanode",
-{NAME => "dn", VERSIONS => 65535},
-{NAME => "jvm", VERSIONS => 65535},
-{NAME => "rpc", VERSIONS => 65535}
-
-
-
+{NAME=>"t"},
+{NAME=>"a"}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
index 8344318..34c82e1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
@@ -19,49 +19,42 @@
 package org.apache.hadoop.chukwa.datacollection.writer.hbase;
 
 import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
 import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
-import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
-import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
-import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.UnknownRecordTypeException;
-import org.apache.hadoop.chukwa.extraction.demux.Demux;
-import org.apache.hadoop.chukwa.util.ClassUtils;
-import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.chukwa.extraction.hbase.AbstractProcessor;
+import org.apache.hadoop.chukwa.extraction.hbase.ProcessorFactory;
+import org.apache.hadoop.chukwa.extraction.hbase.UnknownRecordTypeException;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.log4j.Logger;
 
 public class HBaseWriter extends PipelineableWriter {
   static Logger log = Logger.getLogger(HBaseWriter.class);
+  private static final String CHUKWA_TABLE = "chukwa";
+  private static final String CHUKWA_META_TABLE = "chukwa_meta";
   boolean reportStats;
   volatile long dataSize = 0;
   final Timer statTimer;
-  private OutputCollector output;
+  private ArrayList<Put> output;
   private Reporter reporter;
   private ChukwaConfiguration conf;
   String defaultProcessor;
   private HConnection connection;
-  private Configuration hconf;
   
   private class StatReportingTask extends TimerTask {
     private long lastTs = System.currentTimeMillis();
@@ -98,20 +91,19 @@ public class HBaseWriter extends PipelineableWriter {
   private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) throws IOException {
     this.reportStats = reportStats;
     this.conf = conf;
-    this.hconf = hconf;
     this.statTimer = new Timer();
     this.defaultProcessor = conf.get(
       "chukwa.demux.mapper.default.processor",
       "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
-    Demux.jobConf = conf;
     log.info("hbase.zookeeper.quorum: " + hconf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + hconf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
     if (reportStats) {
       statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
     }
-    output = new OutputCollector();
-    reporter = new Reporter();
-    if(conf.getBoolean("hbase.writer.verify.schema", false)) {
-      verifyHbaseSchema();      
+    output = new ArrayList<Put>();
+    try {
+      reporter = new Reporter();
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException("Can not register hashing algorithm.");
     }
     connection = HConnectionManager.createConnection(hconf);
   }
@@ -125,83 +117,21 @@ public class HBaseWriter extends PipelineableWriter {
   public void init(Configuration conf) throws WriterException {
   }
 
-  private boolean verifyHbaseTable(HBaseAdmin admin, Table table) {
-    boolean status = false;
-    try {
-      if(admin.tableExists(table.name())) {
-        HTableDescriptor descriptor = admin.getTableDescriptor(table.name().getBytes());
-        HColumnDescriptor[] columnDescriptors = descriptor.getColumnFamilies();
-        for(HColumnDescriptor cd : columnDescriptors) {
-          if(cd.getNameAsString().equals(table.columnFamily())) {
-            log.info("Verified schema - table: "+table.name()+" column family: "+table.columnFamily());
-            status = true;
-          }
-        }
-      } else {
-        throw new Exception("HBase table: "+table.name()+ " does not exist.");
-      }
-    } catch(Exception e) {
-      log.error(ExceptionUtil.getStackTrace(e));
-      status = false;
-    }
-    return status;    
-  }
-  
-  private void verifyHbaseSchema() {
-    log.debug("Verify Demux parser with HBase schema");
-    boolean schemaVerified = true;
-    try {
-      HBaseAdmin admin = new HBaseAdmin(hconf);
-      List<Class> demuxParsers = ClassUtils.getClassesForPackage(conf.get("hbase.demux.package"));
-      for(Class<?> x : demuxParsers) {
-        if(x.isAnnotationPresent(Tables.class)) {
-          Tables list = x.getAnnotation(Tables.class);
-          for(Table table : list.annotations()) {
-            if(!verifyHbaseTable(admin, table)) {
-              schemaVerified = false;
-              log.warn("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist.");              
-            }
-          }
-        } else if(x.isAnnotationPresent(Table.class)) {
-          Table table = x.getAnnotation(Table.class);
-          if(!verifyHbaseTable(admin, table)) {
-            schemaVerified = false;
-            log.warn("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist.");
-          }
-        }
-      }
-    } catch (Exception e) {
-      schemaVerified = false;
-      log.error(ExceptionUtil.getStackTrace(e));
-    }
-    if(!schemaVerified) {
-      log.error("Hbase schema mismatch with demux parser.");
-      if(conf.getBoolean("hbase.writer.halt.on.schema.mismatch", true)) {
-        log.error("Exiting...");
-        DaemonWatcher.bailout(-1);
-      }
-    }
-  }
-
   @Override
   public CommitStatus add(List<Chunk> chunks) throws WriterException {
     CommitStatus rv = ChukwaWriter.COMMIT_OK;
     try {
+      HTableInterface hbase = connection.getTable(CHUKWA_TABLE);              
+      HTableInterface meta = connection.getTable(CHUKWA_META_TABLE);              
       for(Chunk chunk : chunks) {
         synchronized (this) {
           try {
-            Table table = findHBaseTable(chunk.getDataType());
-
-            if(table!=null) {
-              HTableInterface hbase = connection.getTable(table.name());              
-              MapProcessor processor = getProcessor(chunk.getDataType());
-              processor.process(new ChukwaArchiveKey(), chunk, output, reporter);
-              hbase.put(output.getKeyValues());
-            } else {
-              log.warn("Error finding HBase table for data type:"+chunk.getDataType());
-            }
-          } catch (Exception e) {
-            log.warn(output.getKeyValues());
+            AbstractProcessor processor = getProcessor(chunk.getDataType());
+            processor.process(chunk, output, reporter);
+            hbase.put(output);
+            meta.put(reporter.getInfo());
+          } catch (Throwable e) {
+            log.warn(output);
             log.warn(ExceptionUtil.getStackTrace(e));
           }
           dataSize += chunk.getData().length;
@@ -209,6 +139,7 @@ public class HBaseWriter extends PipelineableWriter {
           reporter.clear();
         }
       }
+      hbase.close();
     } catch (Exception e) {
       log.error(ExceptionUtil.getStackTrace(e));
       throw new WriterException("Failed to store data to HBase.");
@@ -219,31 +150,9 @@ public class HBaseWriter extends PipelineableWriter {
     return rv;
   }
 
-  public Table findHBaseTable(String dataType) throws UnknownRecordTypeException {
-    MapProcessor processor = getProcessor(dataType);
-
-    Table table = null;
-    if(processor.getClass().isAnnotationPresent(Table.class)) {
-      return processor.getClass().getAnnotation(Table.class);
-    } else if(processor.getClass().isAnnotationPresent(Tables.class)) {
-      Tables tables = processor.getClass().getAnnotation(Tables.class);
-      for(Table t : tables.annotations()) {
-        table = t;
-      }
-    }
-
-    return table;
-  }
-
-  public String findHBaseColumnFamilyName(String dataType)
-          throws UnknownRecordTypeException {
-    Table table = findHBaseTable(dataType);
-    return table.columnFamily();
-  }
-
-  private MapProcessor getProcessor(String dataType) throws UnknownRecordTypeException {
+  private AbstractProcessor getProcessor(String dataType) throws UnknownRecordTypeException {
     String processorClass = findProcessor(conf.get(dataType, defaultProcessor), defaultProcessor);
-    return MapProcessorFactory.getProcessor(processorClass);
+    return ProcessorFactory.getProcessor(processorClass);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java
deleted file mode 100644
index 352ca14..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.datacollection.writer.hbase;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.hbase.client.Put;
-
-public class OutputCollector implements
-    org.apache.hadoop.mapred.OutputCollector<ChukwaRecordKey, ChukwaRecord> {
-  
-  private List<Put> buffers;
-  private StringBuffer s = new StringBuffer();
-  private byte[] rowKey = null;
-  private byte[] cf = null;
-  private long now = 0L;
-
-  public OutputCollector() {
-    buffers = new ArrayList<Put>();
-  }
-  
-  @Override
-  public void collect(ChukwaRecordKey key, ChukwaRecord value) throws IOException {
-    String[] keyParts = key.getKey().split("/");
-    s.setLength(0);
-    s.append(keyParts[2]);
-    s.append("-");
-    s.append(keyParts[1]);
-    
-    rowKey = s.toString().getBytes();
-
-    cf = key.getReduceType().getBytes();
-    now = value.getTime();
-
-    Put kv = new Put(rowKey);
-    for(String field : value.getFields()) {
-        kv.add(cf, field.getBytes(), now , value.getValue(field).getBytes());
-    }  
-    buffers.add(kv);  
-  }
-
-  public List<Put> getKeyValues() {
-    return buffers;
-  }
-
-  public void clear() {
-    s.setLength(0);
-    rowKey = null;
-    cf = null;
-    buffers.clear();
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
index ab1ce82..8b1674d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
@@ -18,62 +18,91 @@
 
 package org.apache.hadoop.chukwa.datacollection.writer.hbase;
 
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Counters.Counter;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
 
-public class Reporter implements org.apache.hadoop.mapred.Reporter {
+import org.apache.hadoop.hbase.client.Put;
+import org.json.simple.JSONObject;
+import org.mortbay.log.Log;
 
-  @Override
-  public Counter getCounter(Enum<?> arg0) {
-    // TODO Auto-generated method stub
-    return null;
+public class Reporter {
+  private ArrayList<Put> meta = new ArrayList<Put>();
+  private MessageDigest md5 = null;
+
+  public Reporter() throws NoSuchAlgorithmException {
+    md5 = MessageDigest.getInstance("md5");
   }
 
-  @Override
-  public Counter getCounter(String arg0, String arg1) {
-    // TODO Auto-generated method stub
-    return null;
+  public void putSource(String type, String source) {
+    byte[] value = getHash(source);
+    JSONObject json = new JSONObject();
+
+    try {
+      json.put("sig", new String(value, "UTF-8"));
+      json.put("type", "source");
+    } catch (UnsupportedEncodingException e) {
+      Log.warn("Error encoding metadata.");
+      Log.warn(e);
+    }
+    put(type.getBytes(), source.getBytes(), json.toString().getBytes());
+
   }
 
-  @Override
-  public InputSplit getInputSplit() throws UnsupportedOperationException {
-    // TODO Auto-generated method stub
-    return null;
+  public void putMetric(String type, String metric) {
+    String buf = new StringBuilder(type).append(".").append(metric).toString();
+    byte[] pk = getHash(buf);
+
+    JSONObject json = new JSONObject();
+    try {
+      json.put("sig", new String(pk, "UTF-8"));
+      json.put("type", "metric");
+    } catch (UnsupportedEncodingException e) {
+      Log.warn("Error encoding metadata.");
+      Log.warn(e);
+    }
+    put(type.getBytes(), metric.getBytes(), json.toString().getBytes());
+
   }
 
-  @Override
-  public void incrCounter(Enum<?> arg0, long arg1) {
-    // TODO Auto-generated method stub
-    
+  public void put(String key, String source, String info) {
+    put(key.getBytes(), source.getBytes(), info.getBytes());
   }
 
-  @Override
-  public void incrCounter(String arg0, String arg1, long arg2) {
-    // TODO Auto-generated method stub
-    
+  public void put(byte[] key, byte[] source, byte[] info) {
+    Put put = new Put(key);
+    put.add("k".getBytes(), source, info);
+    meta.add(put);
   }
 
-  @Override
-  public void setStatus(String arg0) {
-    // TODO Auto-generated method stub
-    
+  public void clear() {
+    meta.clear();
   }
 
-  @Override
-  public void progress() {
-    // TODO Auto-generated method stub
-    
+  public List<Put> getInfo() {
+    return meta;
   }
 
-  @Override
-  public float getProgress() {
-    // TODO Auto-generated method stub
-    return 0.0f;
+  private byte[] getHash(String key) {
+    byte[] hash = new byte[3];
+    System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 3);
+    return hash;
   }
 
-  public void clear() {
-    // TODO Auto-generated method stub
-    
+  public void putClusterName(String type, String clusterName) {
+    byte[] value = getHash(clusterName);
+    JSONObject json = new JSONObject();
+
+    try {
+      json.put("sig", new String(value, "UTF-8"));
+      json.put("type", "cluster");
+    } catch (UnsupportedEncodingException e) {
+      Log.warn("Error encoding metadata.");
+      Log.warn(e);
+    }
+    put(type.getBytes(), clusterName.getBytes(), json.toString().getBytes());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
index 82a5e57..d9c32d6 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
@@ -18,15 +18,18 @@
 package org.apache.hadoop.chukwa.datastore;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -35,330 +38,293 @@ import org.apache.hadoop.chukwa.hicc.bean.HeatMapPoint;
 import org.apache.hadoop.chukwa.hicc.bean.Heatmap;
 import org.apache.hadoop.chukwa.hicc.bean.Series;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.chukwa.util.HBaseUtil;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
 
 public class ChukwaHBaseStore {
   private static Configuration hconf = HBaseConfiguration.create();
-  private static HConnection connection = null;
-  private static final int POOL_SIZE = 60;
-  static Logger log = Logger.getLogger(ChukwaHBaseStore.class);
-  
-  public static Series getSeries(String tableName, String rkey, String family, String column,
-      long startTime, long endTime, boolean filterByRowKey) {
-    StringBuilder seriesName = new StringBuilder();
-    seriesName.append(rkey);
-    seriesName.append(":");
-    seriesName.append(family);
-    seriesName.append(":");
-    seriesName.append(column);
+  static Logger LOG = Logger.getLogger(ChukwaHBaseStore.class);
+  static byte[] COLUMN_FAMILY = "t".getBytes();
+  static byte[] ANNOTATION_FAMILY = "a".getBytes();
+  static byte[] KEY_NAMES = "k".getBytes();
+  private static final String CHUKWA = "chukwa";
+  private static final String CHUKWA_META = "chukwa_meta";
+  private static long MILLISECONDS_IN_DAY = 86400000L;
 
-    Series series = new Series(seriesName.toString());
+  /**
+   * Scan chukwa table for a particular metric group and metric name based on
+   * time ranges.
+   * 
+   * @param metricGroup
+   * @param metric
+   * @param source
+   * @param startTime
+   * @param endTime
+   * @return
+   */
+  public static Series getSeries(String metricGroup, String metric,
+      String source, long startTime, long endTime) {
+    String fullMetricName = new StringBuilder(metricGroup).append(".")
+        .append(metric).toString();
+    return getSeries(fullMetricName, source, startTime, endTime);
+  }
+
+  /**
+   * Scan chukwa table for a full metric name based on time ranges.
+   * 
+   * @param metric
+   * @param source
+   * @param startTime
+   * @param endTime
+   * @return
+   */
+  public static Series getSeries(String metric, String source, long startTime,
+      long endTime) {
+    String seriesName = new StringBuilder(metric).append(":").append(source).toString();
+    Series series = new Series(seriesName);
     try {
-      HTableInterface table = getHTableConnection().getTable(tableName);
-      Calendar c = Calendar.getInstance();
-      c.setTimeInMillis(startTime);
-      c.set(Calendar.MINUTE, 0);
-      c.set(Calendar.SECOND, 0);
-      c.set(Calendar.MILLISECOND, 0);
-      String startRow = c.getTimeInMillis()+rkey;
-      Scan scan = new Scan();
-      scan.addColumn(family.getBytes(), column.getBytes());
-      scan.setStartRow(startRow.getBytes());
-      scan.setTimeRange(startTime, endTime);
-      scan.setMaxVersions();
-      if(filterByRowKey) {
-        RowFilter rf = new RowFilter(CompareOp.EQUAL, new 
-            RegexStringComparator("[0-9]+-"+rkey+"$")); 
-        scan.setFilter(rf);
+      // Swap start and end if the values are inverted.
+      if (startTime > endTime) {
+        long temp = endTime;
+        startTime = endTime;
+        endTime = temp;
       }
-      ResultScanner results = table.getScanner(scan);
-      Iterator<Result> it = results.iterator();
-      // TODO: Apply discrete wavelet transformation to limit the output
-      // size to 1000 data points for graphing optimization. (i.e jwave)
-      while(it.hasNext()) {
-        Result result = it.next();
-        String temp = new String(result.getValue(family.getBytes(), column.getBytes()));
-        double value = Double.parseDouble(temp);
-        // TODO: Pig Store function does not honor HBase timestamp, hence need to parse rowKey for timestamp.
-        String buf = new String(result.getRow());
-        Long timestamp = Long.parseLong(buf.split("-")[0]);
-        // If Pig Store function can honor HBase timestamp, use the following line is better.
-        // series.add(result.getCellValue().getTimestamp(), value);
-        series.add(timestamp, value);
+      Connection connection = ConnectionFactory.createConnection(hconf);
+      Table table = connection.getTable(TableName.valueOf(CHUKWA));
+      Scan scan = new Scan();
+      Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+      c.setTimeInMillis(startTime);
+      int startDay = c.get(Calendar.DAY_OF_YEAR);
+      c.setTimeInMillis(endTime);
+      int endDay = c.get(Calendar.DAY_OF_YEAR);
+      long currentDay = startTime;
+      for (int i = startDay; i <= endDay; i++) {
+        byte[] rowKey = HBaseUtil.buildKey(currentDay, metric, source);
+        // ColumnRangeFilter crf = new
+        // ColumnRangeFilter(Long.valueOf(startTime).toString().getBytes(),
+        // true, Long.valueOf(endTime).toString().getBytes(), true);
+        // scan.setFilter(crf);
+        scan.addFamily(COLUMN_FAMILY);
+        scan.setStartRow(rowKey);
+        scan.setStopRow(rowKey);
+        scan.setTimeRange(startTime, endTime);
+        scan.setBatch(10000);
+
+        ResultScanner results = table.getScanner(scan);
+        Iterator<Result> it = results.iterator();
+        // TODO: Apply discrete wavelet transformation to limit the output
+        // size to 1000 data points for graphing optimization. (i.e jwave)
+        while (it.hasNext()) {
+          Result result = it.next();
+          for (KeyValue kv : result.raw()) {
+            byte[] key = kv.getQualifier();
+            long timestamp = ByteBuffer.wrap(key).getLong();
+            double value = Double
+                .parseDouble(new String(kv.getValue(), "UTF-8"));
+            series.add(timestamp, value);
+          }
+        }
+        results.close();
+        currentDay = currentDay + (i * MILLISECONDS_IN_DAY);
       }
-      results.close();
       table.close();
-    } catch(Exception e) {
-      log.error(ExceptionUtil.getStackTrace(e));
+    } catch (Exception e) {
+      LOG.error(ExceptionUtil.getStackTrace(e));
     }
     return series;
   }
 
-  public static Set<String> getFamilyNames(String tableName) {
+  public static Set<String> getMetricNames(String metricGroup) {
     Set<String> familyNames = new CopyOnWriteArraySet<String>();
     try {
-      HTableInterface table = getHTableConnection().getTable(tableName);
-      Set<byte[]> families = table.getTableDescriptor().getFamiliesKeys();
-      for(byte[] name : families) {
-        familyNames.add(new String(name));
+      Connection connection = ConnectionFactory.createConnection(hconf);
+      Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
+      Get get = new Get(metricGroup.getBytes());
+      Result result = table.get(get);
+      for (KeyValue kv : result.raw()) {
+        JSONObject json = (JSONObject) JSONValue.parse(new String(kv.getValue(), "UTF-8"));
+        if (json.get("type").equals("metric")) {
+          familyNames.add(new String(kv.getQualifier(), "UTF-8"));
+        }
       }
       table.close();
-    } catch(Exception e) {
-      log.error(ExceptionUtil.getStackTrace(e));
+      connection.close();
+    } catch (Exception e) {
+      LOG.error(ExceptionUtil.getStackTrace(e));
     }
     return familyNames;
-    
+
   }
-  
-  public static Set<String> getTableNames() {
-    Set<String> tableNames = new CopyOnWriteArraySet<String>();
+
+  public static Set<String> getMetricGroups() {
+    Set<String> metricGroups = new CopyOnWriteArraySet<String>();
     try {
-      HBaseAdmin admin = new HBaseAdmin(hconf);
-      HTableDescriptor[] td = admin.listTables();
-      for(HTableDescriptor table : td) {
-        tableNames.add(new String(table.getName()));
+      Connection connection = ConnectionFactory.createConnection(hconf);
+      Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
+      Scan scan = new Scan();
+      scan.addFamily(KEY_NAMES);
+      ResultScanner rs = table.getScanner(scan);
+      Iterator<Result> it = rs.iterator();
+      while (it.hasNext()) {
+        Result result = it.next();
+        metricGroups.add(new String(result.getRow(), "UTF-8"));
       }
-    } catch(Exception e) {
-      log.error(ExceptionUtil.getStackTrace(e));
+      table.close();
+      connection.close();
+    } catch (Exception e) {
+      LOG.error(ExceptionUtil.getStackTrace(e));
     }
-    return tableNames;
+    return metricGroups;
   }
 
-  public static void getColumnNamesHelper(Set<String>columnNames, Iterator<Result> it) {
-    Result result = it.next();
-    if(result!=null) {
-      List<Cell> cList = result.listCells();
-      for(Cell cell : cList) {
-        columnNames.add(new String(CellUtil.cloneQualifier(cell)));
-      }
-    }
-  }
-  
-  public static Set<String> getColumnNames(String tableName, String family, long startTime, long endTime, boolean fullScan) {
-    Set<String> columnNames = new CopyOnWriteArraySet<String>();
+  public static Set<String> getSourceNames(String dataType) {
+    Set<String> pk = new HashSet<String>();
     try {
-      HTableInterface table = getHTableConnection().getTable(tableName);
+      Connection connection = ConnectionFactory.createConnection(hconf);
+      Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Scan scan = new Scan();
-      if(!fullScan) {
-        // Take sample columns of the recent time.
-        StringBuilder temp = new StringBuilder();
-        temp.append(endTime-300000L);
-        scan.setStartRow(temp.toString().getBytes());
-        temp.setLength(0);
-        temp.append(endTime);
-        scan.setStopRow(temp.toString().getBytes());
-      } else {
-        StringBuilder temp = new StringBuilder();
-        temp.append(startTime);
-        scan.setStartRow(temp.toString().getBytes());
-        temp.setLength(0);
-        temp.append(endTime);
-        scan.setStopRow(temp.toString().getBytes());
-      }
-      scan.addFamily(family.getBytes());
-      ResultScanner results = table.getScanner(scan);
-      Iterator<Result> it = results.iterator();
-      if(fullScan) {
-        while(it.hasNext()) {
-          getColumnNamesHelper(columnNames, it);
-        }        
-      } else {
-        getColumnNamesHelper(columnNames, it);        
+      scan.addFamily(KEY_NAMES);
+      ResultScanner rs = table.getScanner(scan);
+      Iterator<Result> it = rs.iterator();
+      while (it.hasNext()) {
+        Result result = it.next();
+        for (Cell cell : result.rawCells()) {
+          JSONObject json = (JSONObject) JSONValue.parse(new String(cell.getValue(), "UTF-8"));
+          if (json.get("type").equals("source")) {
+            pk.add(new String(cell.getQualifier(), "UTF-8"));
+          }
+        }
       }
-      results.close();
       table.close();
-    } catch(Exception e) {
-      log.error(ExceptionUtil.getStackTrace(e));
+      connection.close();
+    } catch (Exception e) {
+      LOG.error(ExceptionUtil.getStackTrace(e));
     }
-    return columnNames;
+    return pk;
   }
-  
-  public static Set<String> getRowNames(String tableName, String family, String qualifier, long startTime, long endTime, boolean fullScan) {
-    Set<String> rows = new HashSet<String>();
+
+  public static Heatmap getHeatmap(String metricGroup, String metric,
+      long startTime, long endTime, double max, double scale, int height) {
+    final long MINUTE = TimeUnit.MINUTES.toMillis(1);
+    Heatmap heatmap = new Heatmap();
+    Set<String> sources = getSourceNames(metricGroup);
+    Set<String> metrics = getMetricNames(metricGroup);
+    List<Get> series = new ArrayList<Get>();
+    String fullName = new StringBuilder(metricGroup).append(".").append(metric).toString();
     try {
-      HTableInterface table = getHTableConnection().getTable(tableName);
-      Scan scan = new Scan();
-      scan.addColumn(family.getBytes(), qualifier.getBytes());
-      if(!fullScan) {
-        // Take sample columns of the recent time.
-        StringBuilder temp = new StringBuilder();
-        temp.append(endTime-300000L);
-        scan.setStartRow(temp.toString().getBytes());
-        temp.setLength(0);
-        temp.append(endTime);
-        scan.setStopRow(temp.toString().getBytes());
-      } else {
-        StringBuilder temp = new StringBuilder();
-        temp.append(startTime);
-        scan.setStartRow(temp.toString().getBytes());
-        temp.setLength(0);
-        temp.append(endTime);
-        scan.setStopRow(temp.toString().getBytes());
+      Connection connection = ConnectionFactory.createConnection(hconf);
+      Table table = connection.getTable(TableName.valueOf(CHUKWA));
+      Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+      c.setTimeInMillis(startTime);
+      int startDay = c.get(Calendar.DAY_OF_YEAR);
+      c.setTimeInMillis(endTime);
+      int endDay = c.get(Calendar.DAY_OF_YEAR);
+      long currentDay = startTime;
+      for (int i = startDay; i <= endDay; i++) {
+        for (String m : metrics) {
+          if (m.startsWith(fullName)) {
+            for (String source : sources) {
+              byte[] rowKey = HBaseUtil.buildKey(currentDay, m, source);
+              Get serie = new Get(rowKey);
+              serie.addFamily(COLUMN_FAMILY);
+              serie.setTimeRange(startTime, endTime);
+              series.add(serie);
+            }
+          }
+        }
+        currentDay = currentDay + (i * MILLISECONDS_IN_DAY);
       }
-      ResultScanner results = table.getScanner(scan);
-      Iterator<Result> it = results.iterator();
-      while(it.hasNext()) {
-        Result result = it.next();
-        String buffer = new String(result.getRow());
-        String[] parts = buffer.split("-", 2);
-        if(!rows.contains(parts[1])) {
-          rows.add(parts[1]);
-        }    
+      Result[] rs = table.get(series);
+      int index = 0;
+      // Series display in y axis
+      int y = 0;
+      HashMap<String, Integer> keyMap = new HashMap<String, Integer>();
+      for (Result result : rs) {
+        for(Cell cell : result.rawCells()) {
+          byte[] dest = new byte[5];
+          System.arraycopy(cell.getRow(), 3, dest, 0, 5);
+          String source = new String(dest);
+          long time = cell.getTimestamp();
+          // Time display in x axis
+          int x = (int) ((time - startTime) / MINUTE);
+          if (keyMap.containsKey(source)) {
+            y = keyMap.get(source);
+          } else {
+            keyMap.put(source, new Integer(index));
+            y = index;
+            index++;
+          }
+          double v = Double.parseDouble(new String(CellUtil.cloneValue(cell)));
+          heatmap.put(x, y, v);
+          if (v > max) {
+            max = v;
+          }
+        }
       }
-      results.close();
       table.close();
-    } catch(Exception e) {
-      log.error(ExceptionUtil.getStackTrace(e));
+      int radius = height / index;
+      // Usually scale max from 0 to 100 for visualization
+      heatmap.putMax(scale);
+      for (HeatMapPoint point : heatmap.getHeatmap()) {
+        double round = point.count / max * scale;
+        round = Math.round(round * 100.0) / 100.0;
+        point.put(point.x, point.y * radius, round);
+      }
+      heatmap.putRadius(radius);
+      heatmap.putSeries(index);
+    } catch (IOException e) {
+      LOG.error(ExceptionUtil.getStackTrace(e));
     }
-    return rows;    
+    return heatmap;
   }
-  
-  public static Set<String> getHostnames(String cluster, long startTime, long endTime, boolean fullScan) {
-    return getRowNames("SystemMetrics","system", "csource", startTime, endTime, fullScan);
-  }
-  
+
+  /**
+   * Scan chukwa table and find cluster tag from annotation column family from a
+   * range of entries.
+   * 
+   * @param startTime
+   * @param endTime
+   * @return
+   */
   public static Set<String> getClusterNames(long startTime, long endTime) {
-    String tableName = "SystemMetrics";
-    String family = "system";
-    String column = "ctags";
     Set<String> clusters = new HashSet<String>();
-    Pattern p = Pattern.compile("\\s*cluster=\"(.*?)\"");
     try {
-      HTableInterface table = getHTableConnection().getTable(tableName);
+      Connection connection = ConnectionFactory.createConnection(hconf);
+      Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Scan scan = new Scan();
-      scan.addColumn(family.getBytes(), column.getBytes());
-      scan.setTimeRange(startTime, endTime);
-      ResultScanner results = table.getScanner(scan);
-      Iterator<Result> it = results.iterator();
-      while(it.hasNext()) {
+      scan.addFamily(KEY_NAMES);
+      ResultScanner rs = table.getScanner(scan);
+      Iterator<Result> it = rs.iterator();
+      while (it.hasNext()) {
         Result result = it.next();
-        String buffer = new String(result.getValue(family.getBytes(), column.getBytes()));
-        Matcher m = p.matcher(buffer);
-        if(m.matches()) {
-          clusters.add(m.group(1));
+        for (Cell cell : result.rawCells()) {
+          JSONObject json = (JSONObject) JSONValue.parse(new String(cell.getValue(), "UTF-8"));
+          if (json.get("type").equals("cluster")) {
+            clusters.add(new String(cell.getQualifier(), "UTF-8"));
+          }
         }
       }
-      results.close();
       table.close();
-    } catch(Exception e) {
-      log.error(ExceptionUtil.getStackTrace(e));
+      connection.close();
+    } catch (Exception e) {
+      LOG.error(ExceptionUtil.getStackTrace(e));
     }
     return clusters;
   }
-  
-  public static Heatmap getHeatmap(String tableName, String family, String column, 
-		  long startTime, long endTime, double max, double scale, int height) {
-	final long MINUTE = TimeUnit.MINUTES.toMillis(1);
-	Heatmap heatmap = new Heatmap();
-    
-    try {
-        HTableInterface table = getHTableConnection().getTable(tableName);
-        Scan scan = new Scan();
-        ColumnPrefixFilter cpf = new ColumnPrefixFilter(column.getBytes());
-        scan.addFamily(family.getBytes());
-        scan.setFilter(cpf);
-    	scan.setTimeRange(startTime, endTime);
-    	scan.setBatch(10000);
-        ResultScanner results = table.getScanner(scan);
-	    Iterator<Result> it = results.iterator();
-		int index = 0;
-		// Series display in y axis
-		int y = 0;
-		HashMap<String, Integer> keyMap = new HashMap<String, Integer>();
-	    while(it.hasNext()) {
-		  Result result = it.next();
-		  List<Cell> cList = result.listCells();
-	      for(Cell cell : cList) {
-			String key = parseRowKey(result.getRow());
-			StringBuilder tmp = new StringBuilder();
-			tmp.append(key);
-			tmp.append(":");
-			tmp.append(new String(CellUtil.cloneQualifier(cell)));
-			String seriesName = tmp.toString();
-			long time = parseTime(result.getRow());
-			// Time display in x axis
-			int x = (int) ((time - startTime) / MINUTE);
-			if(keyMap.containsKey(seriesName)) {
-			  y = keyMap.get(seriesName);
-		    } else {
-			  keyMap.put(seriesName, new Integer(index));
-		      y = index;
-		      index++;
-			}
-			double v = Double.parseDouble(new String(CellUtil.cloneValue(cell)));
-			heatmap.put(x, y, v);
-			if(v > max) {
-				max = v;
-			}
-	      }
-	    }
-	    results.close();
-	    table.close();
-	    int radius = height / index;
-	    // Usually scale max from 0 to 100 for visualization
-	    heatmap.putMax(scale);
-	    for(HeatMapPoint point : heatmap.getHeatmap()) {
-	      double round = point.count / max * scale;
-	      round = Math.round(round * 100.0) / 100.0;
-	      point.put(point.x, point.y * radius, round);
-	    }
-	    heatmap.putRadius(radius);
-	    heatmap.putSeries(index);
-	} catch (IOException e) {
-	    log.error(ExceptionUtil.getStackTrace(e));
-	}
-	return heatmap;
-  }
-  
-  private static String parseRowKey(byte[] row) {
-	  String key = new String(row);
-	  String[] parts = key.split("-", 2);
-	  return parts[1];
-  }
-
-  private static long parseTime(byte[] row) {
-	  String key = new String(row);
-	  String[] parts = key.split("-", 2);
-	  long time = Long.parseLong(parts[0]);
-	  return time;
-  }
-  
-  private static HConnection getHTableConnection() {
-    if(connection == null) {
-      synchronized(ChukwaHBaseStore.class) {
-        try {
-          ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
-          /* Set the hbase client properties to unblock immediately in case
-           * hbase goes down. This will ensure we timeout on socket connection to
-           * hbase early.
-           */
-          hconf.setInt("hbase.client.operation.timeout", 60000);
-          hconf.setLong("hbase.client.pause", 1000);
-          hconf.setInt("hbase.client.retries.number", 1);
-          connection = HConnectionManager.createConnection(hconf, pool);
-        }catch(IOException e) {
-          log.error("Unable to obtain connection to HBase " + e.getMessage());
-          e.printStackTrace();
-        }
-      }
-    }
-    return connection;
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java
deleted file mode 100644
index 6b60b3a..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-
-public class DFInvalidRecord extends Exception {
-
-  /**
-	 * 
-	 */
-  private static final long serialVersionUID = 1254238125122522523L;
-
-  public DFInvalidRecord() {
-  }
-
-  public DFInvalidRecord(String arg0) {
-    super(arg0);
-  }
-
-  public DFInvalidRecord(Throwable arg0) {
-    super(arg0);
-  }
-
-  public DFInvalidRecord(String arg0, Throwable arg1) {
-    super(arg0, arg1);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java
deleted file mode 100644
index ef2bddd..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.Logger;
-
-@Table(name="SystemMetrics",columnFamily="Disk")
-public class Df extends AbstractProcessor {
-  static Logger log = Logger.getLogger(Df.class);
-  
-  private static final String[] headerSplitCols = { "Filesystem", "1K-blocks",
-      "Used", "Available", "Use%", "Mounted", "on" };
-  private static final String[] headerCols = { "Filesystem", "1K-blocks",
-      "Used", "Available", "Use%", "Mounted on" };
-  private SimpleDateFormat sdf = null;
-
-  public Df() {
-    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
-  }
-
-  @Override
-  protected void parse(String recordEntry,
-      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-      throws Throwable {
-
-    try {
-      String dStr = recordEntry.substring(0, 23);
-      int start = 24;
-      int idx = recordEntry.indexOf(' ', start);
-      // String level = recordEntry.substring(start, idx);
-      start = idx + 1;
-      idx = recordEntry.indexOf(' ', start);
-      // String className = recordEntry.substring(start, idx-1);
-      String body = recordEntry.substring(idx + 1);
-
-      Date d = sdf.parse(dStr);
-      String[] lines = body.split("\n");
-
-      String[] outputCols = lines[0].substring(lines[0].indexOf("Filesystem")).split("[\\s]++");
-
-      if (outputCols.length != headerSplitCols.length
-          || outputCols[0].intern() != headerSplitCols[0].intern()
-          || outputCols[1].intern() != headerSplitCols[1].intern()
-          || outputCols[2].intern() != headerSplitCols[2].intern()
-          || outputCols[3].intern() != headerSplitCols[3].intern()
-          || outputCols[4].intern() != headerSplitCols[4].intern()
-          || outputCols[5].intern() != headerSplitCols[5].intern()
-          || outputCols[6].intern() != headerSplitCols[6].intern()) {
-        throw new DFInvalidRecord("Wrong output format (header) ["
-            + recordEntry + "]");
-      }
-
-      String[] values = null;
-
-      // Data
-      ChukwaRecord record = null;
-
-      for (int i = 1; i < lines.length; i++) {
-        values = lines[i].split("[\\s]++");
-        key = new ChukwaRecordKey();
-        record = new ChukwaRecord();
-        this.buildGenericRecord(record, null, d.getTime(), "Df");
-
-        record.add(headerCols[0], values[0]);
-        record.add(headerCols[1], values[1]);
-        record.add(headerCols[2], values[2]);
-        record.add(headerCols[3], values[3]);
-        record.add(headerCols[4], values[4]
-            .substring(0, values[4].length() - 1)); // Remove %
-        record.add(headerCols[5], values[5]);
-
-        output.collect(key, record);
-      }
-
-      // log.info("DFProcessor output 1 DF record");
-    } catch (ParseException e) {
-      e.printStackTrace();
-      log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
-      throw e;
-    } catch (IOException e) {
-      e.printStackTrace();
-      log.warn("Unable to collect output in DFProcessor [" + recordEntry + "]",
-          e);
-      throw e;
-    } catch (DFInvalidRecord e) {
-      e.printStackTrace();
-      log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
-      throw e;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
index 769c054..f671049 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
@@ -50,22 +50,9 @@ import org.json.simple.JSONValue;
 @Table(name="Hadoop",columnFamily="mapred_tasktracker"),
 @Table(name="Hadoop",columnFamily="rpc_metrics")
 })
-public class HadoopMetricsProcessor extends AbstractProcessor {
-//  public static final String jvm = "jvm_metrics";
-//  public static final String mapred = "mapred_metrics";
-//  public static final String dfs = "dfs_metrics";
-//  public static final String namenode = "dfs_namenode";
-//  public static final String fsdir = "dfs_FSDirectory";
-//  public static final String fsname = "dfs_FSNamesystem";
-//  public static final String datanode = "dfs_datanode";
-//  public static final String jobtracker = "mapred_jobtracker";
-//  public static final String shuffleIn = "mapred_shuffleInput";
-//  public static final String shuffleOut = "mapred_shuffleOutput";
-//  public static final String tasktracker = "mapred_tasktracker";
-//  public static final String mr = "mapred_job";
-  
+public class HadoopMetricsProcessor extends AbstractProcessor {  
   static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);
-  static final String chukwaTimestampField = "chukwa_timestamp";
+  static final String chukwaTimestampField = "timestamp";
   static final String contextNameField = "contextName";
   static final String recordNameField = "recordName";
 

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java
deleted file mode 100644
index 452ddb8..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.Logger;
-
-@Table(name="SystemMetrics",columnFamily="SystemMetrics")
-public class Iostat extends AbstractProcessor {
-  static Logger log = Logger.getLogger(Iostat.class);
-  public final String recordType = this.getClass().getName();
-  
-  private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
-  private static Pattern p = null;
-
-  private Matcher matcher = null;
-  private SimpleDateFormat sdf = null;
-
-  public Iostat() {
-    // TODO move that to config
-    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
-    p = Pattern.compile(regex);
-  }
-
-  @Override
-  protected void parse(String recordEntry,
-      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-      throws Throwable {
-
-    log.debug("Iostat record: [" + recordEntry + "] type["
-        + chunk.getDataType() + "]");
-    int i = 0;
-
-    matcher = p.matcher(recordEntry);
-    while (matcher.find()) {
-      log.debug("Iostat Processor Matches");
-
-      try {
-        Date d = sdf.parse(matcher.group(1).trim());
-
-        String[] lines = recordEntry.split("\n");
-        String[] headers = null;
-        for (int skip = 0; skip < 2; skip++) {
-          i++;
-          while (i < lines.length && lines[i].indexOf("avg-cpu") < 0) {
-            // Skip the first output because the numbers are averaged from
-            // system boot up
-            log.debug("skip line:" + lines[i]);
-            i++;
-          }
-        }
-        while (i < lines.length) {
-          ChukwaRecord record = null;
-
-          if (lines[i].indexOf("avg-cpu") >= 0
-              || lines[i].indexOf("Device") >= 0) {
-            headers = parseHeader(lines[i]);
-            i++;
-          }
-          String data[] = parseData(lines[i]);
-          if (headers[0].equals("avg-cpu:")) {
-            log.debug("Matched CPU-Utilization");
-            record = new ChukwaRecord();
-            key = new ChukwaRecordKey();
-            buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
-          } else if (headers[0].equals("Device:")) {
-            log.debug("Matched Iostat");
-            record = new ChukwaRecord();
-            key = new ChukwaRecordKey();
-            buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
-          } else {
-            log.debug("No match:" + headers[0]);
-          }
-          if (record != null) {
-            int j = 0;
-            log.debug("Data Length: " + data.length);
-            while (j < data.length) {
-              log.debug("header:" + headers[j] + " data:" + data[j]);
-              if (!headers[j].equals("avg-cpu:")) {
-                try {
-                  // Filter out overflow values for older linux systems
-                  long x=Long.parseLong(data[j]);
-                  if(x<100000000000L) {
-                    record.add(headers[j],data[j]);
-                  }
-                } catch(NumberFormatException ex) {
-                  record.add(headers[j],data[j]);
-                }
-              }
-              j++;
-            }
-            record.setTime(d.getTime());
-            if (data.length > 3) {
-              output.collect(key, record);
-            }
-          }
-          i++;
-        }
-        // End of parsing
-      } catch (Exception e) {
-        e.printStackTrace();
-        throw e;
-      }
-    }
-  }
-
-  public String[] parseHeader(String header) {
-    String[] headers = header.split("\\s+");
-    return headers;
-  }
-
-  public String[] parseData(String dataLine) {
-    String[] data = dataLine.split("\\s+");
-    return data;
-  }
-
-  public String getDataType() {
-    return recordType;
-  }
-}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java
index 0be8615..dbf4baa 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java
@@ -39,7 +39,7 @@ import org.apache.log4j.Logger;
 })
 public class JobSummary extends AbstractProcessor {  
   static Logger log = Logger.getLogger(JobSummary.class);
-  static final String chukwaTimestampField = "chukwa_timestamp";
+  static final String chukwaTimestampField = "timestamp";
   static final String contextNameField = "contextName";
   static final String recordNameField = "recordName";
 

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
index 9c8a3ae..16b12f3 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
@@ -53,7 +53,7 @@ public class Log4JMetricsContextProcessor extends AbstractProcessor {
       JSONObject json = new JSONObject(log.getBody());
 
       // round timestamp
-      timestamp = json.getLong("chukwa_timestamp");
+      timestamp = json.getLong("timestamp");
       timestamp = (timestamp / 60000) * 60000;
 
       // get record type

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java
deleted file mode 100644
index 93252d4..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-
-public class PbsInvalidEntry extends Exception {
-
-  /**
-	 * 
-	 */
-  private static final long serialVersionUID = 9154096600390233023L;
-
-  public PbsInvalidEntry() {
-  }
-
-  public PbsInvalidEntry(String message) {
-    super(message);
-  }
-
-  public PbsInvalidEntry(Throwable cause) {
-    super(cause);
-  }
-
-  public PbsInvalidEntry(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java
deleted file mode 100644
index 23dc74c..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.Logger;
-
-public class PbsNodes extends AbstractProcessor {
-  static Logger log = Logger.getLogger(PbsNodes.class);
-
-  private static final String rawPBSRecordType = "PbsNodes";
-  private static final String machinePBSRecordType = "MachinePbsNodes";
-  private SimpleDateFormat sdf = null;
-
-  public PbsNodes() {
-    // TODO move that to config
-    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
-  }
-
-  @Override
-  protected void parse(String recordEntry,
-      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-      throws Throwable {
-
-    // log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" +
-    // chunk.getDataType() + "]");
-
-    StringBuilder sb = new StringBuilder();
-    int i = 0;
-    String nodeActivityStatus = null;
-    StringBuilder sbFreeMachines = new StringBuilder();
-    StringBuilder sbUsedMachines = new StringBuilder();
-    StringBuilder sbDownMachines = new StringBuilder();
-
-    int totalFreeNode = 0;
-    int totalUsedNode = 0;
-    int totalDownNode = 0;
-
-    String body = null;
-    ChukwaRecord record = null;
-
-    try {
-
-      String dStr = recordEntry.substring(0, 23);
-      int start = 24;
-      int idx = recordEntry.indexOf(' ', start);
-      // String level = recordEntry.substring(start, idx);
-      start = idx + 1;
-      idx = recordEntry.indexOf(' ', start);
-      // String className = recordEntry.substring(start, idx-1);
-      body = recordEntry.substring(idx + 1);
-
-      Date d = sdf.parse(dStr);
-
-      String[] lines = body.split("\n");
-      while (i < lines.length) {
-        while ((i < lines.length) && (lines[i].trim().length() > 0)) {
-          sb.append(lines[i].trim()).append("\n");
-          i++;
-        }
-
-        if ((i < lines.length) && (lines[i].trim().length() > 0)) {
-          throw new PbsInvalidEntry(recordEntry);
-        }
-
-        // Empty line
-        i++;
-
-        if (sb.length() > 0) {
-          body = sb.toString();
-          // Process all entries for a machine
-          // System.out.println("=========>>> Record [" + body+ "]");
-
-          record = new ChukwaRecord();
-          key = new ChukwaRecordKey();
-
-          buildGenericRecord(record, null, d.getTime(), machinePBSRecordType);
-          parsePbsRecord(body, record);
-
-          // Output PbsNode record for 1 machine
-          output.collect(key, record);
-          // log.info("PbsNodeProcessor output 1 sub-record");
-
-          // compute Node Activity information
-          nodeActivityStatus = record.getValue("state");
-          if (nodeActivityStatus != null) {
-            if (nodeActivityStatus.equals("free")) {
-              totalFreeNode++;
-              sbFreeMachines.append(record.getValue("Machine")).append(",");
-            } else if (nodeActivityStatus.equals("job-exclusive")) {
-              totalUsedNode++;
-              sbUsedMachines.append(record.getValue("Machine")).append(",");
-            } else {
-              totalDownNode++;
-              sbDownMachines.append(record.getValue("Machine")).append(",");
-            }
-          }
-          sb = new StringBuilder();
-        }
-      }
-
-      // End of parsing
-
-      record = new ChukwaRecord();
-      key = new ChukwaRecordKey();
-      buildGenericRecord(record, null, d.getTime(), "NodeActivity");
-
-      record.setTime(d.getTime());
-      record.add("used", "" + totalUsedNode);
-      record.add("free", "" + totalFreeNode);
-      record.add("down", "" + totalDownNode);
-      record.add("usedMachines", sbUsedMachines.toString());
-      record.add("freeMachines", sbFreeMachines.toString());
-      record.add("downMachines", sbDownMachines.toString());
-
-      output.collect(key, record);
-      // log.info("PbsNodeProcessor output 1 NodeActivity");
-    } catch (ParseException e) {
-      e.printStackTrace();
-      log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
-      throw e;
-    } catch (IOException e) {
-      log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry
-          + "]", e);
-      e.printStackTrace();
-      throw e;
-    } catch (PbsInvalidEntry e) {
-      log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
-      e.printStackTrace();
-      throw e;
-    }
-
-  }
-
-  protected static void parsePbsRecord(String recordLine, ChukwaRecord record) {
-    int i = 0;
-    String[] lines = recordLine.split("\n");
-    record.add("Machine", lines[0]);
-
-    i++;
-    String[] data = null;
-    while (i < lines.length) {
-      data = extractFields(lines[i]);
-      record.add(data[0].trim(), data[1].trim());
-      if (data[0].trim().equalsIgnoreCase("status")) {
-        parseStatusField(data[1].trim(), record);
-      }
-      i++;
-    }
-  }
-
-  protected static void parseStatusField(String statusField, ChukwaRecord record) {
-    String[] data = null;
-    String[] subFields = statusField.trim().split(",");
-    for (String subflied : subFields) {
-      data = extractFields(subflied);
-      record.add("status-" + data[0].trim(), data[1].trim());
-    }
-  }
-
-  static String[] extractFields(String line) {
-    String[] args = new String[2];
-    int index = line.indexOf("=");
-    args[0] = line.substring(0, index);
-    args[1] = line.substring(index + 1);
-
-    return args;
-  }
-
-  public String getDataType() {
-    return PbsNodes.rawPBSRecordType;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
deleted file mode 100644
index 7b10edd..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.Logger;
-
-@Table(name="SystemMetrics",columnFamily="Ps")
-public class Ps extends AbstractProcessor {
-  static Logger log = Logger.getLogger(Ps.class);
-  public static final String reduceType = "Ps";
-
-  @Override
-  protected void parse(String recordEntry,
-      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-      throws Throwable {
-    LogEntry log = new LogEntry(recordEntry);
-    PsOutput ps = new PsOutput(log.getBody());
-    for (HashMap<String, String> processInfo : ps.getProcessList()) {
-      key = new ChukwaRecordKey();
-      ChukwaRecord record = new ChukwaRecord();
-      this.buildGenericRecord(record, null, log.getDate().getTime(), reduceType);
-      for (Entry<String, String> entry : processInfo.entrySet()) {
-        record.add(entry.getKey(), entry.getValue());
-      }
-      output.collect(key, record);
-    }
-  }
-
-  public static class PsOutput {
-
-    // processes info
-    private ArrayList<HashMap<String, String>> recordList = new ArrayList<HashMap<String, String>>();
-
-    public PsOutput(String psCmdOutput) throws InvalidPsRecord {
-      if (psCmdOutput == null || psCmdOutput.length() == 0)
-        return;
-
-      String[] lines = psCmdOutput.split("[\n\r]+");
-
-      // at least two lines
-      if (lines.length < 2)
-        return;
-
-      // header
-      ArrayList<String> header = new ArrayList<String>();
-      Matcher matcher = Pattern.compile("[^ ^\t]+").matcher(lines[0]);
-      while (matcher.find()) {
-        header.add(matcher.group(0));
-      }
-      if (!header.get(header.size() - 1).equals("CMD")) {
-        throw new InvalidPsRecord("CMD must be the last column");
-      }
-
-      // records
-      boolean foundInitCmd = false;
-      for (int line = 1; line < lines.length; line++) {
-        HashMap<String, String> record = new HashMap<String, String>();
-        recordList.add(record);
-
-        matcher = Pattern.compile("[^ ^\t]+").matcher(lines[line]);
-        for (int index = 0; index < header.size(); index++) {
-          String key = header.get(index);
-          matcher.find();
-          if (!key.equals("CMD")) {
-            String value = matcher.group(0);
-            /**
-             * For STARTED column, it could be in two formats: "MMM dd" or
-             * "hh:mm:ss". If we use ' ' as the delimiter, we must read twice to
-             * the date if it's with "MMM dd" format.
-             */
-            if (key.equals("STARTED")) {
-              char c = value.charAt(0);
-              if (c < '0' || c > '9') {
-                matcher.find();
-                value += matcher.group(0);
-              }
-            }
-            record.put(key, value);
-          } else {
-            // reached the cmd part. all remains should be put
-            // together as the command
-            String value = lines[line].substring(matcher.start());
-            record.put(key, value);
-            if (!foundInitCmd)
-              foundInitCmd = value.startsWith("init");
-            break;
-          }
-        }
-      }
-      if (!foundInitCmd)
-        throw new InvalidPsRecord("Did not find 'init' cmd");
-    }
-
-    public ArrayList<HashMap<String, String>> getProcessList() {
-      return recordList;
-    }
-  }
-
-  public static class InvalidPsRecord extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public InvalidPsRecord() {
-    }
-
-    public InvalidPsRecord(String arg0) {
-      super(arg0);
-    }
-
-    public InvalidPsRecord(Throwable arg0) {
-      super(arg0);
-    }
-
-    public InvalidPsRecord(String arg0, Throwable arg1) {
-      super(arg0, arg1);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java
deleted file mode 100644
index 3573b4c..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.Logger;
-
-@Table(name="SystemMetrics",columnFamily="SystemMetrics")
-public class Sar extends AbstractProcessor {
-  static Logger log = Logger.getLogger(Sar.class);
-  public static final String reduceType = "SystemMetrics";
-  public final String recordType = this.getClass().getName();
-
-  private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
-  private static Pattern p = null;
-
-  private Matcher matcher = null;
-  private SimpleDateFormat sdf = null;
-
-  public Sar() {
-    // TODO move that to config
-    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
-    p = Pattern.compile(regex);
-  }
-
-  @Override
-  protected void parse(String recordEntry,
-      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-      throws Throwable {
-
-    log.debug("Sar record: [" + recordEntry + "] type[" + chunk.getDataType()
-        + "]");
-    int i = 0;
-
-    // String logLevel = null;
-    // String className = null;
-
-    matcher = p.matcher(recordEntry);
-    while (matcher.find()) {
-      log.debug("Sar Processor Matches");
-
-      try {
-        Date d = sdf.parse(matcher.group(1).trim());
-
-        // logLevel = matcher.group(2);
-        // className = matcher.group(3);
-
-        // TODO create a more specific key structure
-        // part of ChukwaArchiveKey + record index if needed
-        key.setKey("" + d.getTime());
-
-        String[] lines = recordEntry.split("\n");
-
-        String[] headers = null;
-        while (i < (lines.length - 1) && lines[i + 1].indexOf("Average:") < 0) {
-          // Skip to the average lines
-          log.debug("skip:" + lines[i]);
-          i++;
-        }
-        while (i < lines.length) {
-          ChukwaRecord record = null;
-          if (lines[i].equals("")) {
-            i++;
-            headers = parseHeader(lines[i]);
-            i++;
-          }
-          String data[] = parseData(lines[i]);
-
-          // FIXME please validate this
-          if (headers[1].equals("IFACE") && headers[2].equals("rxpck/s")) {
-            log.debug("Matched Sar-Network");
-
-            record = new ChukwaRecord();
-            key = new ChukwaRecordKey();
-            this.buildGenericRecord(record, null, d.getTime(), reduceType);
-          } else if (headers[1].equals("IFACE") && headers[2].equals("rxerr/s")) {
-            log.debug("Matched Sar-Network");
-
-            record = new ChukwaRecord();
-            key = new ChukwaRecordKey();
-            this.buildGenericRecord(record, null, d.getTime(), reduceType);
-          } else if (headers[1].equals("kbmemfree")) {
-            log.debug("Matched Sar-Memory");
-
-            record = new ChukwaRecord();
-            key = new ChukwaRecordKey();
-            this.buildGenericRecord(record, null, d.getTime(), reduceType);
-          } else if (headers[1].equals("totsck")) {
-            log.debug("Matched Sar-NetworkSockets");
-
-            record = new ChukwaRecord();
-            key = new ChukwaRecordKey();
-            this.buildGenericRecord(record, null, d.getTime(), reduceType);
-          } else if (headers[1].equals("runq-sz")) {
-            log.debug("Matched Sar-LoadAverage");
-
-            record = new ChukwaRecord();
-            key = new ChukwaRecordKey();
-            this.buildGenericRecord(record, null, d.getTime(), reduceType);
-          } else {
-            log.debug("No match:" + headers[1] + " " + headers[2]);
-          }
-          if (record != null) {
-            int j = 0;
-
-            log.debug("Data Length: " + data.length);
-            while (j < data.length) {
-              log.debug("header:" + headers[j] + " data:" + data[j]);
-              
-                //special case code to work around peculiar versions of Sar
-              if(headers[j].equals("rxkB/s")) {
-                record.add("rxbyt/s", Double.toString(Double.parseDouble(data[j]) * 1000));
-              } else if(headers[j].equals("txkB/s")){
-                record.add("txbyt/s", Double.toString(Double.parseDouble(data[j]) * 1000));
-              } else if (!headers[j].equals("Average:")) { //common case
-                record.add(headers[j], data[j]);
-              }
-              j++;
-            }
-
-            output.collect(key, record);
-          }
-          i++;
-        }
-        // End of parsing
-      } catch (Exception e) {
-        e.printStackTrace();
-        throw e;
-      }
-    }
-  }
-
-  public String[] parseHeader(String header) {
-    String[] headers = header.split("\\s+");
-    return headers;
-  }
-
-  public String[] parseData(String dataLine) {
-    String[] data = dataLine.split("\\s+");
-    return data;
-  }
-
-  public String getDataType() {
-    return recordType;
-  }
-}
\ No newline at end of file