Posted to dev@chukwa.apache.org by ey...@apache.org on 2015/06/25 22:48:49 UTC
[18/22] chukwa git commit: CHUKWA-764. Clean up multiple flavor of JSON usage. (Eric Yang)
CHUKWA-764. Clean up multiple flavor of JSON usage. (Eric Yang)
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/c914d340
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/c914d340
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/c914d340
Branch: refs/heads/master
Commit: c914d3401026c8dbb8b93bba4da52841b704f0f5
Parents: 01a72e2
Author: Eric Yang <ey...@apache.org>
Authored: Mon Jun 22 17:40:40 2015 -0700
Committer: Eric Yang <ey...@apache.org>
Committed: Mon Jun 22 17:40:40 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
pom.xml | 13 +-
.../agent/rest/AdaptorController.java | 3 +-
.../processor/mapper/DatanodeProcessor.java | 233 +++---
.../processor/mapper/HBaseMasterProcessor.java | 125 ++-
.../mapper/HBaseRegionServerProcessor.java | 82 +-
.../processor/mapper/JobConfProcessor.java | 2 +-
.../processor/mapper/JobTrackerProcessor.java | 241 +++---
.../mapper/Log4JMetricsContextProcessor.java | 17 +-
.../processor/mapper/NamenodeProcessor.java | 298 ++++----
.../processor/mapper/ZookeeperProcessor.java | 119 ++-
.../chukwa/extraction/hbase/SystemMetrics.java | 6 +-
.../apache/hadoop/chukwa/hicc/JSONLoader.java | 21 +-
.../org/apache/hadoop/chukwa/hicc/Views.java | 26 +-
.../apache/hadoop/chukwa/hicc/Workspace.java | 759 ++++++++++---------
src/site/apt/Quick_Start_Guide.apt | 34 +-
.../chukwa/database/TestDatabaseWebJson.java | 1 -
17 files changed, 995 insertions(+), 987 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3fb27b2..2e46c2a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -44,6 +44,8 @@ Trunk (unreleased changes)
BUGS
+ CHUKWA-764. Clean up multiple flavor of JSON usage. (Eric Yang)
+
CHUKWA-763. Removed unused old libraries. (Eric Yang)
CHUKWA-760. Added error message for missing configuration. (Eric Yang)
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6ad2055..80b4013 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,9 +145,9 @@
<version>1.6.4</version>
</dependency>
<dependency>
- <groupId>org.json</groupId>
- <artifactId>json</artifactId>
- <version>20090211</version>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>1.1</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
@@ -281,14 +281,9 @@
</exclusions>
</dependency>
<dependency>
- <groupId>com.googlecode.json-simple</groupId>
- <artifactId>json-simple</artifactId>
- <version>1.1</version>
- </dependency>
- <dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
- <version>4.7.2</version>
+ <version>4.9.0</version>
<type>jar</type>
</dependency>
<dependency>
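The two pom.xml hunks above consolidate JSON handling on a single library: the old org.json artifact is replaced by com.googlecode.json-simple:json-simple 1.1, the duplicate json-simple declaration further down is removed, and solr-solrj is bumped from 4.7.2 to 4.9.0. As a minimal sketch of the json-simple API that the rest of this patch migrates to (the class name and sample input below are hypothetical, not part of the commit):

    import org.json.simple.JSONObject;
    import org.json.simple.JSONValue;

    public class JsonSimpleParseExample {
      public static void main(String[] args) {
        // JSONValue.parse() returns Object; callers cast to JSONObject or JSONArray.
        JSONObject obj = (JSONObject) JSONValue.parse(
            "{\"gcCount\":42,\"memHeapUsedM\":128.5}");
        // Values come back as plain Objects (Long, Double, String, ...);
        // the processors in this patch call toString() on them.
        for (Object key : obj.keySet()) {
          Object value = obj.get(key);
          System.out.println(key + " = " + (value == null ? "" : value.toString()));
        }
      }
    }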
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
index 7e2fa7c..70edc2a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
@@ -23,8 +23,7 @@ import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.commons.lang.StringEscapeUtils;
-import org.json.JSONObject;
-import org.json.JSONException;
+import org.json.simple.JSONObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
index b5cd941..4e5765d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
@@ -36,125 +36,122 @@ import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
-@Tables(annotations={
-@Table(name="Datanode",columnFamily="dn"),
-@Table(name="Datanode",columnFamily="jvm"),
-@Table(name="Datanode",columnFamily="rpc")
-})
-public class DatanodeProcessor extends AbstractProcessor{
+@Tables(annotations = { @Table(name = "Datanode", columnFamily = "dn"),
+ @Table(name = "Datanode", columnFamily = "jvm"),
+ @Table(name = "Datanode", columnFamily = "rpc") })
+public class DatanodeProcessor extends AbstractProcessor {
- static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
-
- static {
- long zero = 0L;
- rateMap.put("blocks_verified", zero);
- rateMap.put("blocks_written", zero);
- rateMap.put("blocks_read", zero);
- rateMap.put("bytes_written", zero);
- rateMap.put("bytes_read", zero);
- rateMap.put("heartBeats_num_ops", zero);
- rateMap.put("SentBytes", zero);
- rateMap.put("ReceivedBytes", zero);
- rateMap.put("rpcAuthorizationSuccesses", zero);
- rateMap.put("rpcAuthorizationFailures", zero);
- rateMap.put("RpcQueueTime_num_ops", zero);
- rateMap.put("RpcProcessingTime_num_ops", zero);
- rateMap.put("gcCount", zero);
- }
-
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter) throws Throwable {
- Logger log = Logger.getLogger(DatanodeProcessor.class);
- long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
-
- final ChukwaRecord hdfs_datanode = new ChukwaRecord();
- final ChukwaRecord datanode_jvm = new ChukwaRecord();
- final ChukwaRecord datanode_rpc = new ChukwaRecord();
-
- Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>(){
- private static final long serialVersionUID = 1L;
+ static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();
- {
- put("blocks_verified", hdfs_datanode);
- put("blocks_written", hdfs_datanode);
- put("blocks_read", hdfs_datanode);
- put("blocks_replicated", hdfs_datanode);
- put("blocks_removed", hdfs_datanode);
- put("bytes_written", hdfs_datanode);
- put("bytes_read", hdfs_datanode);
- put("heartBeats_avg_time", hdfs_datanode);
- put("heartBeats_num_ops", hdfs_datanode);
-
- put("gcCount", datanode_jvm);
- put("gcTimeMillis", datanode_jvm);
- put("logError", datanode_jvm);
- put("logFatal", datanode_jvm);
- put("logInfo", datanode_jvm);
- put("logWarn", datanode_jvm);
- put("memHeapCommittedM", datanode_jvm);
- put("memHeapUsedM", datanode_jvm);
- put("threadsBlocked", datanode_jvm);
- put("threadsNew", datanode_jvm);
- put("threadsRunnable", datanode_jvm);
- put("threadsTerminated", datanode_jvm);
- put("threadsTimedWaiting", datanode_jvm);
- put("threadsWaiting", datanode_jvm);
+ static {
+ long zero = 0L;
+ rateMap.put("blocks_verified", zero);
+ rateMap.put("blocks_written", zero);
+ rateMap.put("blocks_read", zero);
+ rateMap.put("bytes_written", zero);
+ rateMap.put("bytes_read", zero);
+ rateMap.put("heartBeats_num_ops", zero);
+ rateMap.put("SentBytes", zero);
+ rateMap.put("ReceivedBytes", zero);
+ rateMap.put("rpcAuthorizationSuccesses", zero);
+ rateMap.put("rpcAuthorizationFailures", zero);
+ rateMap.put("RpcQueueTime_num_ops", zero);
+ rateMap.put("RpcProcessingTime_num_ops", zero);
+ rateMap.put("gcCount", zero);
+ }
- put("ReceivedBytes", datanode_rpc);
- put("RpcProcessingTime_avg_time", datanode_rpc);
- put("RpcProcessingTime_num_ops", datanode_rpc);
- put("RpcQueueTime_avg_time", datanode_rpc);
- put("RpcQueueTime_num_ops", datanode_rpc);
- put("SentBytes", datanode_rpc);
- put("rpcAuthorizationSuccesses", datanode_rpc);
- }
- };
- try{
- JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
- String ttTag = chunk.getTag("timeStamp");
- if(ttTag == null){
- log.warn("timeStamp tag not set in JMX adaptor for datanode");
- }
- else{
- timeStamp = Long.parseLong(ttTag);
- }
- Iterator<JSONObject> iter = obj.entrySet().iterator();
-
- while(iter.hasNext()){
- Map.Entry entry = (Map.Entry)iter.next();
- String key = (String) entry.getKey();
- Object value = entry.getValue();
- String valueString = value == null?"":value.toString();
-
- //Calculate rate for some of the metrics
- if(rateMap.containsKey(key)){
- long oldValue = rateMap.get(key);
- long curValue = Long.parseLong(valueString);
- rateMap.put(key, curValue);
- long newValue = curValue - oldValue;
- if(newValue < 0){
- log.error("DatanodeProcessor's rateMap might be reset or corrupted for metric "+key);
- newValue = 0L;
- }
- valueString = Long.toString(newValue);
- }
-
- if(metricsMap.containsKey(key)){
- ChukwaRecord rec = metricsMap.get(key);
- rec.add(key, valueString);
- }
- }
- buildGenericRecord(hdfs_datanode, null, timeStamp, "dn");
- output.collect(key, hdfs_datanode);
- buildGenericRecord(datanode_jvm, null, timeStamp, "jvm");
- output.collect(key, datanode_jvm);
- buildGenericRecord(datanode_rpc, null, timeStamp, "rpc");
- output.collect(key, datanode_rpc);
- }
- catch(Exception e){
- log.error(ExceptionUtil.getStackTrace(e));
- }
- }
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+ Logger log = Logger.getLogger(DatanodeProcessor.class);
+ long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+ .getTimeInMillis();
+
+ final ChukwaRecord hdfs_datanode = new ChukwaRecord();
+ final ChukwaRecord datanode_jvm = new ChukwaRecord();
+ final ChukwaRecord datanode_rpc = new ChukwaRecord();
+
+ Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() {
+ private static final long serialVersionUID = 1L;
+
+ {
+ put("blocks_verified", hdfs_datanode);
+ put("blocks_written", hdfs_datanode);
+ put("blocks_read", hdfs_datanode);
+ put("blocks_replicated", hdfs_datanode);
+ put("blocks_removed", hdfs_datanode);
+ put("bytes_written", hdfs_datanode);
+ put("bytes_read", hdfs_datanode);
+ put("heartBeats_avg_time", hdfs_datanode);
+ put("heartBeats_num_ops", hdfs_datanode);
+
+ put("gcCount", datanode_jvm);
+ put("gcTimeMillis", datanode_jvm);
+ put("logError", datanode_jvm);
+ put("logFatal", datanode_jvm);
+ put("logInfo", datanode_jvm);
+ put("logWarn", datanode_jvm);
+ put("memHeapCommittedM", datanode_jvm);
+ put("memHeapUsedM", datanode_jvm);
+ put("threadsBlocked", datanode_jvm);
+ put("threadsNew", datanode_jvm);
+ put("threadsRunnable", datanode_jvm);
+ put("threadsTerminated", datanode_jvm);
+ put("threadsTimedWaiting", datanode_jvm);
+ put("threadsWaiting", datanode_jvm);
+
+ put("ReceivedBytes", datanode_rpc);
+ put("RpcProcessingTime_avg_time", datanode_rpc);
+ put("RpcProcessingTime_num_ops", datanode_rpc);
+ put("RpcQueueTime_avg_time", datanode_rpc);
+ put("RpcQueueTime_num_ops", datanode_rpc);
+ put("SentBytes", datanode_rpc);
+ put("rpcAuthorizationSuccesses", datanode_rpc);
+ }
+ };
+ try {
+ JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
+ String ttTag = chunk.getTag("timeStamp");
+ if (ttTag == null) {
+ log.warn("timeStamp tag not set in JMX adaptor for datanode");
+ } else {
+ timeStamp = Long.parseLong(ttTag);
+ }
+ Iterator<String> keys = obj.keySet().iterator();
+
+ while (keys.hasNext()) {
+ String key = keys.next();
+ Object value = obj.get(key);
+ String valueString = value == null ? "" : value.toString();
+
+ // Calculate rate for some of the metrics
+ if (rateMap.containsKey(key)) {
+ long oldValue = rateMap.get(key);
+ long curValue = Long.parseLong(valueString);
+ rateMap.put(key, curValue);
+ long newValue = curValue - oldValue;
+ if (newValue < 0) {
+ log.error("DatanodeProcessor's rateMap might be reset or corrupted for metric "
+ + key);
+ newValue = 0L;
+ }
+ valueString = Long.toString(newValue);
+ }
+
+ if (metricsMap.containsKey(key)) {
+ ChukwaRecord rec = metricsMap.get(key);
+ rec.add(key, valueString);
+ }
+ }
+ buildGenericRecord(hdfs_datanode, null, timeStamp, "dn");
+ output.collect(key, hdfs_datanode);
+ buildGenericRecord(datanode_jvm, null, timeStamp, "jvm");
+ output.collect(key, datanode_jvm);
+ buildGenericRecord(datanode_rpc, null, timeStamp, "rpc");
+ output.collect(key, datanode_rpc);
+ } catch (Exception e) {
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+ }
}
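Beyond reformatting, the DatanodeProcessor change replaces the old entrySet()-based loop (which cast the iterator to the mismatched type Iterator<JSONObject>) with a plain walk over obj.keySet() plus obj.get(key). The counter-to-rate logic itself is unchanged: remember the last observed value in rateMap, emit the delta, and clamp a negative delta (a reset counter) to zero. A self-contained sketch of that pattern, with a hypothetical class name and sample input:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.json.simple.JSONObject;
    import org.json.simple.JSONValue;

    public class CounterRateSketch {
      // Last observed value per counter; the delta between two polls approximates a rate.
      static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();
      static {
        rateMap.put("blocks_read", 0L);
      }

      public static void main(String[] args) {
        JSONObject obj = (JSONObject) JSONValue.parse(
            "{\"blocks_read\":100,\"blocks_verified\":7}");
        for (Object k : obj.keySet()) {
          String key = (String) k;
          Object value = obj.get(key);
          String valueString = value == null ? "" : value.toString();
          if (rateMap.containsKey(key)) {
            long oldValue = rateMap.get(key);
            long curValue = Long.parseLong(valueString);
            rateMap.put(key, curValue);
            // A negative delta means the upstream counter was reset; clamp to zero.
            valueString = Long.toString(Math.max(curValue - oldValue, 0L));
          }
          System.out.println(key + " -> " + valueString);
        }
      }
    }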
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
index 979103e..c04e752 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
@@ -38,69 +38,66 @@ import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
-@Tables(annotations={
-@Table(name="HBase",columnFamily="master")
-})
-public class HBaseMasterProcessor extends AbstractProcessor{
- static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
- static {
- long zero = 0L;
- rateMap.put("splitSizeNumOps", zero);
- rateMap.put("splitTimeNumOps", zero);
- }
-
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter) throws Throwable {
-
- Logger log = Logger.getLogger(HBaseMasterProcessor.class);
- long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
- ChukwaRecord record = new ChukwaRecord();
-
- Map<String, Buffer> metricsMap = new HashMap<String,Buffer>();
+@Tables(annotations = { @Table(name = "HBase", columnFamily = "master") })
+public class HBaseMasterProcessor extends AbstractProcessor {
+ static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();
+ static {
+ long zero = 0L;
+ rateMap.put("splitSizeNumOps", zero);
+ rateMap.put("splitTimeNumOps", zero);
+ }
- try{
- JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
- String ttTag = chunk.getTag("timeStamp");
- if(ttTag == null){
- log.warn("timeStamp tag not set in JMX adaptor for hbase master");
- }
- else{
- timeStamp = Long.parseLong(ttTag);
- }
- Iterator<JSONObject> iter = obj.entrySet().iterator();
-
- while(iter.hasNext()){
- Map.Entry entry = (Map.Entry)iter.next();
- String key = (String) entry.getKey();
- Object value = entry.getValue();
- String valueString = value == null?"":value.toString();
-
- //Calculate rate for some of the metrics
- if(rateMap.containsKey(key)){
- long oldValue = rateMap.get(key);
- long curValue = Long.parseLong(valueString);
- rateMap.put(key, curValue);
- long newValue = curValue - oldValue;
- if(newValue < 0){
- log.warn("HBaseMaster rateMap might be reset or corrupted for metric "+key);
- newValue = 0L;
- }
- valueString = Long.toString(newValue);
- }
-
- Buffer b = new Buffer(valueString.getBytes());
- metricsMap.put(key,b);
- }
-
- TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap);
- record.setMapFields(t);
- buildGenericRecord(record, null, timeStamp, "master");
- output.collect(key, record);
- }
- catch(Exception e){
- log.error(ExceptionUtil.getStackTrace(e));
- }
- }
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+
+ Logger log = Logger.getLogger(HBaseMasterProcessor.class);
+ long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+ .getTimeInMillis();
+ ChukwaRecord record = new ChukwaRecord();
+
+ Map<String, Buffer> metricsMap = new HashMap<String, Buffer>();
+
+ try {
+ JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
+ String ttTag = chunk.getTag("timeStamp");
+ if (ttTag == null) {
+ log.warn("timeStamp tag not set in JMX adaptor for hbase master");
+ } else {
+ timeStamp = Long.parseLong(ttTag);
+ }
+ Iterator<String> keys = obj.keySet().iterator();
+
+ while (keys.hasNext()) {
+ String key = keys.next();
+ Object value = obj.get(key);
+ String valueString = value == null ? "" : value.toString();
+
+ // Calculate rate for some of the metrics
+ if (rateMap.containsKey(key)) {
+ long oldValue = rateMap.get(key);
+ long curValue = Long.parseLong(valueString);
+ rateMap.put(key, curValue);
+ long newValue = curValue - oldValue;
+ if (newValue < 0) {
+ log.warn("HBaseMaster rateMap might be reset or corrupted for metric "
+ + key);
+ newValue = 0L;
+ }
+ valueString = Long.toString(newValue);
+ }
+
+ Buffer b = new Buffer(valueString.getBytes());
+ metricsMap.put(key, b);
+ }
+
+ TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap);
+ record.setMapFields(t);
+ buildGenericRecord(record, null, timeStamp, "master");
+ output.collect(key, record);
+ } catch (Exception e) {
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
index be28df5..8fab057 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
@@ -37,49 +37,45 @@ import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
-@Tables(annotations={
-@Table(name="HBase",columnFamily="regionserver")
-})
-public class HBaseRegionServerProcessor extends AbstractProcessor{
+@Tables(annotations = { @Table(name = "HBase", columnFamily = "regionserver") })
+public class HBaseRegionServerProcessor extends AbstractProcessor {
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter) throws Throwable {
-
- Logger log = Logger.getLogger(HBaseRegionServerProcessor.class);
- long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
- ChukwaRecord record = new ChukwaRecord();
-
- Map<String, Buffer> metricsMap = new HashMap<String,Buffer>();
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
- try{
- JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
- String ttTag = chunk.getTag("timeStamp");
- if(ttTag == null){
- log.warn("timeStamp tag not set in JMX adaptor for hbase region server");
- }
- else{
- timeStamp = Long.parseLong(ttTag);
- }
- Iterator<JSONObject> iter = obj.entrySet().iterator();
-
- while(iter.hasNext()){
- Map.Entry entry = (Map.Entry)iter.next();
- String key = (String) entry.getKey();
- Object value = entry.getValue();
- String valueString = value == null?"":value.toString();
- Buffer b = new Buffer(valueString.getBytes());
- metricsMap.put(key,b);
- }
-
- TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap);
- record.setMapFields(t);
- buildGenericRecord(record, null, timeStamp, "regionserver");
- output.collect(key, record);
- }
- catch(Exception e){
- log.error(ExceptionUtil.getStackTrace(e));
- }
- }
+ Logger log = Logger.getLogger(HBaseRegionServerProcessor.class);
+ long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+ .getTimeInMillis();
+ ChukwaRecord record = new ChukwaRecord();
+
+ Map<String, Buffer> metricsMap = new HashMap<String, Buffer>();
+
+ try {
+ JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
+ String ttTag = chunk.getTag("timeStamp");
+ if (ttTag == null) {
+ log.warn("timeStamp tag not set in JMX adaptor for hbase region server");
+ } else {
+ timeStamp = Long.parseLong(ttTag);
+ }
+ Iterator<String> keys = obj.keySet().iterator();
+
+ while (keys.hasNext()) {
+ String key = keys.next();
+ Object value = obj.get(key);
+ String valueString = value == null ? "" : value.toString();
+ Buffer b = new Buffer(valueString.getBytes());
+ metricsMap.put(key, b);
+ }
+
+ TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap);
+ record.setMapFields(t);
+ buildGenericRecord(record, null, timeStamp, "regionserver");
+ output.collect(key, record);
+ } catch (Exception e) {
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
index 8a246e1..7e2e4e2 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-import org.json.JSONObject;
+import org.json.simple.JSONObject;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
index 7c63a26..c2f7b52 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
@@ -36,130 +36,125 @@ import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
+@Tables(annotations = { @Table(name = "JobTracker", columnFamily = "jt"),
+ @Table(name = "JobTracker", columnFamily = "jvm"),
+ @Table(name = "JobTracker", columnFamily = "rpc") })
+public class JobTrackerProcessor extends AbstractProcessor {
+ static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();
+ static {
+ long zero = 0L;
+ rateMap.put("SentBytes", zero);
+ rateMap.put("ReceivedBytes", zero);
+ rateMap.put("rpcAuthorizationSuccesses", zero);
+ rateMap.put("rpcAuthorizationFailures", zero);
+ rateMap.put("RpcQueueTime_num_ops", zero);
+ rateMap.put("RpcProcessingTime_num_ops", zero);
+ rateMap.put("heartbeats", zero);
+ rateMap.put("jobs_submitted", zero);
+ rateMap.put("jobs_completed", zero);
+ rateMap.put("jobs_failed", zero);
+ rateMap.put("jobs_killed", zero);
+ rateMap.put("maps_launched", zero);
+ rateMap.put("maps_completed", zero);
+ rateMap.put("maps_failed", zero);
+ rateMap.put("maps_killed", zero);
+ rateMap.put("reduces_launched", zero);
+ rateMap.put("reduces_completed", zero);
+ rateMap.put("reduces_failed", zero);
+ rateMap.put("reduces_killed", zero);
+ rateMap.put("gcCount", zero);
+ }
-@Tables(annotations={
-@Table(name="JobTracker",columnFamily="jt"),
-@Table(name="JobTracker",columnFamily="jvm"),
-@Table(name="JobTracker",columnFamily="rpc")
-})
-public class JobTrackerProcessor extends AbstractProcessor{
- static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
- static {
- long zero = 0L;
- rateMap.put("SentBytes", zero);
- rateMap.put("ReceivedBytes", zero);
- rateMap.put("rpcAuthorizationSuccesses", zero);
- rateMap.put("rpcAuthorizationFailures", zero);
- rateMap.put("RpcQueueTime_num_ops", zero);
- rateMap.put("RpcProcessingTime_num_ops", zero);
- rateMap.put("heartbeats", zero);
- rateMap.put("jobs_submitted", zero);
- rateMap.put("jobs_completed", zero);
- rateMap.put("jobs_failed", zero);
- rateMap.put("jobs_killed", zero);
- rateMap.put("maps_launched", zero);
- rateMap.put("maps_completed", zero);
- rateMap.put("maps_failed", zero);
- rateMap.put("maps_killed", zero);
- rateMap.put("reduces_launched", zero);
- rateMap.put("reduces_completed", zero);
- rateMap.put("reduces_failed", zero);
- rateMap.put("reduces_killed", zero);
- rateMap.put("gcCount", zero);
- }
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+ Logger log = Logger.getLogger(JobTrackerProcessor.class);
+ long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+ .getTimeInMillis();
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter) throws Throwable {
- Logger log = Logger.getLogger(JobTrackerProcessor.class);
- long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
-
- final ChukwaRecord mapred_jt = new ChukwaRecord();
- final ChukwaRecord jt_jvm = new ChukwaRecord();
- final ChukwaRecord jt_rpc = new ChukwaRecord();
-
- Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>(){
- private static final long serialVersionUID = 1L;
- {
- put("gcCount", jt_jvm);
- put("gcTimeMillis", jt_jvm);
- put("logError", jt_jvm);
- put("logFatal", jt_jvm);
- put("logInfo", jt_jvm);
- put("logWarn", jt_jvm);
- put("memHeapCommittedM", jt_jvm);
- put("memHeapUsedM", jt_jvm);
- put("threadsBlocked", jt_jvm);
- put("threadsNew", jt_jvm);
- put("threadsRunnable", jt_jvm);
- put("threadsTerminated", jt_jvm);
- put("threadsTimedWaiting", jt_jvm);
- put("threadsWaiting", jt_jvm);
+ final ChukwaRecord mapred_jt = new ChukwaRecord();
+ final ChukwaRecord jt_jvm = new ChukwaRecord();
+ final ChukwaRecord jt_rpc = new ChukwaRecord();
- put("ReceivedBytes", jt_rpc);
- put("RpcProcessingTime_avg_time", jt_rpc);
- put("RpcProcessingTime_num_ops", jt_rpc);
- put("RpcQueueTime_avg_time", jt_rpc);
- put("RpcQueueTime_num_ops", jt_rpc);
- put("SentBytes", jt_rpc);
- put("rpcAuthorizationSuccesses", jt_rpc);
- put("rpcAuthorizationnFailures", jt_rpc);
- }
- };
- try{
- JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
- String ttTag = chunk.getTag("timeStamp");
- if(ttTag == null){
- log.warn("timeStamp tag not set in JMX adaptor for jobtracker");
- }
- else{
- timeStamp = Long.parseLong(ttTag);
- }
- Iterator<JSONObject> iter = obj.entrySet().iterator();
-
- while(iter.hasNext()){
- Map.Entry entry = (Map.Entry)iter.next();
- String key = (String) entry.getKey();
- Object value = entry.getValue();
- String valueString = value == null?"":value.toString();
-
- //Calculate rate for some of the metrics
- if(rateMap.containsKey(key)){
- long oldValue = rateMap.get(key);
- long curValue = Long.parseLong(valueString);
- rateMap.put(key, curValue);
- long newValue = curValue - oldValue;
- if(newValue < 0){
- log.warn("JobTrackerProcessor's rateMap might be reset or corrupted for metric "+key);
- newValue = 0L;
- }
- valueString = Long.toString(newValue);
- }
-
- //These metrics are string types with JSON structure. So we parse them and get the count
- if(key.indexOf("Json") >= 0){
- //ignore these for now. Parsing of JSON array is throwing class cast exception.
- }
- else if(metricsMap.containsKey(key)){
- ChukwaRecord rec = metricsMap.get(key);
- rec.add(key, valueString);
- }
- else {
- mapred_jt.add(key, valueString);
- }
- }
-
- buildGenericRecord(mapred_jt, null, timeStamp, "jt");
- output.collect(key, mapred_jt);
- buildGenericRecord(jt_jvm, null, timeStamp, "jvm");
- output.collect(key, jt_jvm);
- buildGenericRecord(jt_rpc, null, timeStamp, "rpc");
- output.collect(key, jt_rpc);
- }
- catch(Exception e){
- log.error(ExceptionUtil.getStackTrace(e));
- }
- }
-}
+ Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() {
+ private static final long serialVersionUID = 1L;
+ {
+ put("gcCount", jt_jvm);
+ put("gcTimeMillis", jt_jvm);
+ put("logError", jt_jvm);
+ put("logFatal", jt_jvm);
+ put("logInfo", jt_jvm);
+ put("logWarn", jt_jvm);
+ put("memHeapCommittedM", jt_jvm);
+ put("memHeapUsedM", jt_jvm);
+ put("threadsBlocked", jt_jvm);
+ put("threadsNew", jt_jvm);
+ put("threadsRunnable", jt_jvm);
+ put("threadsTerminated", jt_jvm);
+ put("threadsTimedWaiting", jt_jvm);
+ put("threadsWaiting", jt_jvm);
+
+ put("ReceivedBytes", jt_rpc);
+ put("RpcProcessingTime_avg_time", jt_rpc);
+ put("RpcProcessingTime_num_ops", jt_rpc);
+ put("RpcQueueTime_avg_time", jt_rpc);
+ put("RpcQueueTime_num_ops", jt_rpc);
+ put("SentBytes", jt_rpc);
+ put("rpcAuthorizationSuccesses", jt_rpc);
+ put("rpcAuthorizationnFailures", jt_rpc);
+ }
+ };
+ try {
+ JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
+ String ttTag = chunk.getTag("timeStamp");
+ if (ttTag == null) {
+ log.warn("timeStamp tag not set in JMX adaptor for jobtracker");
+ } else {
+ timeStamp = Long.parseLong(ttTag);
+ }
+ Iterator<String> keys = obj.keySet().iterator();
+
+ while (keys.hasNext()) {
+ String key = keys.next();
+ Object value = obj.get(key);
+ String valueString = value == null ? "" : value.toString();
+ // Calculate rate for some of the metrics
+ if (rateMap.containsKey(key)) {
+ long oldValue = rateMap.get(key);
+ long curValue = Long.parseLong(valueString);
+ rateMap.put(key, curValue);
+ long newValue = curValue - oldValue;
+ if (newValue < 0) {
+ log.warn("JobTrackerProcessor's rateMap might be reset or corrupted for metric "
+ + key);
+ newValue = 0L;
+ }
+ valueString = Long.toString(newValue);
+ }
+
+ // These metrics are string types with JSON structure. So we parse them
+ // and get the count
+ if (key.indexOf("Json") >= 0) {
+ // ignore these for now. Parsing of JSON array is throwing class cast
+ // exception.
+ } else if (metricsMap.containsKey(key)) {
+ ChukwaRecord rec = metricsMap.get(key);
+ rec.add(key, valueString);
+ } else {
+ mapred_jt.add(key, valueString);
+ }
+ }
+
+ buildGenericRecord(mapred_jt, null, timeStamp, "jt");
+ output.collect(key, mapred_jt);
+ buildGenericRecord(jt_jvm, null, timeStamp, "jvm");
+ output.collect(key, jt_jvm);
+ buildGenericRecord(jt_rpc, null, timeStamp, "rpc");
+ output.collect(key, jt_rpc);
+ } catch (Exception e) {
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
index 16b12f3..79291a1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
@@ -19,12 +19,15 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
import java.util.Iterator;
+
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-import org.json.JSONObject;
+
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
public class Log4JMetricsContextProcessor extends AbstractProcessor {
@@ -50,24 +53,24 @@ public class Log4JMetricsContextProcessor extends AbstractProcessor {
@SuppressWarnings("unchecked")
public Log4JMetricsContextChukwaRecord(String recordEntry) throws Throwable {
LogEntry log = new LogEntry(recordEntry);
- JSONObject json = new JSONObject(log.getBody());
+ JSONObject json = (JSONObject) JSONValue.parse(log.getBody());
// round timestamp
- timestamp = json.getLong("timestamp");
+ timestamp = (Long) json.get("timestamp");
timestamp = (timestamp / 60000) * 60000;
// get record type
- String contextName = json.getString("contextName");
- String recordName = json.getString("recordName");
+ String contextName = (String) json.get("contextName");
+ String recordName = (String) json.get("recordName");
recordType = contextName;
if (!contextName.equals(recordName)) {
recordType += "_" + recordName;
}
- Iterator<String> ki = json.keys();
+ Iterator<String> ki = json.keySet().iterator();
while (ki.hasNext()) {
String key = ki.next();
- String value = json.getString(key);
+ String value = String.valueOf(json.get(key));
if(value != null) {
chukwaRecord.add(key, value);
}
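This hunk is the clearest example of the org.json to json-simple mapping applied across the patch: new JSONObject(body) becomes (JSONObject) JSONValue.parse(body), the typed getters getLong()/getString() become get() plus a cast, keys() becomes keySet().iterator(), and the checked JSONException disappears because JSONValue.parse() does not throw it. A minimal runnable sketch of the same mapping (class name and payload are hypothetical):

    import java.util.Iterator;
    import org.json.simple.JSONObject;
    import org.json.simple.JSONValue;

    public class Log4jJsonMigrationSketch {
      public static void main(String[] args) {
        String body = "{\"timestamp\":1435276800000,\"contextName\":\"mapred\","
            + "\"recordName\":\"jobtracker\"}";
        // org.json:    JSONObject json = new JSONObject(body);  // throws JSONException
        // json-simple: parse() returns Object and throws no checked exception.
        JSONObject json = (JSONObject) JSONValue.parse(body);
        long timestamp = (Long) json.get("timestamp");          // was json.getLong("timestamp")
        String contextName = (String) json.get("contextName");  // was json.getString("contextName")
        timestamp = (timestamp / 60000) * 60000;                 // round to the minute, as above
        Iterator<?> ki = json.keySet().iterator();               // was json.keys()
        while (ki.hasNext()) {
          Object key = ki.next();
          System.out.println(key + " = " + String.valueOf(json.get(key)));
        }
        System.out.println(contextName + " @ " + timestamp);
      }
    }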
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
index 58b2df9..1e6e9d7 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
@@ -36,155 +36,151 @@ import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
-@Tables(annotations={
-@Table(name="Namenode",columnFamily="summary"),
-@Table(name="Namenode",columnFamily="hdfs"),
-@Table(name="Namenode",columnFamily="rpc"),
-@Table(name="Namenode",columnFamily="jvm")
-})
-public class NamenodeProcessor extends AbstractProcessor{
- static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
-
- static {
- long zero = 0L;
- rateMap.put("AddBlockOps", zero);
- rateMap.put("CreateFileOps", zero);
- rateMap.put("DeleteFileOps", zero);
- rateMap.put("FileInfoOps", zero);
- rateMap.put("FilesAppended", zero);
- rateMap.put("FilesCreated", zero);
- rateMap.put("FilesDeleted", zero);
- rateMap.put("FileInGetListingOps", zero);
- rateMap.put("FilesRenamed", zero);
- rateMap.put("GetBlockLocations", zero);
- rateMap.put("GetListingOps", zero);
- rateMap.put("SentBytes", zero);
- rateMap.put("ReceivedBytes", zero);
- rateMap.put("rpcAuthorizationSuccesses", zero);
- rateMap.put("rpcAuthorizationFailures", zero);
- rateMap.put("RpcQueueTime_num_ops", zero);
- rateMap.put("RpcProcessingTime_num_ops", zero);
- rateMap.put("gcCount", zero);
- }
-
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter) throws Throwable {
- try{
- Logger log = Logger.getLogger(NamenodeProcessor.class);
- long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
-
- final ChukwaRecord hdfs_overview = new ChukwaRecord();
- final ChukwaRecord hdfs_namenode = new ChukwaRecord();
- final ChukwaRecord namenode_jvm = new ChukwaRecord();
- final ChukwaRecord namenode_rpc = new ChukwaRecord();
-
- Map<String, ChukwaRecord> metricsMap = new HashMap<String,ChukwaRecord>(){
- private static final long serialVersionUID = 1L;
- {
- put("BlockCapacity", hdfs_overview);
- put("BlocksTotal", hdfs_overview);
- put("CapacityTotalGB", hdfs_overview);
- put("CapacityUsedGB", hdfs_overview);
- put("CapacityRemainingGB", hdfs_overview);
- put("CorruptBlocks", hdfs_overview);
- put("ExcessBlocks", hdfs_overview);
- put("FilesTotal", hdfs_overview);
- put("MissingBlocks", hdfs_overview);
- put("PendingDeletionBlocks", hdfs_overview);
- put("PendingReplicationBlocks", hdfs_overview);
- put("ScheduledReplicationBlocks", hdfs_overview);
- put("TotalLoad", hdfs_overview);
- put("UnderReplicatedBlocks", hdfs_overview);
-
- put("gcCount", namenode_jvm);
- put("gcTimeMillis", namenode_jvm);
- put("logError", namenode_jvm);
- put("logFatal", namenode_jvm);
- put("logInfo", namenode_jvm);
- put("logWarn", namenode_jvm);
- put("memHeapCommittedM", namenode_jvm);
- put("memHeapUsedM", namenode_jvm);
- put("threadsBlocked", namenode_jvm);
- put("threadsNew", namenode_jvm);
- put("threadsRunnable", namenode_jvm);
- put("threadsTerminated", namenode_jvm);
- put("threadsTimedWaiting", namenode_jvm);
- put("threadsWaiting", namenode_jvm);
-
- put("ReceivedBytes", namenode_rpc);
- put("RpcProcessingTime_avg_time", namenode_rpc);
- put("RpcProcessingTime_num_ops", namenode_rpc);
- put("RpcQueueTime_avg_time", namenode_rpc);
- put("RpcQueueTime_num_ops", namenode_rpc);
- put("SentBytes", namenode_rpc);
- put("rpcAuthorizationSuccesses", namenode_rpc);
- put("rpcAuthenticationFailures", namenode_rpc);
- put("rpcAuthenticationSuccesses", namenode_rpc);
- }
- };
-
-
- JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
- String ttTag = chunk.getTag("timeStamp");
- if(ttTag == null){
- log.warn("timeStamp tag not set in JMX adaptor for namenode");
- }
- else{
- timeStamp = Long.parseLong(ttTag);
- }
- Iterator<JSONObject> iter = obj.entrySet().iterator();
-
-
- while(iter.hasNext()){
- Map.Entry entry = (Map.Entry)iter.next();
- String key = (String) entry.getKey();
- Object value = entry.getValue();
- String valueString = (value == null)?"":value.toString();
-
- //These metrics are string types with JSON structure. So we parse them and get the count
- if(key.equals("LiveNodes") || key.equals("DeadNodes") || key.equals("DecomNodes") || key.equals("NameDirStatuses")){
- JSONObject jobj = (JSONObject) JSONValue.parse(valueString);
- valueString = Integer.toString(jobj.size());
- }
-
- //Calculate rate for some of the metrics
- if(rateMap.containsKey(key)){
- long oldValue = rateMap.get(key);
- long curValue = Long.parseLong(valueString);
- rateMap.put(key, curValue);
- long newValue = curValue - oldValue;
- if(newValue < 0){
- log.error("NamenodeProcessor's rateMap might be reset or corrupted for metric "+key);
- newValue = 0L;
- }
- valueString = Long.toString(newValue);
- }
-
- //Check if metric belongs to one of the categories in metricsMap. If not just write it in group Hadoop.HDFS.NameNode
- if(metricsMap.containsKey(key)){
- ChukwaRecord rec = metricsMap.get(key);
- rec.add(key, valueString);
- }
- else{
- hdfs_namenode.add(key, valueString);
- }
- }
- buildGenericRecord(hdfs_overview, null, timeStamp, "summary");
- output.collect(key, hdfs_overview);
- buildGenericRecord(hdfs_namenode, null, timeStamp, "hdfs");
- output.collect(key, hdfs_namenode);
- buildGenericRecord(namenode_jvm, null, timeStamp, "jvm");
- output.collect(key, namenode_jvm);
- buildGenericRecord(namenode_rpc, null, timeStamp, "rpc");
- output.collect(key, namenode_rpc);
- }
- catch(Exception e){
- log.error(ExceptionUtil.getStackTrace(e));
- }
-
- }
-
-}
+@Tables(annotations = { @Table(name = "Namenode", columnFamily = "summary"),
+ @Table(name = "Namenode", columnFamily = "hdfs"),
+ @Table(name = "Namenode", columnFamily = "rpc"),
+ @Table(name = "Namenode", columnFamily = "jvm") })
+public class NamenodeProcessor extends AbstractProcessor {
+ static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();
+
+ static {
+ long zero = 0L;
+ rateMap.put("AddBlockOps", zero);
+ rateMap.put("CreateFileOps", zero);
+ rateMap.put("DeleteFileOps", zero);
+ rateMap.put("FileInfoOps", zero);
+ rateMap.put("FilesAppended", zero);
+ rateMap.put("FilesCreated", zero);
+ rateMap.put("FilesDeleted", zero);
+ rateMap.put("FileInGetListingOps", zero);
+ rateMap.put("FilesRenamed", zero);
+ rateMap.put("GetBlockLocations", zero);
+ rateMap.put("GetListingOps", zero);
+ rateMap.put("SentBytes", zero);
+ rateMap.put("ReceivedBytes", zero);
+ rateMap.put("rpcAuthorizationSuccesses", zero);
+ rateMap.put("rpcAuthorizationFailures", zero);
+ rateMap.put("RpcQueueTime_num_ops", zero);
+ rateMap.put("RpcProcessingTime_num_ops", zero);
+ rateMap.put("gcCount", zero);
+ }
+
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+ try {
+ Logger log = Logger.getLogger(NamenodeProcessor.class);
+ long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+ .getTimeInMillis();
+
+ final ChukwaRecord hdfs_overview = new ChukwaRecord();
+ final ChukwaRecord hdfs_namenode = new ChukwaRecord();
+ final ChukwaRecord namenode_jvm = new ChukwaRecord();
+ final ChukwaRecord namenode_rpc = new ChukwaRecord();
+
+ Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() {
+ private static final long serialVersionUID = 1L;
+ {
+ put("BlockCapacity", hdfs_overview);
+ put("BlocksTotal", hdfs_overview);
+ put("CapacityTotalGB", hdfs_overview);
+ put("CapacityUsedGB", hdfs_overview);
+ put("CapacityRemainingGB", hdfs_overview);
+ put("CorruptBlocks", hdfs_overview);
+ put("ExcessBlocks", hdfs_overview);
+ put("FilesTotal", hdfs_overview);
+ put("MissingBlocks", hdfs_overview);
+ put("PendingDeletionBlocks", hdfs_overview);
+ put("PendingReplicationBlocks", hdfs_overview);
+ put("ScheduledReplicationBlocks", hdfs_overview);
+ put("TotalLoad", hdfs_overview);
+ put("UnderReplicatedBlocks", hdfs_overview);
+
+ put("gcCount", namenode_jvm);
+ put("gcTimeMillis", namenode_jvm);
+ put("logError", namenode_jvm);
+ put("logFatal", namenode_jvm);
+ put("logInfo", namenode_jvm);
+ put("logWarn", namenode_jvm);
+ put("memHeapCommittedM", namenode_jvm);
+ put("memHeapUsedM", namenode_jvm);
+ put("threadsBlocked", namenode_jvm);
+ put("threadsNew", namenode_jvm);
+ put("threadsRunnable", namenode_jvm);
+ put("threadsTerminated", namenode_jvm);
+ put("threadsTimedWaiting", namenode_jvm);
+ put("threadsWaiting", namenode_jvm);
+
+ put("ReceivedBytes", namenode_rpc);
+ put("RpcProcessingTime_avg_time", namenode_rpc);
+ put("RpcProcessingTime_num_ops", namenode_rpc);
+ put("RpcQueueTime_avg_time", namenode_rpc);
+ put("RpcQueueTime_num_ops", namenode_rpc);
+ put("SentBytes", namenode_rpc);
+ put("rpcAuthorizationSuccesses", namenode_rpc);
+ put("rpcAuthenticationFailures", namenode_rpc);
+ put("rpcAuthenticationSuccesses", namenode_rpc);
+ }
+ };
+ JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
+ String ttTag = chunk.getTag("timeStamp");
+ if (ttTag == null) {
+ log.warn("timeStamp tag not set in JMX adaptor for namenode");
+ } else {
+ timeStamp = Long.parseLong(ttTag);
+ }
+ Iterator<String> keys = obj.keySet().iterator();
+
+ while (keys.hasNext()) {
+ String key = keys.next();
+ Object value = obj.get(key);
+ String valueString = (value == null) ? "" : value.toString();
+
+ // These metrics are string types with JSON structure. So we parse them
+ // and get the count
+ if (key.equals("LiveNodes") || key.equals("DeadNodes")
+ || key.equals("DecomNodes") || key.equals("NameDirStatuses")) {
+ JSONObject jobj = (JSONObject) JSONValue.parse(valueString);
+ valueString = Integer.toString(jobj.size());
+ }
+
+ // Calculate rate for some of the metrics
+ if (rateMap.containsKey(key)) {
+ long oldValue = rateMap.get(key);
+ long curValue = Long.parseLong(valueString);
+ rateMap.put(key, curValue);
+ long newValue = curValue - oldValue;
+ if (newValue < 0) {
+ log.error("NamenodeProcessor's rateMap might be reset or corrupted for metric "
+ + key);
+ newValue = 0L;
+ }
+ valueString = Long.toString(newValue);
+ }
+
+ // Check if metric belongs to one of the categories in metricsMap. If
+ // not just write it in group Hadoop.HDFS.NameNode
+ if (metricsMap.containsKey(key)) {
+ ChukwaRecord rec = metricsMap.get(key);
+ rec.add(key, valueString);
+ } else {
+ hdfs_namenode.add(key, valueString);
+ }
+ }
+ buildGenericRecord(hdfs_overview, null, timeStamp, "summary");
+ output.collect(key, hdfs_overview);
+ buildGenericRecord(hdfs_namenode, null, timeStamp, "hdfs");
+ output.collect(key, hdfs_namenode);
+ buildGenericRecord(namenode_jvm, null, timeStamp, "jvm");
+ output.collect(key, namenode_jvm);
+ buildGenericRecord(namenode_rpc, null, timeStamp, "rpc");
+ output.collect(key, namenode_rpc);
+ } catch (Exception e) {
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
index a1e3435..417fbb5 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
@@ -36,67 +36,64 @@ import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
-@Tables(annotations={
-@Table(name="Zookeeper",columnFamily="zk")
-})
-public class ZookeeperProcessor extends AbstractProcessor{
+@Tables(annotations = { @Table(name = "Zookeeper", columnFamily = "zk") })
+public class ZookeeperProcessor extends AbstractProcessor {
- static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
- static {
- long zero = 0L;
- rateMap.put("PacketsSent", zero);
- rateMap.put("PacketsReceived", zero);
- }
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter) throws Throwable {
- Logger log = Logger.getLogger(ZookeeperProcessor.class);
- long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
- final ChukwaRecord record = new ChukwaRecord();
-
- Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>(){
- private static final long serialVersionUID = 1L;
+ static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();
+ static {
+ long zero = 0L;
+ rateMap.put("PacketsSent", zero);
+ rateMap.put("PacketsReceived", zero);
+ }
- {
- put("MinRequestLatency", record);
- put("AvgRequestLatency", record);
- put("MaxRequestLatency", record);
- put("PacketsReceived", record);
- put("PacketsSent", record);
- put("OutstandingRequests", record);
- put("NodeCount", record);
- put("WatchCount", record);
- }
- };
- try{
- JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
- String ttTag = chunk.getTag("timeStamp");
- if(ttTag == null){
- log.warn("timeStamp tag not set in JMX adaptor for zookeeper");
- }
- else{
- timeStamp = Long.parseLong(ttTag);
- }
- Iterator<JSONObject> iter = obj.entrySet().iterator();
-
- while(iter.hasNext()){
- Map.Entry entry = (Map.Entry)iter.next();
- String key = (String) entry.getKey();
- Object value = entry.getValue();
- String valueString = value == null?"":value.toString();
-
- if(metricsMap.containsKey(key)){
- ChukwaRecord rec = metricsMap.get(key);
- rec.add(key, valueString);
- }
- }
-
- buildGenericRecord(record, null, timeStamp, "zk");
- output.collect(key, record);
- }
- catch(Exception e){
- log.error(ExceptionUtil.getStackTrace(e));
- }
- }
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+ Logger log = Logger.getLogger(ZookeeperProcessor.class);
+ long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+ .getTimeInMillis();
+ final ChukwaRecord record = new ChukwaRecord();
+
+ Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() {
+ private static final long serialVersionUID = 1L;
+
+ {
+ put("MinRequestLatency", record);
+ put("AvgRequestLatency", record);
+ put("MaxRequestLatency", record);
+ put("PacketsReceived", record);
+ put("PacketsSent", record);
+ put("OutstandingRequests", record);
+ put("NodeCount", record);
+ put("WatchCount", record);
+ }
+ };
+ try {
+ JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
+ String ttTag = chunk.getTag("timeStamp");
+ if (ttTag == null) {
+ log.warn("timeStamp tag not set in JMX adaptor for zookeeper");
+ } else {
+ timeStamp = Long.parseLong(ttTag);
+ }
+ Iterator<String> keys = ((JSONObject) obj).keySet().iterator();
+
+ while (keys.hasNext()) {
+ String key = keys.next();
+ Object value = obj.get(key);
+ String valueString = value == null ? "" : value.toString();
+
+ if (metricsMap.containsKey(key)) {
+ ChukwaRecord rec = metricsMap.get(key);
+ rec.add(key, valueString);
+ }
+ }
+
+ buildGenericRecord(record, null, timeStamp, "zk");
+ output.collect(key, record);
+ } catch (Exception e) {
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
index a72e1bd..c2695f2 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
@@ -70,9 +70,9 @@ public class SystemMetrics extends AbstractProcessor {
user = user + Double.parseDouble(cpu.get("user").toString());
sys = sys + Double.parseDouble(cpu.get("sys").toString());
idle = idle + Double.parseDouble(cpu.get("idle").toString());
- for (@SuppressWarnings("unchecked")
- Iterator<String> iterator = (Iterator<String>) cpu.keySet().iterator(); iterator
- .hasNext();) {
+ @SuppressWarnings("unchecked")
+ Iterator<String> iterator = (Iterator<String>) cpu.keySet().iterator();
+ while(iterator.hasNext()) {
String key = iterator.next();
addRecord("cpu." + key + "." + i, cpu.get(key).toString());
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java b/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java
index 517a32d..f721978 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java
@@ -20,8 +20,13 @@ package org.apache.hadoop.chukwa.hicc;
import java.net.*;
+import java.text.ParseException;
import java.io.*;
-import org.json.*;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
import org.apache.log4j.Logger;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
@@ -57,9 +62,9 @@ public class JSONLoader {
public JSONLoader(String source) {
String buffer = getContents(source);
try {
- JSONObject rows = new JSONObject(buffer);
- jsonData = new JSONArray(rows.get("rows").toString());
- } catch (JSONException e) {
+ JSONObject rows = (JSONObject) JSONValue.parse(buffer);
+ jsonData = (JSONArray) JSONValue.parse(rows.get("rows").toString());
+ } catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
}
@@ -68,7 +73,7 @@ public class JSONLoader {
String ts = null;
try {
ts = ((JSONObject) jsonData.get(i)).get("ts").toString();
- } catch (JSONException e) {
+ } catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
return ts;
@@ -79,7 +84,7 @@ public class JSONLoader {
try {
tags = ((JSONObject) jsonData.get(i)).get("tags")
.toString();
- } catch (JSONException e) {
+ } catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
return tags;
@@ -90,13 +95,13 @@ public class JSONLoader {
try {
value = ((JSONObject) jsonData.get(i)).get("value")
.toString();
- } catch (JSONException e) {
+ } catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
return value;
}
public int length() {
- return jsonData.length();
+ return jsonData.size();
}
}
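JSONLoader now parses its payload in two steps, first the top-level object and then its "rows" field as a JSONArray, and because json-simple's JSONArray is a java.util.List the old length() call becomes size(). A small sketch of that access pattern against a hypothetical payload in the shape JSONLoader expects:

    import org.json.simple.JSONArray;
    import org.json.simple.JSONObject;
    import org.json.simple.JSONValue;

    public class JsonLoaderSketch {
      public static void main(String[] args) {
        // Hypothetical payload: {"rows":[{"ts":...,"tags":...,"value":...}, ...]}
        String buffer = "{\"rows\":[{\"ts\":1435276800,\"tags\":\"host=node1\",\"value\":42}]}";
        JSONObject rows = (JSONObject) JSONValue.parse(buffer);
        // Re-parse the "rows" field as an array, mirroring the patched constructor.
        JSONArray jsonData = (JSONArray) JSONValue.parse(rows.get("rows").toString());
        // json-simple's JSONArray extends java.util.ArrayList, so size() replaces length().
        for (int i = 0; i < jsonData.size(); i++) {
          JSONObject row = (JSONObject) jsonData.get(i);
          System.out.println(row.get("ts") + " " + row.get("tags") + " " + row.get("value"));
        }
      }
    }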
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java b/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java
index 58859c5..dbb6707 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java
@@ -21,7 +21,11 @@ package org.apache.hadoop.chukwa.hicc;
import java.io.*;
import java.util.*;
-import org.json.*;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
@@ -65,8 +69,8 @@ public class Views {
File aFile = new File(path);
String buffer = getContents(aFile);
try {
- viewsData = new JSONArray(buffer);
- } catch (JSONException e) {
+ viewsData = (JSONArray) JSONValue.parse(buffer);
+ } catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
}
@@ -76,7 +80,7 @@ public class Views {
try {
owner = ((JSONObject) viewsData.get(i)).get("owner")
.toString();
- } catch (JSONException e) {
+ } catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
return owner;
@@ -86,8 +90,8 @@ public class Views {
Iterator permission = null;
try {
permission = ((JSONObject) ((JSONObject) viewsData.get(i))
- .get("permission")).keys();
- } catch (JSONException e) {
+ .get("permission")).keySet().iterator();
+ } catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
return permission;
@@ -100,7 +104,7 @@ public class Views {
JSONObject permission = (JSONObject) view.get("permission");
JSONObject user = (JSONObject) permission.get(who);
read = user.get("read").toString();
- } catch (JSONException e) {
+ } catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
return read;
@@ -110,7 +114,7 @@ public class Views {
String write = null;
try {
write = ((JSONObject) ((JSONObject) ((JSONObject) viewsData.get(i)).get("permission")).get(who)).get("write").toString();
- } catch (JSONException e) {
+ } catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
return write;
@@ -121,7 +125,7 @@ public class Views {
try {
description = ((JSONObject) viewsData.get(i)).get(
"description").toString();
- } catch (JSONException e) {
+ } catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
return description;
@@ -131,13 +135,13 @@ public class Views {
String key = null;
try {
key = ((JSONObject) viewsData.get(i)).get("key").toString();
- } catch (JSONException e) {
+ } catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
return key;
}
public int length() {
- return viewsData.length();
+ return viewsData.size();
}
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/c914d340/src/main/java/org/apache/hadoop/chukwa/hicc/Workspace.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/Workspace.java b/src/main/java/org/apache/hadoop/chukwa/hicc/Workspace.java
index 29dab01..b6789b4 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/Workspace.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/Workspace.java
@@ -1,376 +1,383 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.hicc;
-
-
-import java.io.*;
-import java.util.*;
-import javax.servlet.*;
-import javax.servlet.http.*;
-import java.sql.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.chukwa.util.XssFilter;
-import org.json.*;
-import org.apache.hadoop.chukwa.util.ExceptionUtil;
-
-public class Workspace extends HttpServlet {
- public static final long serialVersionUID = 101L;
- private static final Log log = LogFactory.getLog(Workspace.class);
- private String path = System.getenv("CHUKWA_DATA_DIR");
- transient private JSONObject hash = new JSONObject();
- transient private XssFilter xf;
-
- @Override
- protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
- resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
- }
-
- public void doGet(HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
- xf = new XssFilter(request);
- response.setContentType("text/plain");
- String method = xf.getParameter("method");
- if (method.equals("get_views_list")) {
- getViewsList(request, response);
- }
- if (method.equals("get_view")) {
- getView(request, response);
- }
- if (method.equals("save_view")) {
- saveView(request, response);
- }
- if (method.equals("change_view_info")) {
- changeViewInfo(request, response);
- }
- if (method.equals("get_widget_list")) {
- getWidgetList(request, response);
- }
- if (method.equals("clone_view")) {
- cloneView(request, response);
- }
- if (method.equals("delete_view")) {
- deleteView(request, response);
- }
- }
-
- public void doPost(HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
- doGet(request, response);
- }
-
- static public String getContents(File aFile) {
- // ...checks on aFile are elided
- StringBuffer contents = new StringBuffer();
-
- try {
- // use buffering, reading one line at a time
- // FileReader always assumes default encoding is OK!
- BufferedReader input = new BufferedReader(new FileReader(aFile));
- try {
- String line = null; // not declared within while loop
- /*
- * readLine is a bit quirky : it returns the content of a line MINUS the
- * newline. it returns null only for the END of the stream. it returns
- * an empty String if two newlines appear in a row.
- */
- while ((line = input.readLine()) != null) {
- contents.append(line);
- contents.append(System.getProperty("line.separator"));
- }
- } finally {
- input.close();
- }
- } catch (IOException ex) {
- ex.printStackTrace();
- }
-
- return contents.toString();
- }
-
- public void setContents(String fName, String buffer) {
- try {
- FileWriter fstream = new FileWriter(fName);
- BufferedWriter out = new BufferedWriter(fstream);
- out.write(buffer);
- out.close();
- } catch (Exception e) {
- System.err.println("Error: " + e.getMessage());
- }
- }
-
- public void cloneView(HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
- PrintWriter out = response.getWriter();
- String name = xf.getParameter("name");
- String template = xf.getParameter("clone_name");
- File aFile = new File(path + "/views/" + template);
- String config = getContents(aFile);
- int i = 0;
- boolean check = true;
- while (check) {
- String tmpName = name;
- if (i > 0) {
- tmpName = name + i;
- }
- File checkFile = new File(path + "/views/" + tmpName + ".view");
- check = checkFile.exists();
- if (!check) {
- name = tmpName;
- }
- i = i + 1;
- }
- setContents(path + "/views/" + name + ".view", config);
- File deleteCache = new File(path + "/views/workspace_view_list.cache");
- if(!deleteCache.delete()) {
- log.warn("Can not delete "+path + "/views/workspace_view_list.cache");
- }
- genViewCache(path + "/views");
- aFile = new File(path + "/views/workspace_view_list.cache");
- String viewsCache = getContents(aFile);
- out.println(viewsCache);
- }
-
- public void deleteView(HttpServletRequest request,
- HttpServletResponse response) throws IOException, ServletException {
- String name = xf.getParameter("name");
- File aFile = new File(path + "/views/" + name + ".view");
- if(!aFile.delete()) {
- log.warn("Can not delete " + path + "/views/" + name + ".view");
- }
- File deleteCache = new File(path + "/views/workspace_view_list.cache");
- if(!deleteCache.delete()) {
- log.warn("Can not delete "+path + "/views/workspace_view_list.cache");
- }
- genViewCache(path + "/views");
- }
-
- public void getViewsList(HttpServletRequest request,
- HttpServletResponse response) throws IOException, ServletException {
- PrintWriter out = response.getWriter();
- genViewCache(path + "/views");
- File aFile = new File(path + "/views/workspace_view_list.cache");
- String viewsCache = getContents(aFile);
- out.println(viewsCache);
- }
-
- public void getView(HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
- PrintWriter out = response.getWriter();
- String id = xf.getParameter("id");
- genViewCache(path + "/views");
- File aFile = new File(path + "/views/" + id + ".view");
- String view = getContents(aFile);
- out.println(view);
- }
-
- public void changeViewInfo(HttpServletRequest request,
- HttpServletResponse response) throws IOException, ServletException {
- PrintWriter out = response.getWriter();
- String id = xf.getParameter("name");
- String config = request.getParameter("config");
- try {
- JSONObject jt = new JSONObject(config);
- File aFile = new File(path + "/views/" + id + ".view");
- String original = getContents(aFile);
- JSONObject updateObject = new JSONObject(original);
- updateObject.put("description", jt.get("description"));
- setContents(path + "/views/" + id + ".view", updateObject.toString());
- if (!rename(id, jt.get("description").toString())) {
- throw new Exception("Rename view file failed");
- }
- File deleteCache = new File(path + "/views/workspace_view_list.cache");
- if(!deleteCache.delete()) {
- log.warn("Can not delete "+path + "/views/workspace_view_list.cache");
- }
- genViewCache(path + "/views");
- out.println("Workspace is stored successfully.");
- } catch (Exception e) {
- out.println("Workspace store failed.");
- }
- }
-
- public void saveView(HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
- PrintWriter out = response.getWriter();
- String id = xf.getParameter("name");
- String config = request.getParameter("config");
- setContents(path + "/views/" + id + ".view", config);
- out.println("Workspace is stored successfully.");
- }
-
- public void getWidgetList(HttpServletRequest request,
- HttpServletResponse response) throws IOException, ServletException {
- PrintWriter out = response.getWriter();
- genWidgetCache(path + "/descriptors");
- File aFile = new File(path + "/descriptors/workspace_plugin.cache");
- String viewsCache = getContents(aFile);
- out.println(viewsCache);
- }
-
- private void genViewCache(String source) {
- File cacheFile = new File(source + "/workspace_view_list.cache");
- if (!cacheFile.exists()) {
- File dir = new File(source);
- File[] filesWanted = dir.listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.endsWith(".view");
- }
- });
- JSONObject[] cacheGroup = new JSONObject[filesWanted.length];
- for (int i = 0; i < filesWanted.length; i++) {
- String buffer = getContents(filesWanted[i]);
- try {
- JSONObject jt = new JSONObject(buffer);
- String fn = filesWanted[i].getName();
- jt.put("key", fn.substring(0, (fn.length() - 5)));
- cacheGroup[i] = jt;
- } catch (Exception e) {
- log.debug(ExceptionUtil.getStackTrace(e));
- }
- }
- String viewList = convertObjectsToViewList(cacheGroup);
- setContents(source + "/workspace_view_list.cache", viewList);
- }
- }
-
- public String convertObjectsToViewList(JSONObject[] objArray) {
- JSONArray jsonArr = new JSONArray();
- JSONObject permission = new JSONObject();
- JSONObject user = new JSONObject();
- try {
- permission.put("read", 1);
- permission.put("modify", 1);
- user.put("all", permission);
- } catch (Exception e) {
- System.err.println("JSON Exception: " + e.getMessage());
- }
- for (int i = 0; i < objArray.length; i++) {
- try {
- JSONObject jsonObj = new JSONObject();
- jsonObj.put("key", objArray[i].get("key"));
- jsonObj.put("description", objArray[i].get("description"));
- jsonObj.put("owner", "");
- jsonObj.put("permission", user);
- jsonArr.put(jsonObj);
- } catch (Exception e) {
- System.err.println("JSON Exception: " + e.getMessage());
- }
- }
- return jsonArr.toString();
- }
-
- private void genWidgetCache(String source) {
- File cacheFile = new File(source + "/workspace_plugin.cache");
- File cacheDir = new File(source);
- if (!cacheFile.exists()
- || cacheFile.lastModified() < cacheDir.lastModified()) {
- File dir = new File(source);
- File[] filesWanted = dir.listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.endsWith(".descriptor");
- }
- });
- JSONObject[] cacheGroup = new JSONObject[filesWanted.length];
- for (int i = 0; i < filesWanted.length; i++) {
- String buffer = getContents(filesWanted[i]);
- try {
- JSONObject jt = new JSONObject(buffer);
- cacheGroup[i] = jt;
- } catch (Exception e) {
- log.debug(ExceptionUtil.getStackTrace(e));
- }
- }
- String widgetList = convertObjectsToWidgetList(cacheGroup);
- setContents(source + "/workspace_plugin.cache", widgetList);
- }
- }
-
- public String convertObjectsToWidgetList(JSONObject[] objArray) {
- JSONObject jsonObj = new JSONObject();
- JSONArray jsonArr = new JSONArray();
- for (int i = 0; i < objArray.length; i++) {
- jsonArr.put(objArray[i]);
- }
- try {
- jsonObj.put("detail", jsonArr);
- } catch (Exception e) {
- System.err.println("JSON Exception: " + e.getMessage());
- }
- for (int i = 0; i < objArray.length; i++) {
- try {
- String[] categoriesArray = objArray[i].get("categories").toString()
- .split(",");
- hash = addToHash(hash, categoriesArray, objArray[i]);
- } catch (JSONException e) {
- System.err.println("JSON Exception: " + e.getMessage());
- }
- }
- try {
- jsonObj.put("children", hash);
- } catch (Exception e) {
- System.err.println("JSON Exception: " + e.getMessage());
- }
- return jsonObj.toString();
- }
-
- public JSONObject addToHash(JSONObject hash, String[] categoriesArray,
- JSONObject obj) {
- JSONObject subHash = hash;
- for (int i = 0; i < categoriesArray.length; i++) {
- String id = categoriesArray[i];
- if (i >= categoriesArray.length - 1) {
- try {
- subHash.put("leaf:" + obj.get("title"), obj.get("id"));
- } catch (Exception e) {
- System.err.println("JSON Exception: " + e.getMessage());
- }
- } else {
- try {
- subHash = subHash.getJSONObject("node:" + id);
- } catch (JSONException e) {
- try {
- JSONObject tmpHash = new JSONObject();
- subHash.put("node:" + id, tmpHash);
- subHash = tmpHash;
- } catch (JSONException ex) {
- log.debug(ExceptionUtil.getStackTrace(e));
- }
- }
- }
- }
- return hash;
- }
-
- private boolean rename(String id, String desc) {
- try {
- File view = new File(path + "/views/" + id + ".view");
- File newFile = new File(path + File.separator + "views" + File.separator
- + desc + ".view");
- if(!view.renameTo(newFile)) {
- log.warn("Can not rename " + path + "/views/" + id + ".view to " +
- path + File.separator + "views" + File.separator + desc + ".view");
- }
- } catch (Exception e) {
- return false;
- }
- return true;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.hicc;
+
+
+import java.io.*;
+import java.util.*;
+
+import javax.servlet.*;
+import javax.servlet.http.*;
+
+import java.sql.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.util.XssFilter;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+
+public class Workspace extends HttpServlet {
+ public static final long serialVersionUID = 101L;
+ private static final Log log = LogFactory.getLog(Workspace.class);
+ private String path = System.getenv("CHUKWA_DATA_DIR");
+ transient private JSONObject hash = new JSONObject();
+ transient private XssFilter xf;
+
+ @Override
+ protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
+ }
+
+ public void doGet(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ xf = new XssFilter(request);
+ response.setContentType("text/plain");
+ String method = xf.getParameter("method");
+ if (method.equals("get_views_list")) {
+ getViewsList(request, response);
+ }
+ if (method.equals("get_view")) {
+ getView(request, response);
+ }
+ if (method.equals("save_view")) {
+ saveView(request, response);
+ }
+ if (method.equals("change_view_info")) {
+ changeViewInfo(request, response);
+ }
+ if (method.equals("get_widget_list")) {
+ getWidgetList(request, response);
+ }
+ if (method.equals("clone_view")) {
+ cloneView(request, response);
+ }
+ if (method.equals("delete_view")) {
+ deleteView(request, response);
+ }
+ }
+
+ public void doPost(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ doGet(request, response);
+ }
+
+ static public String getContents(File aFile) {
+ // ...checks on aFile are elided
+ StringBuffer contents = new StringBuffer();
+
+ try {
+ // use buffering, reading one line at a time
+ // FileReader always assumes default encoding is OK!
+ BufferedReader input = new BufferedReader(new FileReader(aFile));
+ try {
+ String line = null; // not declared within while loop
+ /*
+ * readLine is a bit quirky : it returns the content of a line MINUS the
+ * newline. it returns null only for the END of the stream. it returns
+ * an empty String if two newlines appear in a row.
+ */
+ while ((line = input.readLine()) != null) {
+ contents.append(line);
+ contents.append(System.getProperty("line.separator"));
+ }
+ } finally {
+ input.close();
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ }
+
+ return contents.toString();
+ }
+
+ public void setContents(String fName, String buffer) {
+ try {
+ FileWriter fstream = new FileWriter(fName);
+ BufferedWriter out = new BufferedWriter(fstream);
+ out.write(buffer);
+ out.close();
+ } catch (Exception e) {
+ System.err.println("Error: " + e.getMessage());
+ }
+ }
+
+ public void cloneView(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ PrintWriter out = response.getWriter();
+ String name = xf.getParameter("name");
+ String template = xf.getParameter("clone_name");
+ File aFile = new File(path + "/views/" + template);
+ String config = getContents(aFile);
+ int i = 0;
+ boolean check = true;
+ while (check) {
+ String tmpName = name;
+ if (i > 0) {
+ tmpName = name + i;
+ }
+ File checkFile = new File(path + "/views/" + tmpName + ".view");
+ check = checkFile.exists();
+ if (!check) {
+ name = tmpName;
+ }
+ i = i + 1;
+ }
+ setContents(path + "/views/" + name + ".view", config);
+ File deleteCache = new File(path + "/views/workspace_view_list.cache");
+ if(!deleteCache.delete()) {
+ log.warn("Can not delete "+path + "/views/workspace_view_list.cache");
+ }
+ genViewCache(path + "/views");
+ aFile = new File(path + "/views/workspace_view_list.cache");
+ String viewsCache = getContents(aFile);
+ out.println(viewsCache);
+ }
+
+ public void deleteView(HttpServletRequest request,
+ HttpServletResponse response) throws IOException, ServletException {
+ String name = xf.getParameter("name");
+ File aFile = new File(path + "/views/" + name + ".view");
+ if(!aFile.delete()) {
+ log.warn("Can not delete " + path + "/views/" + name + ".view");
+ }
+ File deleteCache = new File(path + "/views/workspace_view_list.cache");
+ if(!deleteCache.delete()) {
+ log.warn("Can not delete "+path + "/views/workspace_view_list.cache");
+ }
+ genViewCache(path + "/views");
+ }
+
+ public void getViewsList(HttpServletRequest request,
+ HttpServletResponse response) throws IOException, ServletException {
+ PrintWriter out = response.getWriter();
+ genViewCache(path + "/views");
+ File aFile = new File(path + "/views/workspace_view_list.cache");
+ String viewsCache = getContents(aFile);
+ out.println(viewsCache);
+ }
+
+ public void getView(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ PrintWriter out = response.getWriter();
+ String id = xf.getParameter("id");
+ genViewCache(path + "/views");
+ File aFile = new File(path + "/views/" + id + ".view");
+ String view = getContents(aFile);
+ out.println(view);
+ }
+
+ public void changeViewInfo(HttpServletRequest request,
+ HttpServletResponse response) throws IOException, ServletException {
+ PrintWriter out = response.getWriter();
+ String id = xf.getParameter("name");
+ String config = request.getParameter("config");
+ try {
+ JSONObject jt = (JSONObject) JSONValue.parse(config);
+ File aFile = new File(path + "/views/" + id + ".view");
+ String original = getContents(aFile);
+ JSONObject updateObject = (JSONObject) JSONValue.parse(original);
+ updateObject.put("description", jt.get("description"));
+ setContents(path + "/views/" + id + ".view", updateObject.toString());
+ if (!rename(id, jt.get("description").toString())) {
+ throw new Exception("Rename view file failed");
+ }
+ File deleteCache = new File(path + "/views/workspace_view_list.cache");
+ if(!deleteCache.delete()) {
+ log.warn("Can not delete "+path + "/views/workspace_view_list.cache");
+ }
+ genViewCache(path + "/views");
+ out.println("Workspace is stored successfully.");
+ } catch (Exception e) {
+ out.println("Workspace store failed.");
+ }
+ }
+
+ public void saveView(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ PrintWriter out = response.getWriter();
+ String id = xf.getParameter("name");
+ String config = request.getParameter("config");
+ setContents(path + "/views/" + id + ".view", config);
+ out.println("Workspace is stored successfully.");
+ }
+
+ public void getWidgetList(HttpServletRequest request,
+ HttpServletResponse response) throws IOException, ServletException {
+ PrintWriter out = response.getWriter();
+ genWidgetCache(path + "/descriptors");
+ File aFile = new File(path + "/descriptors/workspace_plugin.cache");
+ String viewsCache = getContents(aFile);
+ out.println(viewsCache);
+ }
+
+ private void genViewCache(String source) {
+ File cacheFile = new File(source + "/workspace_view_list.cache");
+ if (!cacheFile.exists()) {
+ File dir = new File(source);
+ File[] filesWanted = dir.listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return name.endsWith(".view");
+ }
+ });
+ JSONObject[] cacheGroup = new JSONObject[filesWanted.length];
+ for (int i = 0; i < filesWanted.length; i++) {
+ String buffer = getContents(filesWanted[i]);
+ try {
+ JSONObject jt = (JSONObject) JSONValue.parse(buffer);
+ String fn = filesWanted[i].getName();
+ jt.put("key", fn.substring(0, (fn.length() - 5)));
+ cacheGroup[i] = jt;
+ } catch (Exception e) {
+ log.debug(ExceptionUtil.getStackTrace(e));
+ }
+ }
+ String viewList = convertObjectsToViewList(cacheGroup);
+ setContents(source + "/workspace_view_list.cache", viewList);
+ }
+ }
+
+ public String convertObjectsToViewList(JSONObject[] objArray) {
+ JSONArray jsonArr = new JSONArray();
+ JSONObject permission = new JSONObject();
+ JSONObject user = new JSONObject();
+ try {
+ permission.put("read", 1);
+ permission.put("modify", 1);
+ user.put("all", permission);
+ } catch (Exception e) {
+ System.err.println("JSON Exception: " + e.getMessage());
+ }
+ for (int i = 0; i < objArray.length; i++) {
+ try {
+ JSONObject jsonObj = new JSONObject();
+ jsonObj.put("key", objArray[i].get("key"));
+ jsonObj.put("description", objArray[i].get("description"));
+ jsonObj.put("owner", "");
+ jsonObj.put("permission", user);
+ jsonArr.add(jsonObj);
+ } catch (Exception e) {
+ System.err.println("JSON Exception: " + e.getMessage());
+ }
+ }
+ return jsonArr.toString();
+ }
+
+ private void genWidgetCache(String source) {
+ File cacheFile = new File(source + "/workspace_plugin.cache");
+ File cacheDir = new File(source);
+ if (!cacheFile.exists()
+ || cacheFile.lastModified() < cacheDir.lastModified()) {
+ File dir = new File(source);
+ File[] filesWanted = dir.listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return name.endsWith(".descriptor");
+ }
+ });
+ JSONObject[] cacheGroup = new JSONObject[filesWanted.length];
+ for (int i = 0; i < filesWanted.length; i++) {
+ String buffer = getContents(filesWanted[i]);
+ try {
+ JSONObject jt = (JSONObject) JSONValue.parse(buffer);
+ cacheGroup[i] = jt;
+ } catch (Exception e) {
+ log.debug(ExceptionUtil.getStackTrace(e));
+ }
+ }
+ String widgetList = convertObjectsToWidgetList(cacheGroup);
+ setContents(source + "/workspace_plugin.cache", widgetList);
+ }
+ }
+
+ public String convertObjectsToWidgetList(JSONObject[] objArray) {
+ JSONObject jsonObj = new JSONObject();
+ JSONArray jsonArr = new JSONArray();
+ for (int i = 0; i < objArray.length; i++) {
+ jsonArr.add(objArray[i]);
+ }
+ try {
+ jsonObj.put("detail", jsonArr);
+ } catch (Exception e) {
+ System.err.println("JSON Exception: " + e.getMessage());
+ }
+ for (int i = 0; i < objArray.length; i++) {
+ try {
+ String[] categoriesArray = objArray[i].get("categories").toString()
+ .split(",");
+ hash = addToHash(hash, categoriesArray, objArray[i]);
+ } catch (Exception e) {
+ System.err.println("JSON Exception: " + e.getMessage());
+ }
+ }
+ try {
+ jsonObj.put("children", hash);
+ } catch (Exception e) {
+ System.err.println("JSON Exception: " + e.getMessage());
+ }
+ return jsonObj.toString();
+ }
+
+ public JSONObject addToHash(JSONObject hash, String[] categoriesArray,
+ JSONObject obj) {
+ JSONObject subHash = hash;
+ for (int i = 0; i < categoriesArray.length; i++) {
+ String id = categoriesArray[i];
+ if (i >= categoriesArray.length - 1) {
+ try {
+ subHash.put("leaf:" + obj.get("title"), obj.get("id"));
+ } catch (Exception e) {
+ System.err.println("JSON Exception: " + e.getMessage());
+ }
+ } else {
+ try {
+ subHash = (JSONObject) subHash.get("node:" + id);
+ } catch (Exception e) {
+ try {
+ JSONObject tmpHash = new JSONObject();
+ subHash.put("node:" + id, tmpHash);
+ subHash = tmpHash;
+ } catch (Exception ex) {
+ log.debug(ExceptionUtil.getStackTrace(e));
+ }
+ }
+ }
+ }
+ return hash;
+ }
+
+ private boolean rename(String id, String desc) {
+ try {
+ File view = new File(path + "/views/" + id + ".view");
+ File newFile = new File(path + File.separator + "views" + File.separator
+ + desc + ".view");
+ if(!view.renameTo(newFile)) {
+ log.warn("Can not rename " + path + "/views/" + id + ".view to " +
+ path + File.separator + "views" + File.separator + desc + ".view");
+ }
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+}
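
One behavioral difference worth noting for the Workspace.java migration: json-simple's JSONObject is a java.util.HashMap, so get() on an absent key returns null instead of throwing, which means the old org.json pattern of catching JSONException to create a missing tree node no longer fires. Below is a minimal sketch of the null-check idiom one would typically use with json-simple for the same node:/leaf: key layout as Workspace.addToHash; the class, method, and sample values are illustrative assumptions, not code from this commit.

import org.json.simple.JSONObject;

// Illustrative only: the null-check idiom that replaces the
// "catch JSONException to create a missing node" pattern.
public class CategoryTreeExample {

  @SuppressWarnings("unchecked")
  public static JSONObject addToHash(JSONObject hash, String[] categories, JSONObject obj) {
    JSONObject subHash = hash;
    for (int i = 0; i < categories.length; i++) {
      String id = categories[i];
      if (i >= categories.length - 1) {
        // Last category: record the widget as a leaf entry.
        subHash.put("leaf:" + obj.get("title"), obj.get("id"));
      } else {
        // HashMap.get() returns null for an absent key rather than throwing,
        // so test for null instead of relying on an exception to create
        // the missing intermediate node.
        JSONObject next = (JSONObject) subHash.get("node:" + id);
        if (next == null) {
          next = new JSONObject();
          subHash.put("node:" + id, next);
        }
        subHash = next;
      }
    }
    return hash;
  }

  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    JSONObject widget = new JSONObject();
    widget.put("id", "cpu_graph");
    widget.put("title", "CPU Utilization");

    JSONObject tree =
        addToHash(new JSONObject(), new String[] { "metrics", "system" }, widget);
    System.out.println(tree.toString());
  }
}
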