Posted to dev@chukwa.apache.org by ey...@apache.org on 2015/07/26 04:08:55 UTC

[2/8] chukwa git commit: CHUKWA-771. Improved code quality issues identified by findbugs. (Eric Yang)

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
index 8075f4d..f828ff1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
@@ -71,19 +72,20 @@ public class ChukwaHBaseStore {
   static double RESOLUTION = 360;
 static int MINUTE = 60000; // one minute, in milliseconds
   final static int SECOND = (int) TimeUnit.SECONDS.toMillis(1);
-
-  static byte[] COLUMN_FAMILY = "t".getBytes();
-  static byte[] ANNOTATION_FAMILY = "a".getBytes();
-  static byte[] KEY_NAMES = "k".getBytes();
-  static byte[] CHART_TYPE = "chart_meta".getBytes();
-  static byte[] CHART_FAMILY = "c".getBytes();
-  static byte[] COMMON_FAMILY = "c".getBytes();
-  static byte[] WIDGET_TYPE = "widget_meta".getBytes();
-  static byte[] DASHBOARD_TYPE = "dashboard_meta".getBytes();
+  private final static Charset UTF8 = Charset.forName("UTF-8");
+
+  final static byte[] COLUMN_FAMILY = "t".getBytes(UTF8);
+  final static byte[] ANNOTATION_FAMILY = "a".getBytes(UTF8);
+  final static byte[] KEY_NAMES = "k".getBytes(UTF8);
+  final static byte[] CHART_TYPE = "chart_meta".getBytes(UTF8);
+  final static byte[] CHART_FAMILY = "c".getBytes(UTF8);
+  final static byte[] COMMON_FAMILY = "c".getBytes(UTF8);
+  final static byte[] WIDGET_TYPE = "widget_meta".getBytes(UTF8);
+  final static byte[] DASHBOARD_TYPE = "dashboard_meta".getBytes(UTF8);
   private static final String CHUKWA = "chukwa";
   private static final String CHUKWA_META = "chukwa_meta";
   private static long MILLISECONDS_IN_DAY = 86400000L;
-  protected static Connection connection = null;
+  private static Connection connection = null;
 
   public ChukwaHBaseStore() {
     super();
@@ -171,7 +173,7 @@ public class ChukwaHBaseStore {
             byte[] key = CellUtil.cloneQualifier(kv);
             long timestamp = ByteBuffer.wrap(key).getLong();
             double value = Double
-                .parseDouble(new String(CellUtil.cloneValue(kv), "UTF-8"));
+                .parseDouble(new String(CellUtil.cloneValue(kv), UTF8));
             series.add(timestamp, value);
           }
         }
@@ -179,7 +181,7 @@ public class ChukwaHBaseStore {
         currentDay = currentDay + (i * MILLISECONDS_IN_DAY);
       }
       table.close();
-    } catch (Exception e) {
+    } catch (IOException e) {
       closeHBase();
       LOG.error(ExceptionUtil.getStackTrace(e));
     }
@@ -191,12 +193,12 @@ public class ChukwaHBaseStore {
     try {
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
-      Get get = new Get(metricGroup.getBytes());
+      Get get = new Get(metricGroup.getBytes(UTF8));
       Result result = table.get(get);
       for (Cell kv : result.rawCells()) {
-        JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(kv), "UTF-8"));
+        JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(kv), UTF8));
         if (json.get("type").equals("metric")) {
-          familyNames.add(new String(CellUtil.cloneQualifier(kv), "UTF-8"));
+          familyNames.add(new String(CellUtil.cloneQualifier(kv), UTF8));
         }
       }
       table.close();
@@ -219,7 +221,7 @@ public class ChukwaHBaseStore {
       Iterator<Result> it = rs.iterator();
       while (it.hasNext()) {
         Result result = it.next();
-        metricGroups.add(new String(result.getRow(), "UTF-8"));
+        metricGroups.add(new String(result.getRow(), UTF8));
       }
       table.close();
     } catch (Exception e) {
@@ -241,9 +243,9 @@ public class ChukwaHBaseStore {
       while (it.hasNext()) {
         Result result = it.next();
         for (Cell cell : result.rawCells()) {
-          JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), "UTF-8"));
+          JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), UTF8));
           if (json!=null && json.get("type")!=null && json.get("type").equals("source")) {
-            pk.add(new String(CellUtil.cloneQualifier(cell), "UTF-8"));
+            pk.add(new String(CellUtil.cloneQualifier(cell), UTF8));
           }
         }
       }
@@ -296,7 +298,7 @@ public class ChukwaHBaseStore {
         for(Cell cell : result.rawCells()) {
           byte[] dest = new byte[5];
           System.arraycopy(CellUtil.cloneRow(cell), 3, dest, 0, 5);
-          String source = new String(dest);
+          String source = new String(dest, UTF8);
           long time = cell.getTimestamp();
           // Time display in x axis
           long delta = time - startTime;
@@ -306,11 +308,11 @@ public class ChukwaHBaseStore {
           if (keyMap.containsKey(source)) {
             y = keyMap.get(source);
           } else {
-            keyMap.put(source, new Integer(index));
+            keyMap.put(source, Integer.valueOf(index));
             y = index;
             index++;
           }
-          double v = Double.parseDouble(new String(CellUtil.cloneValue(cell)));
+          double v = Double.parseDouble(new String(CellUtil.cloneValue(cell), UTF8));
           heatmap.put(x, y, v);
           if (v > max) {
             max = v;
@@ -355,9 +357,9 @@ public class ChukwaHBaseStore {
       while (it.hasNext()) {
         Result result = it.next();
         for (Cell cell : result.rawCells()) {
-          JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), "UTF-8"));
+          JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), UTF8));
           if (json.get("type").equals("cluster")) {
-            clusters.add(new String(CellUtil.cloneQualifier(cell), "UTF-8"));
+            clusters.add(new String(CellUtil.cloneQualifier(cell), UTF8));
           }
         }
       }
@@ -382,10 +384,10 @@ public class ChukwaHBaseStore {
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Get get = new Get(CHART_TYPE);
       Result r = table.get(get);
-      byte[] value = r.getValue(CHART_FAMILY, id.getBytes());
+      byte[] value = r.getValue(CHART_FAMILY, id.getBytes(UTF8));
       Gson gson = new Gson();
       if(value!=null) {
-        chart = gson.fromJson(new String(value), Chart.class);
+        chart = gson.fromJson(new String(value, UTF8), Chart.class);
       }
       table.close();
     } catch (Exception e) {
@@ -408,7 +410,7 @@ public class ChukwaHBaseStore {
       Put put = new Put(CHART_TYPE);
       Gson gson = new Gson();
       String buffer = gson.toJson(chart);
-      put.addColumn(CHART_FAMILY, id.getBytes(), buffer.getBytes());
+      put.addColumn(CHART_FAMILY, id.getBytes(UTF8), buffer.getBytes(UTF8));
       table.put(put);
       table.close();
     } catch (Exception e) {
@@ -437,7 +439,7 @@ public class ChukwaHBaseStore {
       s.setLineOptions(l);
       series.add(s);
     }
-    chart.SetSeries(series);
+    chart.setSeries(series);
     return createChart(chart);
     
   }
@@ -469,7 +471,7 @@ public class ChukwaHBaseStore {
       Put put = new Put(CHART_TYPE);
       Gson gson = new Gson();
       String buffer = gson.toJson(chart);
-      put.addColumn(CHART_FAMILY, id.getBytes(), buffer.getBytes());
+      put.addColumn(CHART_FAMILY, id.getBytes(UTF8), buffer.getBytes(UTF8));
       table.put(put);
       table.close();
     } catch (Exception e) {
@@ -499,8 +501,8 @@ public class ChukwaHBaseStore {
       }
       // Figure out the time range and determine the best resolution
       // to fetch the data
-      long range = Math.round((endTime - startTime)
-        / (MINUTES_IN_HOUR * MINUTE));
+      long range = (endTime - startTime)
+        / (long) (MINUTES_IN_HOUR * MINUTE);
       long sampleRate = 1;
       if (range <= 1) {
         sampleRate = 5;
@@ -512,7 +514,7 @@ public class ChukwaHBaseStore {
         sampleRate = 87600;
       }
       double smoothing = (endTime - startTime)
-          / (sampleRate * SECOND ) / RESOLUTION;
+          / (double) (sampleRate * SECOND ) / (double) RESOLUTION;
 
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA));
@@ -550,7 +552,7 @@ public class ChukwaHBaseStore {
               byte[] key = CellUtil.cloneQualifier(kv);
               long timestamp = ByteBuffer.wrap(key).getLong();
               double value = Double.parseDouble(new String(CellUtil.cloneValue(kv),
-                  "UTF-8"));
+                  UTF8));
               if(initial==0) {
                 filteredValue = value;
               }
@@ -558,7 +560,7 @@ public class ChukwaHBaseStore {
               lastTime = timestamp;
               // Determine if there is any gap, if there is gap in data, reset
               // calculation.
-              if (elapsedTime > sampleRate) {
+              if (elapsedTime > (sampleRate * 5)) {
                 filteredValue = 0.0d;
               } else {
                 if (smoothing != 0.0d) {
@@ -587,7 +589,7 @@ public class ChukwaHBaseStore {
         list.add(clone);
       }
       table.close();
-    } catch (Exception e) {
+    } catch (IOException|CloneNotSupportedException e) {
       closeHBase();
       LOG.error(ExceptionUtil.getStackTrace(e));
     }
@@ -622,7 +624,7 @@ public class ChukwaHBaseStore {
             continue;
           }
           Gson gson = new Gson();
-          Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), "UTF-8"), Widget.class);
+          Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), UTF8), Widget.class);
           list.add(widget);
           c++;
         }
@@ -658,7 +660,7 @@ public class ChukwaHBaseStore {
         Result result = it.next();
         for(Cell kv : result.rawCells()) {
           Gson gson = new Gson();
-          Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), "UTF-8"), Widget.class);
+          Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), UTF8), Widget.class);
           list.add(widget);
         }
       }
@@ -683,11 +685,11 @@ public class ChukwaHBaseStore {
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Get widget = new Get(WIDGET_TYPE);
-      widget.addColumn(COMMON_FAMILY, title.getBytes());
+      widget.addColumn(COMMON_FAMILY, title.getBytes(UTF8));
       Result rs = table.get(widget);
-      byte[] buffer = rs.getValue(COMMON_FAMILY, title.getBytes());
+      byte[] buffer = rs.getValue(COMMON_FAMILY, title.getBytes(UTF8));
       Gson gson = new Gson();
-      w = gson.fromJson(new String(buffer), Widget.class);
+      w = gson.fromJson(new String(buffer, UTF8), Widget.class);
       table.close();
     } catch (Exception e) {
       closeHBase();
@@ -708,7 +710,7 @@ public class ChukwaHBaseStore {
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Get widgetTest = new Get(WIDGET_TYPE);
-      widgetTest.addColumn(COMMON_FAMILY, widget.getTitle().getBytes());
+      widgetTest.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(UTF8));
       if (table.exists(widgetTest)) {
         LOG.warn("Widget: " + widget.getTitle() + " already exists.");
         created = false;
@@ -716,7 +718,7 @@ public class ChukwaHBaseStore {
         Put put = new Put(WIDGET_TYPE);
         Gson gson = new Gson();
         String buffer = gson.toJson(widget);
-        put.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(), buffer.getBytes());
+        put.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(UTF8), buffer.getBytes(UTF8));
         table.put(put);
         created = true;
       }
@@ -741,12 +743,12 @@ public class ChukwaHBaseStore {
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Delete oldWidget = new Delete(WIDGET_TYPE);
-      oldWidget.addColumn(COMMON_FAMILY, title.getBytes());
+      oldWidget.addColumn(COMMON_FAMILY, title.getBytes(UTF8));
       table.delete(oldWidget);
       Put put = new Put(WIDGET_TYPE);
       Gson gson = new Gson();
       String buffer = gson.toJson(widget);
-      put.addColumn(COMMON_FAMILY, title.getBytes(), buffer.getBytes());
+      put.addColumn(COMMON_FAMILY, title.getBytes(UTF8), buffer.getBytes(UTF8));
       table.put(put);
       table.close();
       result = true;
@@ -772,7 +774,7 @@ public class ChukwaHBaseStore {
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Delete oldWidget = new Delete(WIDGET_TYPE);
-      oldWidget.addColumn(COMMON_FAMILY, title.getBytes());
+      oldWidget.addColumn(COMMON_FAMILY, title.getBytes(UTF8));
       table.delete(oldWidget);
       table.close();
       result = true;
@@ -790,7 +792,7 @@ public class ChukwaHBaseStore {
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Get dashboardTest = new Get(DASHBOARD_TYPE);
-      dashboardTest.addColumn(COMMON_FAMILY, "default".getBytes());
+      dashboardTest.addColumn(COMMON_FAMILY, "default".getBytes(UTF8));
       exists = table.exists(dashboardTest);
       table.close();
     } catch (Exception e) {
@@ -931,19 +933,19 @@ public class ChukwaHBaseStore {
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
       Get dashboard = new Get(DASHBOARD_TYPE);
-      dashboard.addColumn(COMMON_FAMILY, key.getBytes());
+      dashboard.addColumn(COMMON_FAMILY, key.getBytes(UTF8));
       Result rs = table.get(dashboard);
-      byte[] buffer = rs.getValue(COMMON_FAMILY, key.getBytes());
+      byte[] buffer = rs.getValue(COMMON_FAMILY, key.getBytes(UTF8));
       if(buffer == null) {
         // If user dashboard is not found, use default dashboard.
         key = new StringBuilder().append(id).append("|").toString();
         dashboard = new Get(DASHBOARD_TYPE);
-        dashboard.addColumn(COMMON_FAMILY, key.getBytes());
+        dashboard.addColumn(COMMON_FAMILY, key.getBytes(UTF8));
         rs = table.get(dashboard);
-        buffer = rs.getValue(COMMON_FAMILY, key.getBytes());        
+        buffer = rs.getValue(COMMON_FAMILY, key.getBytes(UTF8));        
       }
       Gson gson = new Gson();
-      dash = gson.fromJson(new String(buffer), Dashboard.class);
+      dash = gson.fromJson(new String(buffer, UTF8), Dashboard.class);
       table.close();
     } catch (Exception e) {
       closeHBase();
@@ -964,7 +966,7 @@ public class ChukwaHBaseStore {
       Put put = new Put(DASHBOARD_TYPE);
       Gson gson = new Gson();
       String buffer = gson.toJson(dash);
-      put.addColumn(COMMON_FAMILY, key.getBytes(), buffer.getBytes());
+      put.addColumn(COMMON_FAMILY, key.getBytes(UTF8), buffer.getBytes(UTF8));
       table.put(put);
       table.close();
       result = true;

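The bulk of the ChukwaHBaseStore hunk is one findbugs theme (DM_DEFAULT_ENCODING):
String.getBytes() and new String(byte[]) without an explicit charset use the JVM's
platform default, so the same code can produce different bytes on different machines.
The patch pins UTF-8 once in a shared constant. A minimal, self-contained sketch of
the pattern, assuming only the JDK (the class name below is illustrative, not part
of the patch):

    import java.nio.charset.Charset;

    public class CharsetSketch {
      // One shared constant, as in ChukwaHBaseStore, so every conversion
      // agrees on the encoding regardless of -Dfile.encoding.
      private static final Charset UTF8 = Charset.forName("UTF-8");

      public static void main(String[] args) {
        byte[] platformDependent = "chukwa".getBytes();  // what findbugs flags
        byte[] pinned = "chukwa".getBytes(UTF8);         // deterministic
        System.out.println(new String(pinned, UTF8));    // round-trips safely
        System.out.println(platformDependent.length == pinned.length);
      }
    }

On Java 7+, java.nio.charset.StandardCharsets.UTF_8 would avoid the Charset.forName
lookup; either form, unlike the String(byte[], String) overload, throws no checked
UnsupportedEncodingException. The range/smoothing hunk fixes a separate findbugs
family (integral division cast to double, ICAST): the old Math.round saw a value
that integer division had already truncated, so the rounding was a no-op; the new
code keeps the truncation explicit for range and casts a divisor to double so
smoothing is computed in floating point.
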
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
index eb79cd7..4f5f289 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.chukwa.extraction.hbase;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
@@ -37,7 +38,7 @@ public abstract class AbstractProcessor {
   protected String sourceHelper;
 
   protected byte[] key = null;
-  byte[] CF = "t".getBytes();
+  byte[] CF = "t".getBytes(Charset.forName("UTF-8"));
 
   boolean chunkInErrorSaved = false;
   ArrayList<Put> output = null;
@@ -70,14 +71,14 @@ public abstract class AbstractProcessor {
     byte[] key = HBaseUtil.buildKey(time, primaryKey, source);
     Put put = new Put(key);
     byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
-    put.add(CF, timeInBytes, time, value);
+    put.addColumn(CF, timeInBytes, time, value);
     output.add(put);
     reporter.putMetric(chunk.getDataType(), primaryKey);
     reporter.putSource(chunk.getDataType(), source);
   }
 
   public void addRecord(String primaryKey, String value) {
-    addRecord(primaryKey, value.getBytes());
+    addRecord(primaryKey, value.getBytes(Charset.forName("UTF-8")));
   }
 
   /**
@@ -96,7 +97,7 @@ public abstract class AbstractProcessor {
     byte[] key = HBaseUtil.buildKey(time, primaryKey, sourceHelper);
     Put put = new Put(key);
     byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
-    put.add(CF, timeInBytes, time, value);
+    put.addColumn(CF, timeInBytes, time, value);
     output.add(put);
     reporter.putMetric(chunk.getDataType(), primaryKey);
   }
@@ -126,7 +127,7 @@ public abstract class AbstractProcessor {
     Put put = new Put(key);
     String family = "a";
     byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
-    put.add(family.getBytes(), timeInBytes, time, chunk.getTags().getBytes());
+    put.addColumn(family.getBytes(Charset.forName("UTF-8")), timeInBytes, time, chunk.getTags().getBytes(Charset.forName("UTF-8")));
     output.add(put);
   }
 

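Besides pinning UTF-8, the AbstractProcessor hunk moves from the deprecated
Put.add(family, qualifier, ts, value) to Put.addColumn, its replacement in the
HBase 1.x client API. A minimal sketch of the cell layout addRecord() builds,
assuming the HBase 1.x client on the classpath (the class and helper names are
illustrative):

    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;

    import org.apache.hadoop.hbase.client.Put;

    public class PutSketch {
      private static final Charset UTF8 = Charset.forName("UTF-8");

      // One timestamped cell whose qualifier is the 8-byte big-endian
      // encoding of the timestamp itself, mirroring addRecord() above.
      static Put timestampedCell(byte[] rowKey, long time, byte[] value) {
        byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
        Put put = new Put(rowKey);
        put.addColumn("t".getBytes(UTF8), timeInBytes, time, value);
        return put;
      }
    }

Encoding the timestamp in the qualifier is what lets readers such as
ChukwaHBaseStore recover it with ByteBuffer.wrap(key).getLong().
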
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
index 2da64a3..483ac71 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
@@ -17,34 +17,46 @@
  */
 package org.apache.hadoop.chukwa.extraction.hbase;
 
+import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.chukwa.util.HBaseUtil;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.log4j.Logger;
-import org.json.simple.JSONObject;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 
 public class DefaultProcessor extends AbstractProcessor {
-	
+
   public DefaultProcessor() throws NoSuchAlgorithmException {
-	super();
-	// TODO Auto-generated constructor stub
+    super();
+    // TODO Auto-generated constructor stub
   }
 
-static Logger LOG = Logger.getLogger(DefaultProcessor.class);
+  static Logger LOG = Logger.getLogger(DefaultProcessor.class);
 
   @Override
   protected void parse(byte[] recordEntry) throws Throwable {
-	  byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), chunk.getSource());
-	  Put put = new Put(key);
-	  byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
-	  put.add("t".getBytes(), timeInBytes, chunk.getData());
-	  output.add(put);
-	  JSONObject json = new JSONObject();
-	  json.put("sig", key);
-	  json.put("type", "unknown");
-	  reporter.put(chunk.getDataType(), chunk.getSource(), json.toString());
+    byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(),
+        chunk.getSource());
+    Put put = new Put(key);
+    byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+    put.addColumn("t".getBytes(Charset.forName("UTF-8")), timeInBytes,
+        chunk.getData());
+    output.add(put);
+    Type defaultType = new TypeToken<Map<String, String>>() {
+    }.getType();
+    Gson gson = new Gson();
+    Map<String, String> meta = new HashMap<String, String>();
+    meta.put("sig", new String(key, Charset.forName("UTF-8")));
+    meta.put("type", "unknown");
+    String buffer = gson.toJson(meta, defaultType);
+    reporter.put(chunk.getDataType(), chunk.getSource(), buffer);
   }
 
 }

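DefaultProcessor swaps json-simple's raw JSONObject for Gson plus a TypeToken.
The anonymous-subclass trick captures the generic Map<String, String> type that
erasure would otherwise discard, so Gson serializes the map without raw-type
warnings. A self-contained sketch (values invented for illustration):

    import java.lang.reflect.Type;
    import java.util.HashMap;
    import java.util.Map;

    import com.google.gson.Gson;
    import com.google.gson.reflect.TypeToken;

    public class GsonMetaSketch {
      public static void main(String[] args) {
        Type mapType = new TypeToken<Map<String, String>>() {}.getType();
        Map<String, String> meta = new HashMap<String, String>();
        meta.put("sig", "abc123");
        meta.put("type", "unknown");
        String buffer = new Gson().toJson(meta, mapType);
        System.out.println(buffer); // e.g. {"sig":"abc123","type":"unknown"}
      }
    }

Note the patch also stores "sig" as new String(key, UTF-8) rather than the raw
byte[]; Gson would otherwise serialize the array as a JSON list of numbers.
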
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
index 19df607..de64a0d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.chukwa.extraction.hbase;
 
-
-import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.security.NoSuchAlgorithmException;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
 
-import org.apache.hadoop.chukwa.util.HBaseUtil;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.log4j.Logger;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
@@ -37,15 +37,14 @@ public class HadoopMetricsProcessor extends AbstractProcessor {
   static final String recordNameField = "recordName";
   static final String hostName = "Hostname";
   static final String processName = "ProcessName";
-  static final byte[] cf = "t".getBytes();
+  static final byte[] cf = "t".getBytes(Charset.forName("UTF-8"));
 
   public HadoopMetricsProcessor() throws NoSuchAlgorithmException {
   }
 
   @Override
   protected void parse(byte[] recordEntry) throws Throwable {
-    try {
-      String body = new String(recordEntry);
+      String body = new String(recordEntry, Charset.forName("UTF-8"));
       int start = body.indexOf('{');
       JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));
 
@@ -56,10 +55,8 @@ public class HadoopMetricsProcessor extends AbstractProcessor {
       if(json.get(processName)!=null) {
         src = new StringBuilder(src).append(":").append(json.get(processName)).toString();
       }
-      @SuppressWarnings("unchecked")
-      Iterator<String> ki = json.keySet().iterator();
-      while (ki.hasNext()) {
-        String keyName = ki.next();
+      for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) {
+        String keyName = entry.getKey();
         if (timestampField.intern() == keyName.intern()) {
           continue;
         } else if (contextNameField.intern() == keyName.intern()) {
@@ -71,20 +68,14 @@ public class HadoopMetricsProcessor extends AbstractProcessor {
         } else if (processName.intern() == keyName.intern()) {
           continue;
         } else {
-          if (json.get(keyName) != null) {
-            String v = json.get(keyName).toString();
+          if(json.get(keyName)!=null) {
+            String v = entry.getValue().toString();
             String primaryKey = new StringBuilder(contextName).append(".")
                 .append(recordName).append(".").append(keyName).toString();
-            addRecord(time, primaryKey, src, v.getBytes(), output);
+            addRecord(time, primaryKey, src, v.getBytes(Charset.forName("UTF-8")), output);
           }
         }
       }
-
-    } catch (Exception e) {
-      LOG.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
-          e);
-      throw e;
-    }
   }
 
 }

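The loop rewrite in HadoopMetricsProcessor is the classic findbugs
WMI_WRONG_MAP_ITERATOR fix: iterating keySet() and then calling get(key) does a
redundant lookup per key, while entrySet() hands back key and value together.
(The cast through a raw Set<Map.Entry> remains because json-simple's JSONObject
is itself a raw Map.) A minimal sketch, independent of json-simple:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class EntrySetSketch {
      public static void main(String[] args) {
        Map<String, Object> json = new LinkedHashMap<String, Object>();
        json.put("user", Long.valueOf(42));
        json.put("sys", Long.valueOf(7));

        // What findbugs flags: one extra hash lookup per iteration.
        for (String key : json.keySet()) {
          System.out.println(key + "=" + json.get(key));
        }

        // Preferred: the Entry already carries both halves.
        for (Map.Entry<String, Object> entry : json.entrySet()) {
          System.out.println(entry.getKey() + "=" + entry.getValue());
        }
      }
    }
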
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
index dcbe2d4..0682c71 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
@@ -22,43 +22,43 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 
 public class LogEntry {
-	private final static SimpleDateFormat sdf = new SimpleDateFormat(
-			"yyyy-MM-dd HH:mm");
+  private SimpleDateFormat sdf = new SimpleDateFormat(
+      "yyyy-MM-dd HH:mm");
 
-	private Date date;
-	private String logLevel;
-	private String className;
-	private String body;
+  private Date date;
+  private String logLevel;
+  private String className;
+  private String body;
 
-	public LogEntry(String recordEntry) throws ParseException {
-		String dStr = recordEntry.substring(0, 23);
-		date = sdf.parse(dStr);
-		int start = 24;
-		int idx = recordEntry.indexOf(' ', start);
-		logLevel = recordEntry.substring(start, idx);
-		start = idx + 1;
-		idx = recordEntry.indexOf(' ', start);
-		className = recordEntry.substring(start, idx - 1);
-		body = recordEntry.substring(idx + 1);
-	}
+  public LogEntry(String recordEntry) throws ParseException {
+    String dStr = recordEntry.substring(0, 23);
+    date = sdf.parse(dStr);
+    int start = 24;
+    int idx = recordEntry.indexOf(' ', start);
+    logLevel = recordEntry.substring(start, idx);
+    start = idx + 1;
+    idx = recordEntry.indexOf(' ', start);
+    className = recordEntry.substring(start, idx - 1);
+    body = recordEntry.substring(idx + 1);
+  }
 
-	public Date getDate() {
-		return date;
-	}
+  public Date getDate() {
+    return (Date) date.clone();
+  }
 
-	public void setDate(Date date) {
-		this.date = date;
-	}
+  public void setDate(Date date) {
+    this.date = (Date) date.clone();
+  }
 
-	public String getLogLevel() {
-		return logLevel;
-	}
+  public String getLogLevel() {
+    return logLevel;
+  }
 
-	public String getClassName() {
-		return className;
-	}
+  public String getClassName() {
+    return className;
+  }
 
-	public String getBody() {
-		return body;
-	}
+  public String getBody() {
+    return body;
+  }
 }

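Two findbugs patterns meet in LogEntry. Dropping static from the
SimpleDateFormat field addresses the STCAL family: SimpleDateFormat is not
thread-safe, so a shared static instance can corrupt parses under concurrency.
The clone() calls address EI_EXPOSE_REP: returning or storing a mutable Date
hands callers a reference into the object's internal state. A short sketch of
the defensive-copy half (class name illustrative):

    import java.util.Date;

    public class DefensiveCopySketch {
      private Date date = new Date();

      public Date getDate() {
        return (Date) date.clone(); // copy out: callers get a snapshot
      }

      public void setDate(Date date) {
        this.date = (Date) date.clone(); // copy in: caller mutation is harmless
      }

      public static void main(String[] args) {
        DefensiveCopySketch s = new DefensiveCopySketch();
        s.getDate().setTime(0L);                         // mutates only the copy
        System.out.println(s.getDate().getTime() != 0L); // true
      }
    }
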
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
index c2695f2..3718fbd 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
@@ -22,20 +22,14 @@
  */
 package org.apache.hadoop.chukwa.extraction.hbase;
 
+import java.nio.charset.Charset;
 import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.Iterator;
-import java.util.TimeZone;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
-import org.apache.hadoop.chukwa.Chunk;
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
@@ -48,7 +42,7 @@ public class SystemMetrics extends AbstractProcessor {
 
   @Override
   protected void parse(byte[] recordEntry) throws Throwable {
-    String buffer = new String(recordEntry);
+    String buffer = new String(recordEntry, Charset.forName("UTF-8"));
     JSONObject json = (JSONObject) JSONValue.parse(buffer);
     time = ((Long) json.get("timestamp")).longValue();
     ChukwaRecord record = new ChukwaRecord();
@@ -70,11 +64,9 @@ public class SystemMetrics extends AbstractProcessor {
       user = user + Double.parseDouble(cpu.get("user").toString());
       sys = sys + Double.parseDouble(cpu.get("sys").toString());
       idle = idle + Double.parseDouble(cpu.get("idle").toString());
-      @SuppressWarnings("unchecked")
-      Iterator<String> iterator = (Iterator<String>) cpu.keySet().iterator();
-      while(iterator.hasNext()) {
-        String key = iterator.next();
-        addRecord("cpu." + key + "." + i, cpu.get(key).toString());
+      for(Entry<String, Object> entry : (Set<Map.Entry>) cpu.entrySet()) {
+        String key = entry.getKey();
+        addRecord("cpu." + key + "." + i, String.valueOf(entry.getValue()));
       }
     }
     combined = combined / actualSize;
@@ -94,20 +86,15 @@ public class SystemMetrics extends AbstractProcessor {
 
     record = new ChukwaRecord();
     JSONObject memory = (JSONObject) json.get("memory");
-    @SuppressWarnings("unchecked")
-    Iterator<String> memKeys = memory.keySet().iterator();
-    while (memKeys.hasNext()) {
-      String key = memKeys.next();
-      addRecord("memory." + key, memory.get(key).toString());
+    for(Entry<String, Object> entry : (Set<Map.Entry>) memory.entrySet()) {
+      String key = entry.getKey();
+      addRecord("memory." + key, String.valueOf(entry.getValue()));
     }
 
     record = new ChukwaRecord();
     JSONObject swap = (JSONObject) json.get("swap");
-    @SuppressWarnings("unchecked")
-    Iterator<String> swapKeys = swap.keySet().iterator();
-    while (swapKeys.hasNext()) {
-      String key = swapKeys.next();
-      addRecord("swap." + key, swap.get(key).toString());
+    for(Map.Entry<String, Object> entry : (Set<Map.Entry>) swap.entrySet()) {
+      addRecord("swap." + entry.getKey(), String.valueOf(entry.getValue()));
     }
 
     double rxBytes = 0;
@@ -122,28 +109,30 @@ public class SystemMetrics extends AbstractProcessor {
     JSONArray netList = (JSONArray) json.get("network");
     for (int i = 0; i < netList.size(); i++) {
       JSONObject netIf = (JSONObject) netList.get(i);
-      @SuppressWarnings("unchecked")
-      Iterator<String> keys = netIf.keySet().iterator();
-      while (keys.hasNext()) {
-        String key = keys.next();
-        record.add(key + "." + i, netIf.get(key).toString());
+      for(Map.Entry<String, Object> entry : (Set<Map.Entry>) netIf.entrySet()) {
+        String key = entry.getKey();
+        long value = 0;
+        if(entry.getValue() instanceof Long) {
+          value = (Long) entry.getValue();
+        }
+        record.add(key + "." + i, String.valueOf(entry.getValue()));
         if (i != 0) {
           if (key.equals("RxBytes")) {
-            rxBytes = rxBytes + (Long) netIf.get(key);
+            rxBytes = rxBytes + value;
           } else if (key.equals("RxDropped")) {
-            rxDropped = rxDropped + (Long) netIf.get(key);
+            rxDropped = rxDropped + value;
           } else if (key.equals("RxErrors")) {
-            rxErrors = rxErrors + (Long) netIf.get(key);
+            rxErrors = rxErrors + value;
           } else if (key.equals("RxPackets")) {
-            rxPackets = rxPackets + (Long) netIf.get(key);
+            rxPackets = rxPackets + value;
           } else if (key.equals("TxBytes")) {
-            txBytes = txBytes + (Long) netIf.get(key);
+            txBytes = txBytes + value;
           } else if (key.equals("TxCollisions")) {
-            txCollisions = txCollisions + (Long) netIf.get(key);
+            txCollisions = txCollisions + value;
           } else if (key.equals("TxErrors")) {
-            txErrors = txErrors + (Long) netIf.get(key);
+            txErrors = txErrors + value;
           } else if (key.equals("TxPackets")) {
-            txPackets = txPackets + (Long) netIf.get(key);
+            txPackets = txPackets + value;
           }
         }
       }
@@ -168,22 +157,25 @@ public class SystemMetrics extends AbstractProcessor {
     JSONArray diskList = (JSONArray) json.get("disk");
     for (int i = 0; i < diskList.size(); i++) {
       JSONObject disk = (JSONObject) diskList.get(i);
-      Iterator<String> keys = disk.keySet().iterator();
-      while (keys.hasNext()) {
-        String key = keys.next();
-        record.add(key + "." + i, disk.get(key).toString());
+      for(Entry<String, Object> entry : (Set<Map.Entry>) disk.entrySet()) {
+        String key = entry.getKey();
+        long value = 0;
+        if(entry.getValue() instanceof Long) {
+          value = (Long) entry.getValue();
+        }
+        record.add(key + "." + i, String.valueOf(entry.getValue()));
         if (key.equals("ReadBytes")) {
-          readBytes = readBytes + (Long) disk.get("ReadBytes");
+          readBytes = readBytes + value;
         } else if (key.equals("Reads")) {
-          reads = reads + (Long) disk.get("Reads");
+          reads = reads + value;
         } else if (key.equals("WriteBytes")) {
-          writeBytes = writeBytes + (Long) disk.get("WriteBytes");
+          writeBytes = writeBytes + value;
         } else if (key.equals("Writes")) {
-          writes = writes + (Long) disk.get("Writes");
+          writes = writes + value;
         } else if (key.equals("Total")) {
-          total = total + (Long) disk.get("Total");
+          total = total + value;
         } else if (key.equals("Used")) {
-          used = used + (Long) disk.get("Used");
+          used = used + value;
         }
       }
     }

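The network and disk loops in SystemMetrics replace unconditional (Long) casts
with an instanceof-guarded local, so a field that arrives as a String (or is
missing) contributes 0 instead of throwing ClassCastException mid-aggregation.
A sketch of the guard, using a plain Map in place of json-simple:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SafeLongSketch {
      // Non-Long values count as 0 rather than aborting the parse.
      static long plusIfLong(long total, Object raw) {
        if (raw instanceof Long) {
          total += (Long) raw;
        }
        return total;
      }

      public static void main(String[] args) {
        Map<String, Object> disk = new LinkedHashMap<String, Object>();
        disk.put("ReadBytes", Long.valueOf(1024));
        disk.put("Mount", "/data"); // a String: safely ignored
        long readBytes = 0;
        readBytes = plusIfLong(readBytes, disk.get("ReadBytes"));
        readBytes = plusIfLong(readBytes, disk.get("Mount"));
        System.out.println(readBytes); // 1024
      }
    }
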
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java b/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
index 9c21bf1..02fe3b7 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
@@ -20,21 +20,20 @@ package org.apache.hadoop.chukwa.hicc;
 
 
 import java.io.*;
+import java.nio.charset.Charset;
 import java.util.*;
 
 import org.apache.hadoop.chukwa.datastore.ChukwaHBaseStore;
 
 public class ClusterConfig {
-  private static Set<String> clusterMap = null;
+  private Set<String> clusterMap = null;
 
   static public String getContents(File aFile) {
     // ...checks on aFile are elided
     StringBuffer contents = new StringBuffer();
 
     try {
-      // use buffering, reading one line at a time
-      // FileReader always assumes default encoding is OK!
-      BufferedReader input = new BufferedReader(new FileReader(aFile));
+      BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
       try {
         String line = null; // not declared within while loop
         /*

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java b/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
index 2d84c09..fe90941 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -124,12 +125,12 @@ public class HiccWebServer {
         StringBuilder sb = new StringBuilder();
         String line = null;
         try {
-          BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+          BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8")));
           while ((line = reader.readLine()) != null) {
             sb.append(line + "\n");
           }
           FSDataOutputStream out = fs.create(dest);
-          out.write(sb.toString().getBytes());
+          out.write(sb.toString().getBytes(Charset.forName("UTF-8")));
           out.close();
           reader.close();
         } catch(IOException e) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
index 7c8c6a7..19cde5f 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
@@ -111,7 +111,7 @@ public class Chart {
     return this.id;
   }
 
-  public void SetSeries(List<SeriesMetaData> series) {
+  public void setSeries(List<SeriesMetaData> series) {
     this.series = series;
   }
   

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
index 61dc0b5..74d5e89 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
@@ -89,11 +89,11 @@ public class Widget {
   }
 
   public String[] getTokens() {
-    return tokens;
+    return tokens.clone();
   }
 
   public void setTokens(String[] tokens) {
-    this.tokens = tokens;
+    this.tokens = tokens.clone();
   }
 
   public void tokenize() {

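Widget gets the array form of the same exposure fix (EI_EXPOSE_REP): an array is
mutable even when the field holding it is private, so both accessors clone. A
two-method illustration (class name invented):

    public class TokenSketch {
      private String[] tokens = { "a", "b" };

      public String[] getTokens() {
        return tokens.clone(); // shallow copy; Strings are immutable, so safe
      }

      public static void main(String[] args) {
        TokenSketch w = new TokenSketch();
        w.getTokens()[0] = "hacked";           // mutates only the copy
        System.out.println(w.getTokens()[0]);  // still "a"
      }
    }
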
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java b/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
index c9413bd..869efa4 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
@@ -24,6 +24,7 @@ import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URLEncoder;
+import java.nio.charset.Charset;
 import java.util.Map;
 
 import javax.servlet.ServletException;
@@ -44,11 +45,11 @@ public class HttpProxy extends HttpServlet {
   private final String USER_AGENT = "Mozilla/5.0";
   private final static String SOLR_URL = "chukwa.solr.url";
   private final static Logger LOG = Logger.getLogger(HttpProxy.class);
-  private ChukwaConfiguration conf = new ChukwaConfiguration();
   private String solrUrl = null;
 
   public HttpProxy() {
     super();
+    ChukwaConfiguration conf = new ChukwaConfiguration();
     solrUrl = conf.get(SOLR_URL);
   }
 
@@ -72,7 +73,7 @@ public class HttpProxy extends HttpServlet {
     LOG.info("Response Code : " + responseCode);
 
     BufferedReader in = new BufferedReader(new InputStreamReader(
-        con.getInputStream()));
+        con.getInputStream(), Charset.forName("UTF-8")));
     String inputLine;
     StringBuffer response1 = new StringBuffer();
 
@@ -80,7 +81,7 @@ public class HttpProxy extends HttpServlet {
 
     while ((inputLine = in.readLine()) != null) {
       response1.append(inputLine);
-      sout.write(inputLine.getBytes());
+      sout.write(inputLine.getBytes(Charset.forName("UTF-8")));
     }
     in.close();
 
@@ -131,7 +132,7 @@ public class HttpProxy extends HttpServlet {
     LOG.debug("Response Code : " + responseCode);
 
     BufferedReader in = new BufferedReader(new InputStreamReader(
-        con.getInputStream()));
+        con.getInputStream(), Charset.forName("UTF-8")));
     String inputLine;
     StringBuffer response1 = new StringBuffer();
 
@@ -139,7 +140,7 @@ public class HttpProxy extends HttpServlet {
 
     while ((inputLine = in.readLine()) != null) {
       response1.append(inputLine);
-      sout.write(inputLine.getBytes());
+      sout.write(inputLine.getBytes(Charset.forName("UTF-8")));
     }
     in.close();
 

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
index ceed0df..1441253 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.chukwa.hicc.rest;
 import java.lang.reflect.Type;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.GET;
@@ -63,8 +65,8 @@ public class SessionController {
     Gson gson = new Gson();
     Type stringStringMap = new TypeToken<Map<String, String>>(){}.getType();
     Map<String,String> map = gson.fromJson(buffer, stringStringMap);
-    for(String key : map.keySet()) {
-      request.getSession().setAttribute(key, map.get(key));
+    for(Entry<String, String> entry : (Set<Map.Entry<String, String>>) map.entrySet()) {
+      request.getSession().setAttribute(entry.getKey(), entry.getValue());
     }
     return Response.ok().build();
   }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
index ea07797..4524922 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
@@ -39,7 +39,7 @@ public class VelocityResolver implements InjectableProvider<Context, Type> {
 
   private VelocityEngine ve;
   private static Logger LOG = Logger.getLogger(VelocityResolver.class);
-  public static String LOGGER_NAME = VelocityResolver.class.getName();
+  public final static String LOGGER_NAME = VelocityResolver.class.getName();
   
   /**
    * Jersey configuration for setting up Velocity configuration.

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
deleted file mode 100644
index f0a3303..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.inputtools.mdl;
-
-
-import java.sql.SQLException;
-import java.util.Calendar;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.TreeMap;
-import java.util.Iterator;
-import java.sql.Timestamp;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Timer;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Date;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class TorqueInfoProcessor {
-
-  private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
-
-  private int intervalValue = 60;
-  private String torqueServer = null;
-  private String torqueBinDir = null;
-  private String domain = null;
-
-  private TreeMap<String, TreeMap<String, String>> currentHodJobs;
-
-  public TorqueInfoProcessor(DataConfig mdlConfig, int interval) {
-    this.intervalValue = interval;
-
-    torqueServer = System.getProperty("TORQUE_SERVER");
-    torqueBinDir = System.getProperty("TORQUE_HOME") + File.separator + "bin";
-    domain = System.getProperty("DOMAIN");
-    currentHodJobs = new TreeMap<String, TreeMap<String, String>>();
-  }
-
-  public void setup(boolean recover) throws Exception {
-  }
-
-  private void getHodJobInfo() throws IOException {
-    StringBuffer sb = new StringBuffer();
-    sb.append(torqueBinDir).append("/qstat -a");
-
-    String[] getQueueInfoCommand = new String[3];
-    getQueueInfoCommand[0] = "ssh";
-    getQueueInfoCommand[1] = torqueServer;
-    getQueueInfoCommand[2] = sb.toString();
-
-    String command = getQueueInfoCommand[0] + " " + getQueueInfoCommand[1]
-        + " " + getQueueInfoCommand[2];
-    ProcessBuilder pb = new ProcessBuilder(getQueueInfoCommand);
-
-    Process p = pb.start();
-
-    Timer timeout = new Timer();
-    TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
-    timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
-
-    BufferedReader result = new BufferedReader(new InputStreamReader(p
-        .getInputStream()));
-    ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
-        command, true);
-    errorHandler.start();
-
-    String line = null;
-    boolean start = false;
-    TreeSet<String> jobsInTorque = new TreeSet<String>();
-    while ((line = result.readLine()) != null) {
-      if (line.startsWith("---")) {
-        start = true;
-        continue;
-      }
-
-      if (start) {
-        String[] items = line.split("\\s+");
-        if (items.length >= 10) {
-          String hodIdLong = items[0];
-          String hodId = hodIdLong.split("[.]")[0];
-          String userId = items[1];
-          String numOfMachine = items[5];
-          String status = items[9];
-          jobsInTorque.add(hodId);
-          if (!currentHodJobs.containsKey(hodId)) {
-            TreeMap<String, String> aJobData = new TreeMap<String, String>();
-
-            aJobData.put("userId", userId);
-            aJobData.put("numOfMachine", numOfMachine);
-            aJobData.put("traceCheckCount", "0");
-            aJobData.put("process", "0");
-            aJobData.put("status", status);
-            currentHodJobs.put(hodId, aJobData);
-          } else {
-            TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
-            aJobData.put("status", status);
-            currentHodJobs.put(hodId, aJobData);
-          }// if..else
-        }
-      }
-    }// while
-
-    try {
-      errorHandler.join();
-    } catch (InterruptedException ie) {
-      log.error(ie.getMessage());
-    }
-    timeout.cancel();
-
-    Set<String> currentHodJobIds = currentHodJobs.keySet();
-    Iterator<String> currentHodJobIdsIt = currentHodJobIds.iterator();
-    TreeSet<String> finishedHodIds = new TreeSet<String>();
-    while (currentHodJobIdsIt.hasNext()) {
-      String hodId = currentHodJobIdsIt.next();
-      if (!jobsInTorque.contains(hodId)) {
-        TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
-        String process = aJobData.get("process");
-        if (process.equals("0") || process.equals("1")) {
-          aJobData.put("status", "C");
-        } else {
-          finishedHodIds.add(hodId);
-        }
-      }
-    }// while
-
-    Iterator<String> finishedHodIdsIt = finishedHodIds.iterator();
-    while (finishedHodIdsIt.hasNext()) {
-      String hodId = finishedHodIdsIt.next();
-      currentHodJobs.remove(hodId);
-    }
-
-  }
-
-  private boolean loadQstatData(String hodId) throws IOException, SQLException {
-    TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
-    String userId = aJobData.get("userId");
-
-    StringBuffer sb = new StringBuffer();
-    sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
-    String[] qstatCommand = new String[3];
-    qstatCommand[0] = "ssh";
-    qstatCommand[1] = torqueServer;
-    qstatCommand[2] = sb.toString();
-
-    String command = qstatCommand[0] + " " + qstatCommand[1] + " "
-        + qstatCommand[2];
-    ProcessBuilder pb = new ProcessBuilder(qstatCommand);
-    Process p = pb.start();
-
-    Timer timeout = new Timer();
-    TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
-    timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
-
-    BufferedReader result = new BufferedReader(new InputStreamReader(p
-        .getInputStream()));
-    ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
-        command, false);
-    errorHandler.start();
-    String line = null;
-    String hosts = null;
-    long startTimeValue = -1;
-    long endTimeValue = Calendar.getInstance().getTimeInMillis();
-    long executeTimeValue = Calendar.getInstance().getTimeInMillis();
-    boolean qstatfinished;
-
-    while ((line = result.readLine()) != null) {
-      if (line.indexOf("ctime") >= 0) {
-        String startTime = line.split("=")[1].trim();
-        // Tue Sep 9 23:44:29 2008
-        SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
-        Date startTimeDate;
-        try {
-          startTimeDate = sdf.parse(startTime);
-          startTimeValue = startTimeDate.getTime();
-        } catch (ParseException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }
-
-      }
-      if (line.indexOf("mtime") >= 0) {
-        String endTime = line.split("=")[1].trim();
-        SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
-        Date endTimeDate;
-        try {
-          endTimeDate = sdf.parse(endTime);
-          endTimeValue = endTimeDate.getTime();
-        } catch (ParseException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }
-
-      }
-      if (line.indexOf("etime") >= 0) {
-        String executeTime = line.split("=")[1].trim();
-        SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
-        Date executeTimeDate;
-        try {
-          executeTimeDate = sdf.parse(executeTime);
-          executeTimeValue = executeTimeDate.getTime();
-        } catch (ParseException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }
-
-      }
-      if (line.indexOf("exec_host") >= 0) {
-        hosts = line.split("=")[1].trim();
-      }
-    }
-
-    if (hosts != null && startTimeValue >= 0) {
-      String[] items2 = hosts.split("[+]");
-      int num = 0;
-      for (int i = 0; i < items2.length; i++) {
-        String machinetmp = items2[i];
-        if (machinetmp.length() > 3) {
-          String machine = items2[i].substring(0, items2[i].length() - 2);
-          StringBuffer data = new StringBuffer();
-          data.append("HodId=").append(hodId);
-          data.append(", Machine=").append(machine);
-          if (domain != null) {
-            data.append(".").append(domain);
-          }
-          log.info(data);
-          num++;
-        }
-      }
-      Timestamp startTimedb = new Timestamp(startTimeValue);
-      Timestamp endTimedb = new Timestamp(endTimeValue);
-      StringBuffer data = new StringBuffer();
-      long timeQueued = executeTimeValue - startTimeValue;
-      data.append("HodID=").append(hodId);
-      data.append(", UserId=").append(userId);
-      data.append(", StartTime=").append(startTimedb);
-      data.append(", TimeQueued=").append(timeQueued);
-      data.append(", NumOfMachines=").append(num);
-      data.append(", EndTime=").append(endTimedb);
-      log.info(data);
-      qstatfinished = true;
-
-    } else {
-
-      qstatfinished = false;
-    }
-
-    try {
-      errorHandler.join();
-    } catch (InterruptedException ie) {
-      log.error(ie.getMessage());
-    }
-    result.close();
-    timeout.cancel();
-
-    return qstatfinished;
-  }
-
-  private boolean loadTraceJobData(String hodId) throws IOException,
-      SQLException {
-    TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
-    String userId = aJobData.get("userId");
-
-    StringBuffer sb = new StringBuffer();
-    sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
-    String[] traceJobCommand = new String[3];
-    traceJobCommand[0] = "ssh";
-    traceJobCommand[1] = torqueServer;
-    traceJobCommand[2] = sb.toString();
-
-    String command = traceJobCommand[0] + " " + traceJobCommand[1] + " "
-        + traceJobCommand[2];
-    ProcessBuilder pb = new ProcessBuilder(traceJobCommand);
-
-    Process p = pb.start();
-
-    Timer timeout = new Timer();
-    TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
-    timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
-
-    BufferedReader result = new BufferedReader(new InputStreamReader(p
-        .getInputStream()));
-    ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
-        command, false);
-    errorHandler.start();
-    String line = null;
-    String exit_status = null;
-    String hosts = null;
-    long timeQueued = -1;
-    long startTimeValue = -1;
-    long endTimeValue = -1;
-    boolean findResult = false;
-
-    while ((line = result.readLine()) != null && !findResult) {
-      if (line.indexOf("end") >= 0 && line.indexOf("Exit_status") >= 0
-          && line.indexOf("qtime") >= 0) {
-        TreeMap<String, String> jobData = new TreeMap<String, String>();
-        String[] items = line.split("\\s+");
-        for (int i = 0; i < items.length; i++) {
-          String[] items2 = items[i].split("=");
-          if (items2.length >= 2) {
-            jobData.put(items2[0], items2[1]);
-          }
-
-        }
-        String startTime = jobData.get("ctime");
-        startTimeValue = Long.valueOf(startTime);
-        startTimeValue = startTimeValue - startTimeValue % (60);
-        Timestamp startTimedb = new Timestamp(startTimeValue * 1000);
-
-        String queueTime = jobData.get("qtime");
-        long queueTimeValue = Long.valueOf(queueTime);
-
-        String sTime = jobData.get("start");
-        long sTimeValue = Long.valueOf(sTime);
-
-        timeQueued = sTimeValue - queueTimeValue;
-
-        String endTime = jobData.get("end");
-        endTimeValue = Long.valueOf(endTime);
-        endTimeValue = endTimeValue - endTimeValue % (60);
-        Timestamp endTimedb = new Timestamp(endTimeValue * 1000);
-
-        exit_status = jobData.get("Exit_status");
-        hosts = jobData.get("exec_host");
-        String[] items2 = hosts.split("[+]");
-        int num = 0;
-        for (int i = 0; i < items2.length; i++) {
-          String machinetemp = items2[i];
-          if (machinetemp.length() >= 3) {
-            String machine = items2[i].substring(0, items2[i].length() - 2);
-            StringBuffer data = new StringBuffer();
-            data.append("HodId=").append(hodId);
-            data.append(", Machine=").append(machine);
-            if (domain != null) {
-              data.append(".").append(domain);
-            }
-            log.info(data.toString());
-            num++;
-          }
-        }
-
-        StringBuffer data = new StringBuffer();
-        data.append("HodID=").append(hodId);
-        data.append(", UserId=").append(userId);
-        data.append(", Status=").append(exit_status);
-        data.append(", TimeQueued=").append(timeQueued);
-        data.append(", StartTime=").append(startTimedb);
-        data.append(", EndTime=").append(endTimedb);
-        data.append(", NumOfMachines=").append(num);
-        log.info(data.toString());
-        findResult = true;
-        log.debug(" hod info for job " + hodId + " has been loaded ");
-      }// if
-
-    }// while
-
-    try {
-      errorHandler.join();
-    } catch (InterruptedException ie) {
-      log.error(ie.getMessage());
-    }
-
-    timeout.cancel();
-    boolean tracedone = false;
-    if (!findResult) {
-
-      String traceCheckCount = aJobData.get("traceCheckCount");
-      int traceCheckCountValue = Integer.valueOf(traceCheckCount);
-      traceCheckCountValue = traceCheckCountValue + 1;
-      aJobData.put("traceCheckCount", String.valueOf(traceCheckCountValue));
-
-      log.debug("did not find tracejob info for job " + hodId + ", after "
-          + traceCheckCountValue + " times checking");
-      if (traceCheckCountValue >= 2) {
-        tracedone = true;
-      }
-    }
-    boolean finished = findResult | tracedone;
-    return finished;
-  }
-
-  private void process_data() throws SQLException {
-
-    long currentTime = System.currentTimeMillis();
-    currentTime = currentTime - currentTime % (60 * 1000);
-
-    Set<String> hodIds = currentHodJobs.keySet();
-
-    Iterator<String> hodIdsIt = hodIds.iterator();
-    while (hodIdsIt.hasNext()) {
-      String hodId = hodIdsIt.next();
-      TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
-      String status = aJobData.get("status");
-      String process = aJobData.get("process");
-      if (process.equals("0") && (status.equals("R") || status.equals("E"))) {
-        try {
-          boolean result = loadQstatData(hodId);
-          if (result) {
-            aJobData.put("process", "1");
-            currentHodJobs.put(hodId, aJobData);
-          }
-        } catch (IOException ioe) {
-          log.error("load qsat data Error:" + ioe.getMessage());
-
-        }
-      }
-      if (!process.equals("2") && status.equals("C")) {
-        try {
-          boolean result = loadTraceJobData(hodId);
-
-          if (result) {
-            aJobData.put("process", "2");
-            currentHodJobs.put(hodId, aJobData);
-          }
-        } catch (IOException ioe) {
-          log.error("loadTraceJobData Error:" + ioe.getMessage());
-        }
-      }// if
-
-    } // while
-
-  }
-
-  private void handle_jobData() throws SQLException {
-    try {
-      getHodJobInfo();
-    } catch (IOException ex) {
-      log.error("getQueueInfo Error:" + ex.getMessage());
-      return;
-    }
-    try {
-      process_data();
-    } catch (SQLException ex) {
-      log.error("process_data Error:" + ex.getMessage());
-      throw ex;
-    }
-  }
-
-  public void run_forever() throws SQLException {
-    while (true) {
-      handle_jobData();
-      try {
-        log.debug("sleeping ...");
-        Thread.sleep(this.intervalValue * 1000);
-      } catch (InterruptedException e) {
-        log.error(e.getMessage());
-      }
-    }
-  }
-
-  public void shutdown() {
-  }
-}
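
For context: the loadTraceJobData() method deleted above shelled out to
Torque's tracejob over ssh and scanned its output for a line carrying "end",
"Exit_status" and "qtime", splitting each whitespace-separated token on "=".
A minimal sketch of that parsing step, with illustrative names:

    Map<String, String> jobData = new TreeMap<String, String>(); // java.util
    for (String token : line.split("\\s+")) {
      String[] pair = token.split("=");
      if (pair.length >= 2) {
        jobData.put(pair[0], pair[1]);    // e.g. "Exit_status=0"
      }
    }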

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
deleted file mode 100644
index 8ea645e..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.inputtools.mdl;
-
-
-import java.util.TimerTask;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class TorqueTimerTask extends TimerTask {
-  private Process ps = null;
-  private String command;
-
-  private static Log log = LogFactory.getLog(TorqueTimerTask.class);
-  // public static int timeoutInterval=300;
-  public static int timeoutInterval = 180;
-
-  public TorqueTimerTask() {
-    super();
-    // TODO Auto-generated constructor stub
-  }
-
-  public TorqueTimerTask(Process process, String command) {
-    super();
-    this.ps = process;
-    this.command = command;
-
-  }
-
-  public void run() {
-    ps.destroy();
-    log.error("torque command: " + command + " timed out");
-
-  }
-
-}
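
TorqueTimerTask existed only as a watchdog that destroyed a Torque command
process once it exceeded timeoutInterval. On Java 8 and later the same guard
needs no Timer at all. A minimal sketch, assuming the same 180-second limit;
this is an alternative, not what the commit substitutes, since the commit
removes the Torque tooling outright:

    Process p = new ProcessBuilder(traceJobCommand).start();
    // waitFor(timeout, unit) throws InterruptedException; TimeUnit is
    // java.util.concurrent.TimeUnit
    if (!p.waitFor(180, TimeUnit.SECONDS)) {
      p.destroy();  // what the TimerTask's run() did
      log.error("torque command: " + command + " timed out");
    }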

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java b/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
index 7ed3148..ff2a022 100644
--- a/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
+++ b/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.chukwa.util;
 
 
 import java.io.*;
+import java.nio.charset.Charset;
 import java.util.*;
 
 public class ClusterConfig {
-  public static final HashMap<String, String> clusterMap = new HashMap<String, String>();
+  private HashMap<String, String> clusterMap = new HashMap<String, String>();
   private String path = System.getenv("CHUKWA_CONF_DIR") + File.separator;
 
   static public String getContents(File aFile) {
@@ -31,9 +32,7 @@ public class ClusterConfig {
     StringBuffer contents = new StringBuffer();
 
     try {
-      // use buffering, reading one line at a time
-      // FileReader always assumes default encoding is OK!
-      BufferedReader input = new BufferedReader(new FileReader(aFile));
+      BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
       try {
         String line = null; // not declared within while loop
         /*
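
The ClusterConfig change above replaces FileReader, which silently applies
the platform default encoding, with an InputStreamReader pinned to UTF-8.
Where java.nio.file is available, an equivalent but shorter form (an
alternative sketch, not what the commit uses) would be:

    BufferedReader input = java.nio.file.Files.newBufferedReader(
        aFile.toPath(), java.nio.charset.StandardCharsets.UTF_8);

Passing aFile directly to FileInputStream would also behave the same as
going through aFile.getAbsolutePath().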

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
index c655e24..70a80c0 100644
--- a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
+++ b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.chukwa.util;
 
+import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Calendar;
@@ -29,7 +30,6 @@ import org.mortbay.log.Log;
 public class HBaseUtil {
   private static Logger LOG = Logger.getLogger(HBaseUtil.class);
   
-  static Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
   static MessageDigest md5 = null;
   static {
     try {
@@ -50,8 +50,9 @@ public class HBaseUtil {
   }
 
   public static byte[] buildKey(long time, String primaryKey) {
+    Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
     c.setTimeInMillis(time);
-    byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+    byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes(Charset.forName("UTF-8"));
     byte[] pk = getHash(primaryKey);
     byte[] key = new byte[12];
     System.arraycopy(day, 0, key, 0, day.length);
@@ -60,8 +61,9 @@ public class HBaseUtil {
   }
   
   public static byte[] buildKey(long time, String primaryKey, String source) {
+    Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
     c.setTimeInMillis(time);
-    byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+    byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes(Charset.forName("UTF-8"));
     byte[] pk = getHash(primaryKey);
     byte[] src = getHash(source);
     byte[] key = new byte[12];
@@ -73,7 +75,7 @@ public class HBaseUtil {
   
   private static byte[] getHash(String key) {
     byte[] hash = new byte[5];
-    System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 5);
+    System.arraycopy(md5.digest(key.getBytes(Charset.forName("UTF-8"))), 0, hash, 0, 5);
     return hash;
   }
 }
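
The HBaseUtil hunks fix a findbugs static-Calendar (STCAL) warning: Calendar
is mutable and not thread-safe, so concurrent buildKey() calls sharing one
static instance could produce corrupted row keys. Allocating the Calendar per
call removes the shared state. An immutable java.time alternative (an
assumption, not part of this commit) would sidestep the mutable state
entirely:

    int day = Instant.ofEpochMilli(time)    // java.time.Instant
        .atZone(ZoneOffset.UTC)             // java.time.ZoneOffset
        .getDayOfYear();
    byte[] dayBytes = Integer.toString(day)
        .getBytes(StandardCharsets.UTF_8);  // java.nio.charset.StandardCharsets

StandardCharsets.UTF_8 would likewise spare the repeated
Charset.forName("UTF-8") lookups. The static MessageDigest kept by getHash()
carries the same thread-safety caveat.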

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java b/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
index 26e4beb..c1cf37f 100644
--- a/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
+++ b/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
@@ -43,26 +43,6 @@ public class ChunkImplTest extends TestCase {
     }
   }
 
-  public void testWrongVersion() {
-    ChunkBuilder cb = new ChunkBuilder();
-    cb.addRecord("foo".getBytes());
-    cb.addRecord("bar".getBytes());
-    cb.addRecord("baz".getBytes());
-    Chunk c = cb.getChunk();
-    DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate());
-    try {
-      c.write(ob);
-      DataInputBuffer ib = new DataInputBuffer();
-      ib.reset(ob.getData(), c.getSerializedSizeEstimate());
-      // change current chunkImpl version
-      ChunkImpl.PROTOCOL_VERSION = ChunkImpl.PROTOCOL_VERSION + 1;
-      ChunkImpl.read(ib);
-      fail("Should have raised an IOexception");
-    } catch (IOException e) {
-      // right behavior, do nothing
-    }
-  }
-  
   public void testTag() {
     ChunkBuilder cb = new ChunkBuilder();
     cb.addRecord("foo".getBytes());

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java b/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
index 93500ff..99d8dc1 100644
--- a/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
+++ b/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
@@ -172,7 +172,8 @@ public class TestFSMBuilder extends TestCase {
       conf.set("chukwaAgent.checkpoint.dir", System.getenv("CHUKWA_DATA_DIR")+File.separator+"tmp");
       conf.set("chukwaAgent.checkpoint.interval", "10000");
       int portno = conf.getInt("chukwaAgent.control.port", agentPort);
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent();
+      agent.start();
       conn = new HttpConnector(agent, "http://localhost:"+collectorPort+"/chukwa");
       conn.start();      
       sender = new ChukwaHttpSender(conf);
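
This hunk and the test changes that follow all make the same substitution:
ChukwaAgent is no longer constructed directly but obtained through its
singleton accessor and then started explicitly. The recurring pattern:

    agent = ChukwaAgent.getAgent(conf); // or getAgent() for the default config
    agent.start();
    ...
    agent.shutdown();                   // release the instance between tests

Separating acquisition from start() makes the agent lifecycle explicit in
each test instead of hiding it in the constructor.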

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
index a039bc6..5b163c9 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
@@ -59,7 +59,8 @@ public class TestJMXAdaptor extends TestCase{
 	    conf.setInt("chukwaAgent.http.port", 9090);
 	    conf.setBoolean("chukwaAgent.checkpoint.enabled", false);	    
 	    
-	    agent = new ChukwaAgent(conf);
+	    agent = ChukwaAgent.getAgent(conf);
+	    agent.start();
 	}
 	
 	public void testJMXAdaptor() {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
index 367484e..39f151b 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
@@ -57,7 +57,8 @@ public class TestAddAdaptor extends TestCase {
     conf.setInt("chukwaAgent.http.port", 9090);
     conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
 
-    agent = new ChukwaAgent(conf);
+    agent = ChukwaAgent.getAgent(conf);
+    agent.start();
 
     assertEquals(0, agent.adaptorCount());
     System.out.println("adding jmx adaptor");

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
index 466053b..948ec5a 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
@@ -63,7 +63,8 @@ public class TestBufferingWrappers extends TestCase {
   public void resendAfterStop(String adaptor)  throws IOException,
   ChukwaAgent.AlreadyRunningException, InterruptedException {
     
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     String ADAPTORID = "adaptor_test" + System.currentTimeMillis(); 
     String STR = "test data";
     int PORTNO = 9878;

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
index 717125b..5748c9b 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
@@ -56,7 +56,8 @@ public class TestDirTailingAdaptor extends TestCase {
     conf.setInt("chukwaAgent.control.port", 0);
     conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
     
-    agent = new ChukwaAgent(conf);
+    agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     File emptyDir = new File(baseDir, "emptyDir2");
     createEmptyDir(emptyDir);
     
@@ -90,7 +91,8 @@ public class TestDirTailingAdaptor extends TestCase {
     anOldFile.deleteOnExit();
     aNewFile.deleteOnExit();
     anOldFile.setLastModified(10);//just after epoch
-    agent = new ChukwaAgent(conf); //restart agent.
+    agent = ChukwaAgent.getAgent(conf); //restart agent.
+    agent.start();
     
    Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
    assertTrue(aNewFile.exists());
@@ -135,7 +137,8 @@ public class TestDirTailingAdaptor extends TestCase {
     while(retry) {
       try {
         retry = false;
-        agent = new ChukwaAgent(conf);
+        agent = ChukwaAgent.getAgent(conf);
+        agent.start();
       } catch(Exception e) {
         retry = true;
       }
@@ -167,11 +170,12 @@ public class TestDirTailingAdaptor extends TestCase {
     anOldFile.deleteOnExit();
     aNewFile.deleteOnExit();
     anOldFile.setLastModified(10);//just after epoch
-    agent = new ChukwaAgent(conf); //restart agent.
-    
+    agent = ChukwaAgent.getAgent(conf); //restart agent.
+    agent.start();
+
    Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
    assertTrue(aNewFile.exists());
-   
+
     //make sure we started tailing the new, not the old, file.
     for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
       System.out.println(adaptors.getKey() +": " + adaptors.getValue());
@@ -182,7 +186,7 @@ public class TestDirTailingAdaptor extends TestCase {
     Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
     assertEquals(4, agent.adaptorCount());
     agent.shutdown();
-    
+
     nukeDirContents(checkpointDir);//nuke dir
     checkpointDir.delete();
     emptyDir.delete();

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
index 11a084a..1af52d0 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
@@ -35,7 +35,8 @@ public class TestExecAdaptor extends TestCase {
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
     conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
-    agent = new ChukwaAgent(conf);
+    agent = ChukwaAgent.getAgent(conf);
+    agent.start();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
index eafa12d..fbe249a 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
@@ -55,7 +55,8 @@ public class TestFileAdaptor extends TestCase {
   public void testOnce()  throws IOException,
   ChukwaAgent.AlreadyRunningException, InterruptedException {
     
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     
     assertEquals(0, agent.adaptorCount());
 
@@ -75,7 +76,8 @@ public class TestFileAdaptor extends TestCase {
   ChukwaAgent.AlreadyRunningException, InterruptedException {
     int tests = 10; //SHOULD SET HIGHER AND WATCH WITH lsof to find leaks
 
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     for(int i=0; i < tests; ++i) {
       if(i % 100 == 0)
         System.out.println("buzzed " + i + " times");

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
index bcd940c..4bbf206 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
@@ -24,6 +24,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
 import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -34,8 +35,9 @@ import junit.framework.TestCase;
 public class TestHeartbeatAdaptor extends TestCase {
   private volatile boolean shutdown = false;
   private final int port = 4321;
-  public void testPingAdaptor() throws IOException, InterruptedException{
+  public void testPingAdaptor() throws IOException, InterruptedException, AlreadyRunningException{
     ChukwaAgent agent = ChukwaAgent.getAgent();
+    agent.start();
     Configuration conf = agent.getConfiguration();
     conf.set("chukwa.http.writer.host", "localhost");
     conf.set("chukwa.http.writer.port", String.valueOf(port));

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
index 1e0f234..65add51 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
@@ -44,7 +44,8 @@ public class TestCharFileTailingAdaptorUTF8 extends TestCase {
     
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
-    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     File testFile = makeTestFile("chukwaTest", 80,baseDir);
     String adaptorId = agent
         .processAddCommand("add adaptor_test = org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8"

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
index 1c68a1a..fc04f25 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
@@ -36,7 +36,8 @@ public class TestFileExpirationPolicy extends TestCase {
     try {
       Configuration conf = new ChukwaConfiguration();
       conf.set("chukwaAgent.control.port", "0");
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent(conf);
+      agent.start();
 
       FileTailingAdaptor.GRACEFUL_PERIOD = 30 * 1000;
 
@@ -79,7 +80,8 @@ public class TestFileExpirationPolicy extends TestCase {
 
       Configuration conf = new ChukwaConfiguration();
       conf.set("chukwaAgent.control.port", "0");
-      agent = new ChukwaAgent(conf);
+      agent = ChukwaAgent.getAgent(conf);
+      agent.start();
       // Remove any adaptor left over from previous run
 
       ChukwaAgentController cli = new ChukwaAgentController("localhost", agent.getControllerPort());

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
index 570e7f4..d622b5d 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
@@ -49,7 +49,8 @@ public class TestFileTailer {
 		ChukwaConfiguration cc = new ChukwaConfiguration();
 		cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 18); // small in order to have hasMoreData=true
 																	 // (with 26 letters we should have 2 chunks)
-		agent = new ChukwaAgent(cc);
+		agent = ChukwaAgent.getAgent(cc);
+		agent.start();
 		
 		ChunkCatcherConnector chunks = new ChunkCatcherConnector();
 	    chunks.start();

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
index 40479b5..2a82e79 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
@@ -50,7 +50,8 @@ public class TestFileTailingAdaptorBigRecord extends TestCase {
       ChukwaConfiguration cc = new ChukwaConfiguration();
       cc.set("chukwaAgent.control.port", "0");
       cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 55);
-      ChukwaAgent agent = new ChukwaAgent(cc);
+      ChukwaAgent agent = ChukwaAgent.getAgent(cc);
+      agent.start();
       int portno = agent.getControllerPort();
       while (portno == -1) {
         Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
index fbbfd94..4590ef3 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
@@ -59,7 +59,8 @@ public class TestFileTailingAdaptorPreserveLines {
    */
   @Before
   public void setUp() throws Exception {
-    agent = new ChukwaAgent(conf);
+    agent = ChukwaAgent.getAgent(conf);
+    agent.start();
     chunks = new ChunkCatcherConnector();
     chunks.start();