You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@chukwa.apache.org by ey...@apache.org on 2015/07/26 04:08:55 UTC
[2/8] chukwa git commit: CHUKWA-771. Improved code quality issue
identified by findbugs. (Eric Yang)
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
index 8075f4d..f828ff1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
@@ -71,19 +72,20 @@ public class ChukwaHBaseStore {
static double RESOLUTION = 360;
static int MINUTE = 60000; //60 milliseconds
final static int SECOND = (int) TimeUnit.SECONDS.toMillis(1);
-
- static byte[] COLUMN_FAMILY = "t".getBytes();
- static byte[] ANNOTATION_FAMILY = "a".getBytes();
- static byte[] KEY_NAMES = "k".getBytes();
- static byte[] CHART_TYPE = "chart_meta".getBytes();
- static byte[] CHART_FAMILY = "c".getBytes();
- static byte[] COMMON_FAMILY = "c".getBytes();
- static byte[] WIDGET_TYPE = "widget_meta".getBytes();
- static byte[] DASHBOARD_TYPE = "dashboard_meta".getBytes();
+ private final static Charset UTF8 = Charset.forName("UTF-8");
+
+ final static byte[] COLUMN_FAMILY = "t".getBytes(UTF8);
+ final static byte[] ANNOTATION_FAMILY = "a".getBytes(UTF8);
+ final static byte[] KEY_NAMES = "k".getBytes(UTF8);
+ final static byte[] CHART_TYPE = "chart_meta".getBytes(UTF8);
+ final static byte[] CHART_FAMILY = "c".getBytes(UTF8);
+ final static byte[] COMMON_FAMILY = "c".getBytes(UTF8);
+ final static byte[] WIDGET_TYPE = "widget_meta".getBytes(UTF8);
+ final static byte[] DASHBOARD_TYPE = "dashboard_meta".getBytes(UTF8);
private static final String CHUKWA = "chukwa";
private static final String CHUKWA_META = "chukwa_meta";
private static long MILLISECONDS_IN_DAY = 86400000L;
- protected static Connection connection = null;
+ private static Connection connection = null;
public ChukwaHBaseStore() {
super();
@@ -171,7 +173,7 @@ public class ChukwaHBaseStore {
byte[] key = CellUtil.cloneQualifier(kv);
long timestamp = ByteBuffer.wrap(key).getLong();
double value = Double
- .parseDouble(new String(CellUtil.cloneValue(kv), "UTF-8"));
+ .parseDouble(new String(CellUtil.cloneValue(kv), UTF8));
series.add(timestamp, value);
}
}
@@ -179,7 +181,7 @@ public class ChukwaHBaseStore {
currentDay = currentDay + (i * MILLISECONDS_IN_DAY);
}
table.close();
- } catch (Exception e) {
+ } catch (IOException e) {
closeHBase();
LOG.error(ExceptionUtil.getStackTrace(e));
}
@@ -191,12 +193,12 @@ public class ChukwaHBaseStore {
try {
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
- Get get = new Get(metricGroup.getBytes());
+ Get get = new Get(metricGroup.getBytes(UTF8));
Result result = table.get(get);
for (Cell kv : result.rawCells()) {
- JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(kv), "UTF-8"));
+ JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(kv), UTF8));
if (json.get("type").equals("metric")) {
- familyNames.add(new String(CellUtil.cloneQualifier(kv), "UTF-8"));
+ familyNames.add(new String(CellUtil.cloneQualifier(kv), UTF8));
}
}
table.close();
@@ -219,7 +221,7 @@ public class ChukwaHBaseStore {
Iterator<Result> it = rs.iterator();
while (it.hasNext()) {
Result result = it.next();
- metricGroups.add(new String(result.getRow(), "UTF-8"));
+ metricGroups.add(new String(result.getRow(), UTF8));
}
table.close();
} catch (Exception e) {
@@ -241,9 +243,9 @@ public class ChukwaHBaseStore {
while (it.hasNext()) {
Result result = it.next();
for (Cell cell : result.rawCells()) {
- JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), "UTF-8"));
+ JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), UTF8));
if (json!=null && json.get("type")!=null && json.get("type").equals("source")) {
- pk.add(new String(CellUtil.cloneQualifier(cell), "UTF-8"));
+ pk.add(new String(CellUtil.cloneQualifier(cell), UTF8));
}
}
}
@@ -296,7 +298,7 @@ public class ChukwaHBaseStore {
for(Cell cell : result.rawCells()) {
byte[] dest = new byte[5];
System.arraycopy(CellUtil.cloneRow(cell), 3, dest, 0, 5);
- String source = new String(dest);
+ String source = new String(dest, UTF8);
long time = cell.getTimestamp();
// Time display in x axis
long delta = time - startTime;
@@ -306,11 +308,11 @@ public class ChukwaHBaseStore {
if (keyMap.containsKey(source)) {
y = keyMap.get(source);
} else {
- keyMap.put(source, new Integer(index));
+ keyMap.put(source, Integer.valueOf(index));
y = index;
index++;
}
- double v = Double.parseDouble(new String(CellUtil.cloneValue(cell)));
+ double v = Double.parseDouble(new String(CellUtil.cloneValue(cell), UTF8));
heatmap.put(x, y, v);
if (v > max) {
max = v;
@@ -355,9 +357,9 @@ public class ChukwaHBaseStore {
while (it.hasNext()) {
Result result = it.next();
for (Cell cell : result.rawCells()) {
- JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), "UTF-8"));
+ JSONObject json = (JSONObject) JSONValue.parse(new String(CellUtil.cloneValue(cell), UTF8));
if (json.get("type").equals("cluster")) {
- clusters.add(new String(CellUtil.cloneQualifier(cell), "UTF-8"));
+ clusters.add(new String(CellUtil.cloneQualifier(cell), UTF8));
}
}
}
@@ -382,10 +384,10 @@ public class ChukwaHBaseStore {
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Get get = new Get(CHART_TYPE);
Result r = table.get(get);
- byte[] value = r.getValue(CHART_FAMILY, id.getBytes());
+ byte[] value = r.getValue(CHART_FAMILY, id.getBytes(UTF8));
Gson gson = new Gson();
if(value!=null) {
- chart = gson.fromJson(new String(value), Chart.class);
+ chart = gson.fromJson(new String(value, UTF8), Chart.class);
}
table.close();
} catch (Exception e) {
@@ -408,7 +410,7 @@ public class ChukwaHBaseStore {
Put put = new Put(CHART_TYPE);
Gson gson = new Gson();
String buffer = gson.toJson(chart);
- put.addColumn(CHART_FAMILY, id.getBytes(), buffer.getBytes());
+ put.addColumn(CHART_FAMILY, id.getBytes(UTF8), buffer.getBytes(UTF8));
table.put(put);
table.close();
} catch (Exception e) {
@@ -437,7 +439,7 @@ public class ChukwaHBaseStore {
s.setLineOptions(l);
series.add(s);
}
- chart.SetSeries(series);
+ chart.setSeries(series);
return createChart(chart);
}
@@ -469,7 +471,7 @@ public class ChukwaHBaseStore {
Put put = new Put(CHART_TYPE);
Gson gson = new Gson();
String buffer = gson.toJson(chart);
- put.addColumn(CHART_FAMILY, id.getBytes(), buffer.getBytes());
+ put.addColumn(CHART_FAMILY, id.getBytes(UTF8), buffer.getBytes(UTF8));
table.put(put);
table.close();
} catch (Exception e) {
@@ -499,8 +501,8 @@ public class ChukwaHBaseStore {
}
// Figure out the time range and determine the best resolution
// to fetch the data
- long range = Math.round((endTime - startTime)
- / (MINUTES_IN_HOUR * MINUTE));
+ long range = (endTime - startTime)
+ / (long) (MINUTES_IN_HOUR * MINUTE);
long sampleRate = 1;
if (range <= 1) {
sampleRate = 5;
@@ -512,7 +514,7 @@ public class ChukwaHBaseStore {
sampleRate = 87600;
}
double smoothing = (endTime - startTime)
- / (sampleRate * SECOND ) / RESOLUTION;
+ / (double) (sampleRate * SECOND ) / (double) RESOLUTION;
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA));
@@ -550,7 +552,7 @@ public class ChukwaHBaseStore {
byte[] key = CellUtil.cloneQualifier(kv);
long timestamp = ByteBuffer.wrap(key).getLong();
double value = Double.parseDouble(new String(CellUtil.cloneValue(kv),
- "UTF-8"));
+ UTF8));
if(initial==0) {
filteredValue = value;
}
@@ -558,7 +560,7 @@ public class ChukwaHBaseStore {
lastTime = timestamp;
// Determine if there is any gap, if there is gap in data, reset
// calculation.
- if (elapsedTime > sampleRate) {
+ if (elapsedTime > (sampleRate * 5)) {
filteredValue = 0.0d;
} else {
if (smoothing != 0.0d) {
@@ -587,7 +589,7 @@ public class ChukwaHBaseStore {
list.add(clone);
}
table.close();
- } catch (Exception e) {
+ } catch (IOException|CloneNotSupportedException e) {
closeHBase();
LOG.error(ExceptionUtil.getStackTrace(e));
}
@@ -622,7 +624,7 @@ public class ChukwaHBaseStore {
continue;
}
Gson gson = new Gson();
- Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), "UTF-8"), Widget.class);
+ Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), UTF8), Widget.class);
list.add(widget);
c++;
}
@@ -658,7 +660,7 @@ public class ChukwaHBaseStore {
Result result = it.next();
for(Cell kv : result.rawCells()) {
Gson gson = new Gson();
- Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), "UTF-8"), Widget.class);
+ Widget widget = gson.fromJson(new String(CellUtil.cloneValue(kv), UTF8), Widget.class);
list.add(widget);
}
}
@@ -683,11 +685,11 @@ public class ChukwaHBaseStore {
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Get widget = new Get(WIDGET_TYPE);
- widget.addColumn(COMMON_FAMILY, title.getBytes());
+ widget.addColumn(COMMON_FAMILY, title.getBytes(UTF8));
Result rs = table.get(widget);
- byte[] buffer = rs.getValue(COMMON_FAMILY, title.getBytes());
+ byte[] buffer = rs.getValue(COMMON_FAMILY, title.getBytes(UTF8));
Gson gson = new Gson();
- w = gson.fromJson(new String(buffer), Widget.class);
+ w = gson.fromJson(new String(buffer, UTF8), Widget.class);
table.close();
} catch (Exception e) {
closeHBase();
@@ -708,7 +710,7 @@ public class ChukwaHBaseStore {
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Get widgetTest = new Get(WIDGET_TYPE);
- widgetTest.addColumn(COMMON_FAMILY, widget.getTitle().getBytes());
+ widgetTest.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(UTF8));
if (table.exists(widgetTest)) {
LOG.warn("Widget: " + widget.getTitle() + " already exists.");
created = false;
@@ -716,7 +718,7 @@ public class ChukwaHBaseStore {
Put put = new Put(WIDGET_TYPE);
Gson gson = new Gson();
String buffer = gson.toJson(widget);
- put.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(), buffer.getBytes());
+ put.addColumn(COMMON_FAMILY, widget.getTitle().getBytes(UTF8), buffer.getBytes(UTF8));
table.put(put);
created = true;
}
@@ -741,12 +743,12 @@ public class ChukwaHBaseStore {
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Delete oldWidget = new Delete(WIDGET_TYPE);
- oldWidget.addColumn(COMMON_FAMILY, title.getBytes());
+ oldWidget.addColumn(COMMON_FAMILY, title.getBytes(UTF8));
table.delete(oldWidget);
Put put = new Put(WIDGET_TYPE);
Gson gson = new Gson();
String buffer = gson.toJson(widget);
- put.addColumn(COMMON_FAMILY, title.getBytes(), buffer.getBytes());
+ put.addColumn(COMMON_FAMILY, title.getBytes(UTF8), buffer.getBytes(UTF8));
table.put(put);
table.close();
result = true;
@@ -772,7 +774,7 @@ public class ChukwaHBaseStore {
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Delete oldWidget = new Delete(WIDGET_TYPE);
- oldWidget.addColumn(COMMON_FAMILY, title.getBytes());
+ oldWidget.addColumn(COMMON_FAMILY, title.getBytes(UTF8));
table.delete(oldWidget);
table.close();
result = true;
@@ -790,7 +792,7 @@ public class ChukwaHBaseStore {
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Get dashboardTest = new Get(DASHBOARD_TYPE);
- dashboardTest.addColumn(COMMON_FAMILY, "default".getBytes());
+ dashboardTest.addColumn(COMMON_FAMILY, "default".getBytes(UTF8));
exists = table.exists(dashboardTest);
table.close();
} catch (Exception e) {
@@ -931,19 +933,19 @@ public class ChukwaHBaseStore {
getHBaseConnection();
Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Get dashboard = new Get(DASHBOARD_TYPE);
- dashboard.addColumn(COMMON_FAMILY, key.getBytes());
+ dashboard.addColumn(COMMON_FAMILY, key.getBytes(UTF8));
Result rs = table.get(dashboard);
- byte[] buffer = rs.getValue(COMMON_FAMILY, key.getBytes());
+ byte[] buffer = rs.getValue(COMMON_FAMILY, key.getBytes(UTF8));
if(buffer == null) {
// If user dashboard is not found, use default dashboard.
key = new StringBuilder().append(id).append("|").toString();
dashboard = new Get(DASHBOARD_TYPE);
- dashboard.addColumn(COMMON_FAMILY, key.getBytes());
+ dashboard.addColumn(COMMON_FAMILY, key.getBytes(UTF8));
rs = table.get(dashboard);
- buffer = rs.getValue(COMMON_FAMILY, key.getBytes());
+ buffer = rs.getValue(COMMON_FAMILY, key.getBytes(UTF8));
}
Gson gson = new Gson();
- dash = gson.fromJson(new String(buffer), Dashboard.class);
+ dash = gson.fromJson(new String(buffer, UTF8), Dashboard.class);
table.close();
} catch (Exception e) {
closeHBase();
@@ -964,7 +966,7 @@ public class ChukwaHBaseStore {
Put put = new Put(DASHBOARD_TYPE);
Gson gson = new Gson();
String buffer = gson.toJson(dash);
- put.addColumn(COMMON_FAMILY, key.getBytes(), buffer.getBytes());
+ put.addColumn(COMMON_FAMILY, key.getBytes(UTF8), buffer.getBytes(UTF8));
table.put(put);
table.close();
result = true;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
index eb79cd7..4f5f289 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.chukwa.extraction.hbase;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
@@ -37,7 +38,7 @@ public abstract class AbstractProcessor {
protected String sourceHelper;
protected byte[] key = null;
- byte[] CF = "t".getBytes();
+ byte[] CF = "t".getBytes(Charset.forName("UTF-8"));
boolean chunkInErrorSaved = false;
ArrayList<Put> output = null;
@@ -70,14 +71,14 @@ public abstract class AbstractProcessor {
byte[] key = HBaseUtil.buildKey(time, primaryKey, source);
Put put = new Put(key);
byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
- put.add(CF, timeInBytes, time, value);
+ put.addColumn(CF, timeInBytes, time, value);
output.add(put);
reporter.putMetric(chunk.getDataType(), primaryKey);
reporter.putSource(chunk.getDataType(), source);
}
public void addRecord(String primaryKey, String value) {
- addRecord(primaryKey, value.getBytes());
+ addRecord(primaryKey, value.getBytes(Charset.forName("UTF-8")));
}
/**
@@ -96,7 +97,7 @@ public abstract class AbstractProcessor {
byte[] key = HBaseUtil.buildKey(time, primaryKey, sourceHelper);
Put put = new Put(key);
byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
- put.add(CF, timeInBytes, time, value);
+ put.addColumn(CF, timeInBytes, time, value);
output.add(put);
reporter.putMetric(chunk.getDataType(), primaryKey);
}
@@ -126,7 +127,7 @@ public abstract class AbstractProcessor {
Put put = new Put(key);
String family = "a";
byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
- put.add(family.getBytes(), timeInBytes, time, chunk.getTags().getBytes());
+ put.addColumn(family.getBytes(Charset.forName("UTF-8")), timeInBytes, time, chunk.getTags().getBytes(Charset.forName("UTF-8")));
output.add(put);
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
index 2da64a3..483ac71 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
@@ -17,34 +17,46 @@
*/
package org.apache.hadoop.chukwa.extraction.hbase;
+import java.lang.reflect.Type;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.chukwa.util.HBaseUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.Logger;
-import org.json.simple.JSONObject;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
public class DefaultProcessor extends AbstractProcessor {
-
+
public DefaultProcessor() throws NoSuchAlgorithmException {
- super();
- // TODO Auto-generated constructor stub
+ super();
+ // TODO Auto-generated constructor stub
}
-static Logger LOG = Logger.getLogger(DefaultProcessor.class);
+ static Logger LOG = Logger.getLogger(DefaultProcessor.class);
@Override
protected void parse(byte[] recordEntry) throws Throwable {
- byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), chunk.getSource());
- Put put = new Put(key);
- byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
- put.add("t".getBytes(), timeInBytes, chunk.getData());
- output.add(put);
- JSONObject json = new JSONObject();
- json.put("sig", key);
- json.put("type", "unknown");
- reporter.put(chunk.getDataType(), chunk.getSource(), json.toString());
+ byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(),
+ chunk.getSource());
+ Put put = new Put(key);
+ byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+ put.addColumn("t".getBytes(Charset.forName("UTF-8")), timeInBytes,
+ chunk.getData());
+ output.add(put);
+ Type defaultType = new TypeToken<Map<String, String>>() {
+ }.getType();
+ Gson gson = new Gson();
+ Map<String, String> meta = new HashMap<String, String>();
+ meta.put("sig", new String(key, Charset.forName("UTF-8")));
+ meta.put("type", "unknown");
+ String buffer = gson.toJson(meta, defaultType);
+ reporter.put(chunk.getDataType(), chunk.getSource(), buffer);
}
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
index 19df607..de64a0d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
@@ -18,13 +18,13 @@
package org.apache.hadoop.chukwa.extraction.hbase;
-
-import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
-import org.apache.hadoop.chukwa.util.HBaseUtil;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -37,15 +37,14 @@ public class HadoopMetricsProcessor extends AbstractProcessor {
static final String recordNameField = "recordName";
static final String hostName = "Hostname";
static final String processName = "ProcessName";
- static final byte[] cf = "t".getBytes();
+ static final byte[] cf = "t".getBytes(Charset.forName("UTF-8"));
public HadoopMetricsProcessor() throws NoSuchAlgorithmException {
}
@Override
protected void parse(byte[] recordEntry) throws Throwable {
- try {
- String body = new String(recordEntry);
+ String body = new String(recordEntry, Charset.forName("UTF-8"));
int start = body.indexOf('{');
JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));
@@ -56,10 +55,8 @@ public class HadoopMetricsProcessor extends AbstractProcessor {
if(json.get(processName)!=null) {
src = new StringBuilder(src).append(":").append(json.get(processName)).toString();
}
- @SuppressWarnings("unchecked")
- Iterator<String> ki = json.keySet().iterator();
- while (ki.hasNext()) {
- String keyName = ki.next();
+ for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) {
+ String keyName = entry.getKey();
if (timestampField.intern() == keyName.intern()) {
continue;
} else if (contextNameField.intern() == keyName.intern()) {
@@ -71,20 +68,14 @@ public class HadoopMetricsProcessor extends AbstractProcessor {
} else if (processName.intern() == keyName.intern()) {
continue;
} else {
- if (json.get(keyName) != null) {
- String v = json.get(keyName).toString();
+ if(json.get(keyName)!=null) {
+ String v = entry.getValue().toString();
String primaryKey = new StringBuilder(contextName).append(".")
.append(recordName).append(".").append(keyName).toString();
- addRecord(time, primaryKey, src, v.getBytes(), output);
+ addRecord(time, primaryKey, src, v.getBytes(Charset.forName("UTF-8")), output);
}
}
}
-
- } catch (Exception e) {
- LOG.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
- e);
- throw e;
- }
}
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
index dcbe2d4..0682c71 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
@@ -22,43 +22,43 @@ import java.text.SimpleDateFormat;
import java.util.Date;
public class LogEntry {
- private final static SimpleDateFormat sdf = new SimpleDateFormat(
- "yyyy-MM-dd HH:mm");
+ private SimpleDateFormat sdf = new SimpleDateFormat(
+ "yyyy-MM-dd HH:mm");
- private Date date;
- private String logLevel;
- private String className;
- private String body;
+ private Date date;
+ private String logLevel;
+ private String className;
+ private String body;
- public LogEntry(String recordEntry) throws ParseException {
- String dStr = recordEntry.substring(0, 23);
- date = sdf.parse(dStr);
- int start = 24;
- int idx = recordEntry.indexOf(' ', start);
- logLevel = recordEntry.substring(start, idx);
- start = idx + 1;
- idx = recordEntry.indexOf(' ', start);
- className = recordEntry.substring(start, idx - 1);
- body = recordEntry.substring(idx + 1);
- }
+ public LogEntry(String recordEntry) throws ParseException {
+ String dStr = recordEntry.substring(0, 23);
+ date = sdf.parse(dStr);
+ int start = 24;
+ int idx = recordEntry.indexOf(' ', start);
+ logLevel = recordEntry.substring(start, idx);
+ start = idx + 1;
+ idx = recordEntry.indexOf(' ', start);
+ className = recordEntry.substring(start, idx - 1);
+ body = recordEntry.substring(idx + 1);
+ }
- public Date getDate() {
- return date;
- }
+ public Date getDate() {
+ return (Date) date.clone();
+ }
- public void setDate(Date date) {
- this.date = date;
- }
+ public void setDate(Date date) {
+ this.date = (Date) date.clone();
+ }
- public String getLogLevel() {
- return logLevel;
- }
+ public String getLogLevel() {
+ return logLevel;
+ }
- public String getClassName() {
- return className;
- }
+ public String getClassName() {
+ return className;
+ }
- public String getBody() {
- return body;
- }
+ public String getBody() {
+ return body;
+ }
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
index c2695f2..3718fbd 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
@@ -22,20 +22,14 @@
*/
package org.apache.hadoop.chukwa.extraction.hbase;
+import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Calendar;
import java.util.Iterator;
-import java.util.TimeZone;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
-import org.apache.hadoop.chukwa.Chunk;
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -48,7 +42,7 @@ public class SystemMetrics extends AbstractProcessor {
@Override
protected void parse(byte[] recordEntry) throws Throwable {
- String buffer = new String(recordEntry);
+ String buffer = new String(recordEntry, Charset.forName("UTF-8"));
JSONObject json = (JSONObject) JSONValue.parse(buffer);
time = ((Long) json.get("timestamp")).longValue();
ChukwaRecord record = new ChukwaRecord();
@@ -70,11 +64,9 @@ public class SystemMetrics extends AbstractProcessor {
user = user + Double.parseDouble(cpu.get("user").toString());
sys = sys + Double.parseDouble(cpu.get("sys").toString());
idle = idle + Double.parseDouble(cpu.get("idle").toString());
- @SuppressWarnings("unchecked")
- Iterator<String> iterator = (Iterator<String>) cpu.keySet().iterator();
- while(iterator.hasNext()) {
- String key = iterator.next();
- addRecord("cpu." + key + "." + i, cpu.get(key).toString());
+ for(Entry<String, Object> entry : (Set<Map.Entry>) cpu.entrySet()) {
+ String key = entry.getKey();
+ addRecord("cpu." + key + "." + i, String.valueOf(entry.getValue()));
}
}
combined = combined / actualSize;
@@ -94,20 +86,15 @@ public class SystemMetrics extends AbstractProcessor {
record = new ChukwaRecord();
JSONObject memory = (JSONObject) json.get("memory");
- @SuppressWarnings("unchecked")
- Iterator<String> memKeys = memory.keySet().iterator();
- while (memKeys.hasNext()) {
- String key = memKeys.next();
- addRecord("memory." + key, memory.get(key).toString());
+ for(Entry<String, Object> entry : (Set<Map.Entry>) memory.entrySet()) {
+ String key = entry.getKey();
+ addRecord("memory." + key, String.valueOf(entry.getValue()));
}
record = new ChukwaRecord();
JSONObject swap = (JSONObject) json.get("swap");
- @SuppressWarnings("unchecked")
- Iterator<String> swapKeys = swap.keySet().iterator();
- while (swapKeys.hasNext()) {
- String key = swapKeys.next();
- addRecord("swap." + key, swap.get(key).toString());
+ for(Map.Entry<String, Object> entry : (Set<Map.Entry>) swap.entrySet()) {
+ addRecord("swap." + entry.getKey(), String.valueOf(entry.getValue()));
}
double rxBytes = 0;
@@ -122,28 +109,30 @@ public class SystemMetrics extends AbstractProcessor {
JSONArray netList = (JSONArray) json.get("network");
for (int i = 0; i < netList.size(); i++) {
JSONObject netIf = (JSONObject) netList.get(i);
- @SuppressWarnings("unchecked")
- Iterator<String> keys = netIf.keySet().iterator();
- while (keys.hasNext()) {
- String key = keys.next();
- record.add(key + "." + i, netIf.get(key).toString());
+ for(Map.Entry<String, Object> entry : (Set<Map.Entry>) netIf.entrySet()) {
+ String key = entry.getKey();
+ long value = 0;
+ if(entry.getValue() instanceof Long) {
+ value = (Long) entry.getValue();
+ }
+ record.add(key + "." + i, String.valueOf(entry.getValue()));
if (i != 0) {
if (key.equals("RxBytes")) {
- rxBytes = rxBytes + (Long) netIf.get(key);
+ rxBytes = rxBytes + value;
} else if (key.equals("RxDropped")) {
- rxDropped = rxDropped + (Long) netIf.get(key);
+ rxDropped = rxDropped + value;
} else if (key.equals("RxErrors")) {
- rxErrors = rxErrors + (Long) netIf.get(key);
+ rxErrors = rxErrors + value;
} else if (key.equals("RxPackets")) {
- rxPackets = rxPackets + (Long) netIf.get(key);
+ rxPackets = rxPackets + value;
} else if (key.equals("TxBytes")) {
- txBytes = txBytes + (Long) netIf.get(key);
+ txBytes = txBytes + value;
} else if (key.equals("TxCollisions")) {
- txCollisions = txCollisions + (Long) netIf.get(key);
+ txCollisions = txCollisions + value;
} else if (key.equals("TxErrors")) {
- txErrors = txErrors + (Long) netIf.get(key);
+ txErrors = txErrors + value;
} else if (key.equals("TxPackets")) {
- txPackets = txPackets + (Long) netIf.get(key);
+ txPackets = txPackets + value;
}
}
}
@@ -168,22 +157,25 @@ public class SystemMetrics extends AbstractProcessor {
JSONArray diskList = (JSONArray) json.get("disk");
for (int i = 0; i < diskList.size(); i++) {
JSONObject disk = (JSONObject) diskList.get(i);
- Iterator<String> keys = disk.keySet().iterator();
- while (keys.hasNext()) {
- String key = keys.next();
- record.add(key + "." + i, disk.get(key).toString());
+ for(Entry<String, Object> entry : (Set<Map.Entry>) disk.entrySet()) {
+ String key = entry.getKey();
+ long value = 0;
+ if(entry.getValue() instanceof Long) {
+ value = (Long) entry.getValue();
+ }
+ record.add(key + "." + i, String.valueOf(entry.getValue()));
if (key.equals("ReadBytes")) {
- readBytes = readBytes + (Long) disk.get("ReadBytes");
+ readBytes = readBytes + value;
} else if (key.equals("Reads")) {
- reads = reads + (Long) disk.get("Reads");
+ reads = reads + Long.valueOf(value);;
} else if (key.equals("WriteBytes")) {
- writeBytes = writeBytes + (Long) disk.get("WriteBytes");
+ writeBytes = writeBytes + value;
} else if (key.equals("Writes")) {
- writes = writes + (Long) disk.get("Writes");
+ writes = writes + value;
} else if (key.equals("Total")) {
- total = total + (Long) disk.get("Total");
+ total = total + value;
} else if (key.equals("Used")) {
- used = used + (Long) disk.get("Used");
+ used = used + value;
}
}
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java b/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
index 9c21bf1..02fe3b7 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
@@ -20,21 +20,20 @@ package org.apache.hadoop.chukwa.hicc;
import java.io.*;
+import java.nio.charset.Charset;
import java.util.*;
import org.apache.hadoop.chukwa.datastore.ChukwaHBaseStore;
public class ClusterConfig {
- private static Set<String> clusterMap = null;
+ private Set<String> clusterMap = null;
static public String getContents(File aFile) {
// ...checks on aFile are elided
StringBuffer contents = new StringBuffer();
try {
- // use buffering, reading one line at a time
- // FileReader always assumes default encoding is OK!
- BufferedReader input = new BufferedReader(new FileReader(aFile));
+ BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
try {
String line = null; // not declared within while loop
/*
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java b/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
index 2d84c09..fe90941 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/HiccWebServer.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.net.URL;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
@@ -124,12 +125,12 @@ public class HiccWebServer {
StringBuilder sb = new StringBuilder();
String line = null;
try {
- BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8")));
while ((line = reader.readLine()) != null) {
sb.append(line + "\n");
}
FSDataOutputStream out = fs.create(dest);
- out.write(sb.toString().getBytes());
+ out.write(sb.toString().getBytes(Charset.forName("UTF-8")));
out.close();
reader.close();
} catch(IOException e) {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
index 7c8c6a7..19cde5f 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Chart.java
@@ -111,7 +111,7 @@ public class Chart {
return this.id;
}
- public void SetSeries(List<SeriesMetaData> series) {
+ public void setSeries(List<SeriesMetaData> series) {
this.series = series;
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
index 61dc0b5..74d5e89 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/bean/Widget.java
@@ -89,11 +89,11 @@ public class Widget {
}
public String[] getTokens() {
- return tokens;
+ return tokens.clone();
}
public void setTokens(String[] tokens) {
- this.tokens = tokens;
+ this.tokens = tokens.clone();
}
public void tokenize() {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java b/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
index c9413bd..869efa4 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/proxy/HttpProxy.java
@@ -24,6 +24,7 @@ import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
+import java.nio.charset.Charset;
import java.util.Map;
import javax.servlet.ServletException;
@@ -44,11 +45,11 @@ public class HttpProxy extends HttpServlet {
private final String USER_AGENT = "Mozilla/5.0";
private final static String SOLR_URL = "chukwa.solr.url";
private final static Logger LOG = Logger.getLogger(HttpProxy.class);
- private ChukwaConfiguration conf = new ChukwaConfiguration();
private String solrUrl = null;
public HttpProxy() {
super();
+ ChukwaConfiguration conf = new ChukwaConfiguration();
solrUrl = conf.get(SOLR_URL);
}
@@ -72,7 +73,7 @@ public class HttpProxy extends HttpServlet {
LOG.info("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(
- con.getInputStream()));
+ con.getInputStream(), Charset.forName("UTF-8")));
String inputLine;
StringBuffer response1 = new StringBuffer();
@@ -80,7 +81,7 @@ public class HttpProxy extends HttpServlet {
while ((inputLine = in.readLine()) != null) {
response1.append(inputLine);
- sout.write(inputLine.getBytes());
+ sout.write(inputLine.getBytes(Charset.forName("UTF-8")));
}
in.close();
@@ -131,7 +132,7 @@ public class HttpProxy extends HttpServlet {
LOG.debug("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(
- con.getInputStream()));
+ con.getInputStream(), Charset.forName("UTF-8")));
String inputLine;
StringBuffer response1 = new StringBuffer();
@@ -139,7 +140,7 @@ public class HttpProxy extends HttpServlet {
while ((inputLine = in.readLine()) != null) {
response1.append(inputLine);
- sout.write(inputLine.getBytes());
+ sout.write(inputLine.getBytes(Charset.forName("UTF-8")));
}
in.close();
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
index ceed0df..1441253 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/SessionController.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.chukwa.hicc.rest;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
@@ -63,8 +65,8 @@ public class SessionController {
Gson gson = new Gson();
Type stringStringMap = new TypeToken<Map<String, String>>(){}.getType();
Map<String,String> map = gson.fromJson(buffer, stringStringMap);
- for(String key : map.keySet()) {
- request.getSession().setAttribute(key, map.get(key));
+ for(Entry<String, String> entry : (Set<Map.Entry<String, String>>) map.entrySet()) {
+ request.getSession().setAttribute(entry.getKey(), entry.getValue());
}
return Response.ok().build();
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
index ea07797..4524922 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/rest/VelocityResolver.java
@@ -39,7 +39,7 @@ public class VelocityResolver implements InjectableProvider<Context, Type> {
private VelocityEngine ve;
private static Logger LOG = Logger.getLogger(VelocityResolver.class);
- public static String LOGGER_NAME = VelocityResolver.class.getName();
+ public final static String LOGGER_NAME = VelocityResolver.class.getName();
/**
* Jersey configuration for setting up Velocity configuration.
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
deleted file mode 100644
index f0a3303..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.inputtools.mdl;
-
-
-import java.sql.SQLException;
-import java.util.Calendar;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.TreeMap;
-import java.util.Iterator;
-import java.sql.Timestamp;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Timer;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Date;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class TorqueInfoProcessor {
-
- private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
-
- private int intervalValue = 60;
- private String torqueServer = null;
- private String torqueBinDir = null;
- private String domain = null;
-
- private TreeMap<String, TreeMap<String, String>> currentHodJobs;
-
- public TorqueInfoProcessor(DataConfig mdlConfig, int interval) {
- this.intervalValue = interval;
-
- torqueServer = System.getProperty("TORQUE_SERVER");
- torqueBinDir = System.getProperty("TORQUE_HOME") + File.separator + "bin";
- domain = System.getProperty("DOMAIN");
- currentHodJobs = new TreeMap<String, TreeMap<String, String>>();
- }
-
- public void setup(boolean recover) throws Exception {
- }
-
- private void getHodJobInfo() throws IOException {
- StringBuffer sb = new StringBuffer();
- sb.append(torqueBinDir).append("/qstat -a");
-
- String[] getQueueInfoCommand = new String[3];
- getQueueInfoCommand[0] = "ssh";
- getQueueInfoCommand[1] = torqueServer;
- getQueueInfoCommand[2] = sb.toString();
-
- String command = getQueueInfoCommand[0] + " " + getQueueInfoCommand[1]
- + " " + getQueueInfoCommand[2];
- ProcessBuilder pb = new ProcessBuilder(getQueueInfoCommand);
-
- Process p = pb.start();
-
- Timer timeout = new Timer();
- TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
- timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
-
- BufferedReader result = new BufferedReader(new InputStreamReader(p
- .getInputStream()));
- ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
- command, true);
- errorHandler.start();
-
- String line = null;
- boolean start = false;
- TreeSet<String> jobsInTorque = new TreeSet<String>();
- while ((line = result.readLine()) != null) {
- if (line.startsWith("---")) {
- start = true;
- continue;
- }
-
- if (start) {
- String[] items = line.split("\\s+");
- if (items.length >= 10) {
- String hodIdLong = items[0];
- String hodId = hodIdLong.split("[.]")[0];
- String userId = items[1];
- String numOfMachine = items[5];
- String status = items[9];
- jobsInTorque.add(hodId);
- if (!currentHodJobs.containsKey(hodId)) {
- TreeMap<String, String> aJobData = new TreeMap<String, String>();
-
- aJobData.put("userId", userId);
- aJobData.put("numOfMachine", numOfMachine);
- aJobData.put("traceCheckCount", "0");
- aJobData.put("process", "0");
- aJobData.put("status", status);
- currentHodJobs.put(hodId, aJobData);
- } else {
- TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
- aJobData.put("status", status);
- currentHodJobs.put(hodId, aJobData);
- }// if..else
- }
- }
- }// while
-
- try {
- errorHandler.join();
- } catch (InterruptedException ie) {
- log.error(ie.getMessage());
- }
- timeout.cancel();
-
- Set<String> currentHodJobIds = currentHodJobs.keySet();
- Iterator<String> currentHodJobIdsIt = currentHodJobIds.iterator();
- TreeSet<String> finishedHodIds = new TreeSet<String>();
- while (currentHodJobIdsIt.hasNext()) {
- String hodId = currentHodJobIdsIt.next();
- if (!jobsInTorque.contains(hodId)) {
- TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
- String process = aJobData.get("process");
- if (process.equals("0") || process.equals("1")) {
- aJobData.put("status", "C");
- } else {
- finishedHodIds.add(hodId);
- }
- }
- }// while
-
- Iterator<String> finishedHodIdsIt = finishedHodIds.iterator();
- while (finishedHodIdsIt.hasNext()) {
- String hodId = finishedHodIdsIt.next();
- currentHodJobs.remove(hodId);
- }
-
- }
-
- private boolean loadQstatData(String hodId) throws IOException, SQLException {
- TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
- String userId = aJobData.get("userId");
-
- StringBuffer sb = new StringBuffer();
- sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
- String[] qstatCommand = new String[3];
- qstatCommand[0] = "ssh";
- qstatCommand[1] = torqueServer;
- qstatCommand[2] = sb.toString();
-
- String command = qstatCommand[0] + " " + qstatCommand[1] + " "
- + qstatCommand[2];
- ProcessBuilder pb = new ProcessBuilder(qstatCommand);
- Process p = pb.start();
-
- Timer timeout = new Timer();
- TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
- timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
-
- BufferedReader result = new BufferedReader(new InputStreamReader(p
- .getInputStream()));
- ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
- command, false);
- errorHandler.start();
- String line = null;
- String hosts = null;
- long startTimeValue = -1;
- long endTimeValue = Calendar.getInstance().getTimeInMillis();
- long executeTimeValue = Calendar.getInstance().getTimeInMillis();
- boolean qstatfinished;
-
- while ((line = result.readLine()) != null) {
- if (line.indexOf("ctime") >= 0) {
- String startTime = line.split("=")[1].trim();
- // Tue Sep 9 23:44:29 2008
- SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
- Date startTimeDate;
- try {
- startTimeDate = sdf.parse(startTime);
- startTimeValue = startTimeDate.getTime();
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- if (line.indexOf("mtime") >= 0) {
- String endTime = line.split("=")[1].trim();
- SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
- Date endTimeDate;
- try {
- endTimeDate = sdf.parse(endTime);
- endTimeValue = endTimeDate.getTime();
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- if (line.indexOf("etime") >= 0) {
- String executeTime = line.split("=")[1].trim();
- SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
- Date executeTimeDate;
- try {
- executeTimeDate = sdf.parse(executeTime);
- executeTimeValue = executeTimeDate.getTime();
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- if (line.indexOf("exec_host") >= 0) {
- hosts = line.split("=")[1].trim();
- }
- }
-
- if (hosts != null && startTimeValue >= 0) {
- String[] items2 = hosts.split("[+]");
- int num = 0;
- for (int i = 0; i < items2.length; i++) {
- String machinetmp = items2[i];
- if (machinetmp.length() > 3) {
- String machine = items2[i].substring(0, items2[i].length() - 2);
- StringBuffer data = new StringBuffer();
- data.append("HodId=").append(hodId);
- data.append(", Machine=").append(machine);
- if (domain != null) {
- data.append(".").append(domain);
- }
- log.info(data);
- num++;
- }
- }
- Timestamp startTimedb = new Timestamp(startTimeValue);
- Timestamp endTimedb = new Timestamp(endTimeValue);
- StringBuffer data = new StringBuffer();
- long timeQueued = executeTimeValue - startTimeValue;
- data.append("HodID=").append(hodId);
- data.append(", UserId=").append(userId);
- data.append(", StartTime=").append(startTimedb);
- data.append(", TimeQueued=").append(timeQueued);
- data.append(", NumOfMachines=").append(num);
- data.append(", EndTime=").append(endTimedb);
- log.info(data);
- qstatfinished = true;
-
- } else {
-
- qstatfinished = false;
- }
-
- try {
- errorHandler.join();
- } catch (InterruptedException ie) {
- log.error(ie.getMessage());
- }
- result.close();
- timeout.cancel();
-
- return qstatfinished;
- }
-
- private boolean loadTraceJobData(String hodId) throws IOException,
- SQLException {
- TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
- String userId = aJobData.get("userId");
-
- StringBuffer sb = new StringBuffer();
- sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
- String[] traceJobCommand = new String[3];
- traceJobCommand[0] = "ssh";
- traceJobCommand[1] = torqueServer;
- traceJobCommand[2] = sb.toString();
-
- String command = traceJobCommand[0] + " " + traceJobCommand[1] + " "
- + traceJobCommand[2];
- ProcessBuilder pb = new ProcessBuilder(traceJobCommand);
-
- Process p = pb.start();
-
- Timer timeout = new Timer();
- TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
- timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
-
- BufferedReader result = new BufferedReader(new InputStreamReader(p
- .getInputStream()));
- ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
- command, false);
- errorHandler.start();
- String line = null;
- String exit_status = null;
- String hosts = null;
- long timeQueued = -1;
- long startTimeValue = -1;
- long endTimeValue = -1;
- boolean findResult = false;
-
- while ((line = result.readLine()) != null && !findResult) {
- if (line.indexOf("end") >= 0 && line.indexOf("Exit_status") >= 0
- && line.indexOf("qtime") >= 0) {
- TreeMap<String, String> jobData = new TreeMap<String, String>();
- String[] items = line.split("\\s+");
- for (int i = 0; i < items.length; i++) {
- String[] items2 = items[i].split("=");
- if (items2.length >= 2) {
- jobData.put(items2[0], items2[1]);
- }
-
- }
- String startTime = jobData.get("ctime");
- startTimeValue = Long.valueOf(startTime);
- startTimeValue = startTimeValue - startTimeValue % (60);
- Timestamp startTimedb = new Timestamp(startTimeValue * 1000);
-
- String queueTime = jobData.get("qtime");
- long queueTimeValue = Long.valueOf(queueTime);
-
- String sTime = jobData.get("start");
- long sTimeValue = Long.valueOf(sTime);
-
- timeQueued = sTimeValue - queueTimeValue;
-
- String endTime = jobData.get("end");
- endTimeValue = Long.valueOf(endTime);
- endTimeValue = endTimeValue - endTimeValue % (60);
- Timestamp endTimedb = new Timestamp(endTimeValue * 1000);
-
- exit_status = jobData.get("Exit_status");
- hosts = jobData.get("exec_host");
- String[] items2 = hosts.split("[+]");
- int num = 0;
- for (int i = 0; i < items2.length; i++) {
- String machinetemp = items2[i];
- if (machinetemp.length() >= 3) {
- String machine = items2[i].substring(0, items2[i].length() - 2);
- StringBuffer data = new StringBuffer();
- data.append("HodId=").append(hodId);
- data.append(", Machine=").append(machine);
- if (domain != null) {
- data.append(".").append(domain);
- }
- log.info(data.toString());
- num++;
- }
- }
-
- StringBuffer data = new StringBuffer();
- data.append("HodID=").append(hodId);
- data.append(", UserId=").append(userId);
- data.append(", Status=").append(exit_status);
- data.append(", TimeQueued=").append(timeQueued);
- data.append(", StartTime=").append(startTimedb);
- data.append(", EndTime=").append(endTimedb);
- data.append(", NumOfMachines=").append(num);
- log.info(data.toString());
- findResult = true;
- log.debug(" hod info for job " + hodId + " has been loaded ");
- }// if
-
- }// while
-
- try {
- errorHandler.join();
- } catch (InterruptedException ie) {
- log.error(ie.getMessage());
- }
-
- timeout.cancel();
- boolean tracedone = false;
- if (!findResult) {
-
- String traceCheckCount = aJobData.get("traceCheckCount");
- int traceCheckCountValue = Integer.valueOf(traceCheckCount);
- traceCheckCountValue = traceCheckCountValue + 1;
- aJobData.put("traceCheckCount", String.valueOf(traceCheckCountValue));
-
- log.debug("did not find tracejob info for job " + hodId + ", after "
- + traceCheckCountValue + " times checking");
- if (traceCheckCountValue >= 2) {
- tracedone = true;
- }
- }
- boolean finished = findResult | tracedone;
- return finished;
- }
-
- private void process_data() throws SQLException {
-
- long currentTime = System.currentTimeMillis();
- currentTime = currentTime - currentTime % (60 * 1000);
-
- Set<String> hodIds = currentHodJobs.keySet();
-
- Iterator<String> hodIdsIt = hodIds.iterator();
- while (hodIdsIt.hasNext()) {
- String hodId = hodIdsIt.next();
- TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
- String status = aJobData.get("status");
- String process = aJobData.get("process");
- if (process.equals("0") && (status.equals("R") || status.equals("E"))) {
- try {
- boolean result = loadQstatData(hodId);
- if (result) {
- aJobData.put("process", "1");
- currentHodJobs.put(hodId, aJobData);
- }
- } catch (IOException ioe) {
- log.error("load qsat data Error:" + ioe.getMessage());
-
- }
- }
- if (!process.equals("2") && status.equals("C")) {
- try {
- boolean result = loadTraceJobData(hodId);
-
- if (result) {
- aJobData.put("process", "2");
- currentHodJobs.put(hodId, aJobData);
- }
- } catch (IOException ioe) {
- log.error("loadTraceJobData Error:" + ioe.getMessage());
- }
- }// if
-
- } // while
-
- }
-
- private void handle_jobData() throws SQLException {
- try {
- getHodJobInfo();
- } catch (IOException ex) {
- log.error("getQueueInfo Error:" + ex.getMessage());
- return;
- }
- try {
- process_data();
- } catch (SQLException ex) {
- log.error("process_data Error:" + ex.getMessage());
- throw ex;
- }
- }
-
- public void run_forever() throws SQLException {
- while (true) {
- handle_jobData();
- try {
- log.debug("sleeping ...");
- Thread.sleep(this.intervalValue * 1000);
- } catch (InterruptedException e) {
- log.error(e.getMessage());
- }
- }
- }
-
- public void shutdown() {
- }
-}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
deleted file mode 100644
index 8ea645e..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.inputtools.mdl;
-
-
-import java.util.TimerTask;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class TorqueTimerTask extends TimerTask {
- private Process ps = null;
- private String command;
-
- private static Log log = LogFactory.getLog(TorqueTimerTask.class);
- // public static int timeoutInterval=300;
- public static int timeoutInterval = 180;
-
- public TorqueTimerTask() {
- super();
- // TODO Auto-generated constructor stub
- }
-
- public TorqueTimerTask(Process process, String command) {
- super();
- this.ps = process;
- this.command = command;
-
- }
-
- public void run() {
- ps.destroy();
- log.error("torque command: " + command + " timed out");
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java b/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
index 7ed3148..ff2a022 100644
--- a/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
+++ b/src/main/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.chukwa.util;
import java.io.*;
+import java.nio.charset.Charset;
import java.util.*;
public class ClusterConfig {
- public static final HashMap<String, String> clusterMap = new HashMap<String, String>();
+ private HashMap<String, String> clusterMap = new HashMap<String, String>();
private String path = System.getenv("CHUKWA_CONF_DIR") + File.separator;
static public String getContents(File aFile) {
@@ -31,9 +32,7 @@ public class ClusterConfig {
StringBuffer contents = new StringBuffer();
try {
- // use buffering, reading one line at a time
- // FileReader always assumes default encoding is OK!
- BufferedReader input = new BufferedReader(new FileReader(aFile));
+ BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
try {
String line = null; // not declared within while loop
/*
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
index c655e24..70a80c0 100644
--- a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
+++ b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.chukwa.util;
+import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Calendar;
@@ -29,7 +30,6 @@ import org.mortbay.log.Log;
public class HBaseUtil {
private static Logger LOG = Logger.getLogger(HBaseUtil.class);
- static Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
static MessageDigest md5 = null;
static {
try {
@@ -50,8 +50,9 @@ public class HBaseUtil {
}
public static byte[] buildKey(long time, String primaryKey) {
+ Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
c.setTimeInMillis(time);
- byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+ byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes(Charset.forName("UTF-8"));
byte[] pk = getHash(primaryKey);
byte[] key = new byte[12];
System.arraycopy(day, 0, key, 0, day.length);
@@ -60,8 +61,9 @@ public class HBaseUtil {
}
public static byte[] buildKey(long time, String primaryKey, String source) {
+ Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
c.setTimeInMillis(time);
- byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+ byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes(Charset.forName("UTF-8"));
byte[] pk = getHash(primaryKey);
byte[] src = getHash(source);
byte[] key = new byte[12];
@@ -73,7 +75,7 @@ public class HBaseUtil {
private static byte[] getHash(String key) {
byte[] hash = new byte[5];
- System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 5);
+ System.arraycopy(md5.digest(key.getBytes(Charset.forName("UTF-8"))), 0, hash, 0, 5);
return hash;
}
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java b/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
index 26e4beb..c1cf37f 100644
--- a/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
+++ b/src/test/java/org/apache/hadoop/chukwa/ChunkImplTest.java
@@ -43,26 +43,6 @@ public class ChunkImplTest extends TestCase {
}
}
- public void testWrongVersion() {
- ChunkBuilder cb = new ChunkBuilder();
- cb.addRecord("foo".getBytes());
- cb.addRecord("bar".getBytes());
- cb.addRecord("baz".getBytes());
- Chunk c = cb.getChunk();
- DataOutputBuffer ob = new DataOutputBuffer(c.getSerializedSizeEstimate());
- try {
- c.write(ob);
- DataInputBuffer ib = new DataInputBuffer();
- ib.reset(ob.getData(), c.getSerializedSizeEstimate());
- // change current chunkImpl version
- ChunkImpl.PROTOCOL_VERSION = ChunkImpl.PROTOCOL_VERSION + 1;
- ChunkImpl.read(ib);
- fail("Should have raised an IOexception");
- } catch (IOException e) {
- // right behavior, do nothing
- }
- }
-
public void testTag() {
ChunkBuilder cb = new ChunkBuilder();
cb.addRecord("foo".getBytes());
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java b/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
index 93500ff..99d8dc1 100644
--- a/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
+++ b/src/test/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
@@ -172,7 +172,8 @@ public class TestFSMBuilder extends TestCase {
conf.set("chukwaAgent.checkpoint.dir", System.getenv("CHUKWA_DATA_DIR")+File.separator+"tmp");
conf.set("chukwaAgent.checkpoint.interval", "10000");
int portno = conf.getInt("chukwaAgent.control.port", agentPort);
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent();
+ agent.start();
conn = new HttpConnector(agent, "http://localhost:"+collectorPort+"/chukwa");
conn.start();
sender = new ChukwaHttpSender(conf);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
index a039bc6..5b163c9 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
@@ -59,7 +59,8 @@ public class TestJMXAdaptor extends TestCase{
conf.setInt("chukwaAgent.http.port", 9090);
conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
}
public void testJMXAdaptor() {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
index 367484e..39f151b 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
@@ -57,7 +57,8 @@ public class TestAddAdaptor extends TestCase {
conf.setInt("chukwaAgent.http.port", 9090);
conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
assertEquals(0, agent.adaptorCount());
System.out.println("adding jmx adaptor");
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
index 466053b..948ec5a 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
@@ -63,7 +63,8 @@ public class TestBufferingWrappers extends TestCase {
public void resendAfterStop(String adaptor) throws IOException,
ChukwaAgent.AlreadyRunningException, InterruptedException {
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
String ADAPTORID = "adaptor_test" + System.currentTimeMillis();
String STR = "test data";
int PORTNO = 9878;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
index 717125b..5748c9b 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
@@ -56,7 +56,8 @@ public class TestDirTailingAdaptor extends TestCase {
conf.setInt("chukwaAgent.control.port", 0);
conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
File emptyDir = new File(baseDir, "emptyDir2");
createEmptyDir(emptyDir);
@@ -90,7 +91,8 @@ public class TestDirTailingAdaptor extends TestCase {
anOldFile.deleteOnExit();
aNewFile.deleteOnExit();
anOldFile.setLastModified(10);//just after epoch
- agent = new ChukwaAgent(conf); //restart agent.
+ agent = ChukwaAgent.getAgent(conf); //restart agent.
+ agent.start();
Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
assertTrue(aNewFile.exists());
@@ -135,7 +137,8 @@ public class TestDirTailingAdaptor extends TestCase {
while(retry) {
try {
retry = false;
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
} catch(Exception e) {
retry = true;
}
@@ -167,11 +170,12 @@ public class TestDirTailingAdaptor extends TestCase {
anOldFile.deleteOnExit();
aNewFile.deleteOnExit();
anOldFile.setLastModified(10);//just after epoch
- agent = new ChukwaAgent(conf); //restart agent.
-
+ agent = ChukwaAgent.getAgent(conf); //restart agent.
+ agent.start();
+
Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
assertTrue(aNewFile.exists());
-
+
//make sure we started tailing the new, not the old, file.
for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
System.out.println(adaptors.getKey() +": " + adaptors.getValue());
@@ -182,7 +186,7 @@ public class TestDirTailingAdaptor extends TestCase {
Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
assertEquals(4, agent.adaptorCount());
agent.shutdown();
-
+
nukeDirContents(checkpointDir);//nuke dir
checkpointDir.delete();
emptyDir.delete();
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
index 11a084a..1af52d0 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
@@ -35,7 +35,8 @@ public class TestExecAdaptor extends TestCase {
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
}
@Override
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
index eafa12d..fbe249a 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
@@ -55,7 +55,8 @@ public class TestFileAdaptor extends TestCase {
public void testOnce() throws IOException,
ChukwaAgent.AlreadyRunningException, InterruptedException {
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
assertEquals(0, agent.adaptorCount());
@@ -75,7 +76,8 @@ public class TestFileAdaptor extends TestCase {
ChukwaAgent.AlreadyRunningException, InterruptedException {
int tests = 10; //SHOULD SET HIGHER AND WATCH WITH lsof to find leaks
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
for(int i=0; i < tests; ++i) {
if(i % 100 == 0)
System.out.println("buzzed " + i + " times");
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
index bcd940c..4bbf206 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestHeartbeatAdaptor.java
@@ -24,6 +24,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
@@ -34,8 +35,9 @@ import junit.framework.TestCase;
public class TestHeartbeatAdaptor extends TestCase {
private volatile boolean shutdown = false;
private final int port = 4321;
- public void testPingAdaptor() throws IOException, InterruptedException{
+ public void testPingAdaptor() throws IOException, InterruptedException, AlreadyRunningException{
ChukwaAgent agent = ChukwaAgent.getAgent();
+ agent.start();
Configuration conf = agent.getConfiguration();
conf.set("chukwa.http.writer.host", "localhost");
conf.set("chukwa.http.writer.port", String.valueOf(port));
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
index 1e0f234..65add51 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
@@ -44,7 +44,8 @@ public class TestCharFileTailingAdaptorUTF8 extends TestCase {
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
- ChukwaAgent agent = new ChukwaAgent(conf);
+ ChukwaAgent agent = ChukwaAgent.getAgent(conf);
+ agent.start();
File testFile = makeTestFile("chukwaTest", 80,baseDir);
String adaptorId = agent
.processAddCommand("add adaptor_test = org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8"
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
index 1c68a1a..fc04f25 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
@@ -36,7 +36,8 @@ public class TestFileExpirationPolicy extends TestCase {
try {
Configuration conf = new ChukwaConfiguration();
conf.set("chukwaAgent.control.port", "0");
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
FileTailingAdaptor.GRACEFUL_PERIOD = 30 * 1000;
@@ -79,7 +80,8 @@ public class TestFileExpirationPolicy extends TestCase {
Configuration conf = new ChukwaConfiguration();
conf.set("chukwaAgent.control.port", "0");
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
// Remove any adaptor left over from previous run
ChukwaAgentController cli = new ChukwaAgentController("localhost", agent.getControllerPort());
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
index 570e7f4..d622b5d 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
@@ -49,7 +49,8 @@ public class TestFileTailer {
ChukwaConfiguration cc = new ChukwaConfiguration();
cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 18); // small in order to have hasMoreData=true
// (with 26 letters we should have 2 chunks)
- agent = new ChukwaAgent(cc);
+ agent = ChukwaAgent.getAgent(cc);
+ agent.start();
ChunkCatcherConnector chunks = new ChunkCatcherConnector();
chunks.start();
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
index 40479b5..2a82e79 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
@@ -50,7 +50,8 @@ public class TestFileTailingAdaptorBigRecord extends TestCase {
ChukwaConfiguration cc = new ChukwaConfiguration();
cc.set("chukwaAgent.control.port", "0");
cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 55);
- ChukwaAgent agent = new ChukwaAgent(cc);
+ ChukwaAgent agent = ChukwaAgent.getAgent(cc);
+ agent.start();
int portno = agent.getControllerPort();
while (portno == -1) {
Thread.sleep(1000);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
index fbbfd94..4590ef3 100644
--- a/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
@@ -59,7 +59,8 @@ public class TestFileTailingAdaptorPreserveLines {
*/
@Before
public void setUp() throws Exception {
- agent = new ChukwaAgent(conf);
+ agent = ChukwaAgent.getAgent(conf);
+ agent.start();
chunks = new ChunkCatcherConnector();
chunks.start();