Posted to dev@chukwa.apache.org by ey...@apache.org on 2015/04/18 07:13:42 UTC
[2/4] chukwa git commit: CHUKWA-667. Optimize HBase metrics schema. (Eric Yang)
CHUKWA-667. Optimize HBase metrics schema. (Eric Yang)
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/7ae68398
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/7ae68398
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/7ae68398
Branch: refs/heads/master
Commit: 7ae68398626b33ac5fc06caf83573e992b766af0
Parents: fb022bf
Author: Eric Yang <ey...@apache.org>
Authored: Sun Apr 12 00:16:58 2015 -0700
Committer: Eric Yang <ey...@apache.org>
Committed: Sun Apr 12 00:16:58 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
conf/chukwa-demux-conf.xml | 11 +-
conf/hbase.schema | 64 +--
.../writer/hbase/HBaseWriter.java | 141 +----
.../writer/hbase/OutputCollector.java | 73 ---
.../datacollection/writer/hbase/Reporter.java | 105 ++--
.../chukwa/datastore/ChukwaHBaseStore.java | 510 +++++++++----------
.../demux/processor/mapper/DFInvalidRecord.java | 44 --
.../extraction/demux/processor/mapper/Df.java | 118 -----
.../mapper/HadoopMetricsProcessor.java | 17 +-
.../demux/processor/mapper/Iostat.java | 146 ------
.../demux/processor/mapper/JobSummary.java | 2 +-
.../mapper/Log4JMetricsContextProcessor.java | 2 +-
.../demux/processor/mapper/PbsInvalidEntry.java | 44 --
.../demux/processor/mapper/PbsNodes.java | 198 -------
.../extraction/demux/processor/mapper/Ps.java | 144 ------
.../extraction/demux/processor/mapper/Sar.java | 172 -------
.../extraction/demux/processor/mapper/Top.java | 171 -------
.../demux/processor/mapper/Torque.java | 93 ----
.../demux/processor/mapper/YWatch.java | 121 -----
.../processor/mapper/YwatchInvalidEntry.java | 43 --
.../chukwa/hicc/rest/HeatmapController.java | 61 ++-
.../chukwa/hicc/rest/MetricsController.java | 112 +---
.../inputtools/log4j/Log4JMetricsContext.java | 2 +-
src/main/web/hicc/jsp/graph_explorer.jsp | 28 +-
.../web/hicc/jsp/host_selector_dropdown.jsp | 2 +-
.../TestLog4JMetricsContextChukwaRecord.java | 16 +-
.../demux/processor/mapper/TestPsOutput.java | 53 --
28 files changed, 420 insertions(+), 2075 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 21b9f54..7acc0f8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ CHUKWA-667. Optimize HBase metrics schema. (Eric Yang)
+
CHUKWA-741. Updated to Hadoop 2.6.0 and HBase 1.0.0. (Eric Yang)
CHUKWA-740. Updated README file to be more current. (Lewis John McGibbney via Eric Yang)
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/conf/chukwa-demux-conf.xml
----------------------------------------------------------------------
diff --git a/conf/chukwa-demux-conf.xml b/conf/chukwa-demux-conf.xml
index 62aa04f..151bfa5 100644
--- a/conf/chukwa-demux-conf.xml
+++ b/conf/chukwa-demux-conf.xml
@@ -211,13 +211,13 @@
<property>
<name>ChukwaMetrics</name>
- <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ChukwaMetricsProcessor</value>
+ <value>org.apache.hadoop.chukwa.extraction.hbase.ChukwaMetricsProcessor</value>
<description></description>
</property>
<property>
<name>SystemMetrics</name>
- <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SystemMetrics</value>
+ <value>org.apache.hadoop.chukwa.extraction.hbase.SystemMetrics</value>
<description></description>
</property>
@@ -269,13 +269,6 @@
<description> Reducer class for Reduce Type MRJobReduceProcessor </description>
</property>
- <!-- Demux configs for both mapper and reducer -->
- <property>
- <name>SystemMetrics</name>
- <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SystemMetrics,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.SystemMetrics</value>
- <description> Reducer class for Reduce Type SystemMetrics </description>
- </property>
-
<property>
<name>ClientTrace</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ClientTraceProcessor,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ClientTrace</value>
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/conf/hbase.schema
----------------------------------------------------------------------
diff --git a/conf/hbase.schema b/conf/hbase.schema
index 7015725..5e32f90 100644
--- a/conf/hbase.schema
+++ b/conf/hbase.schema
@@ -1,61 +1,5 @@
-create "Hadoop",
-{NAME => "ClientTrace", VERSIONS => 65535},
-{NAME => "dfs_namenode", VERSIONS => 65535},
-{NAME => "dfs_FSNamesystem", VERSIONS => 65535},
-{NAME => "dfs_datanode", VERSIONS => 65535},
-{NAME => "mapred_jobtracker", VERSIONS => 65535},
-{NAME => "mapred_shuffleOutput", VERSIONS => 65535},
-{NAME => "mapred_tasktracker", VERSIONS => 65535},
-{NAME => "jvm_metrics", VERSIONS => 65535},
-{NAME => "mapred_Queue", VERSIONS => 65535},
-{NAME => "metricssystem_MetricsSystem", VERSIONS => 65535},
-{NAME => "rpc_rpc", VERSIONS => 65535},
-{NAME => "rpcdetailed_rpcdetailed", VERSIONS => 65535},
-{NAME => "ugi_ugi", VERSIONS => 65535}
-create "HadoopLog",
-{NAME => "NameNode", VERSIONS => 65535},
-{NAME => "Audit", VERSIONS => 65535}
-create "Jobs",
-{NAME => "summary" }
-create "SystemMetrics",
-{NAME => "cpu", VERSIONS => 65535},
-{NAME => "system", VERSIONS => 65535},
-{NAME => "disk", VERSIONS => 65535},
-{NAME => "memory", VERSIONS => 65535},
-{NAME => "swap", VERSIONS => 65535},
-{NAME => "network", VERSIONS => 65535},
-{NAME => "tags", VERSIONS => 65535}
-create "ClusterSummary",
-{NAME=> "cpu", VERSIONS => 65535},
-{NAME => "system", VERSIONS => 65535},
-{NAME => "disk", VERSIONS => 65535},
-{NAME => "memory", VERSIONS => 65535},
-{NAME => "network", VERSIONS => 65535},
-{NAME => "swap", VERSIONS => 65535},
-{NAME => "hdfs", VERSIONS => 65535},
-{NAME => "mapreduce", VERSIONS => 65535}
+create "chukwa_meta",
+{NAME=>"k"}
create "chukwa",
-{NAME=>"chukwaAgent_chunkQueue", VERSIONS => 65535},
-{NAME => "chukwaAgent_metrics", VERSIONS => 65535},
-{NAME => "chukwaAgent_httpSender", VERSIONS => 65535}
-create "HBase",
-{NAME => "master", VERSIONS => 65535},
-{NAME => "regionserver", VERSIONS => 65535}
-create "Namenode",
-{NAME => "summary", VERSIONS => 65535},
-{NAME => "hdfs", VERSIONS => 65535},
-{NAME => "rpc", VERSIONS => 65535},
-{NAME => "jvm", VERSIONS => 65535}
-create "Zookeeper",
-{NAME => "zk", VERSIONS => 65535}
-create "JobTracker",
-{NAME => "jt", VERSIONS => 65535},
-{NAME => "jvm", VERSIONS => 65535},
-{NAME => "rpc", VERSIONS => 65535}
-create "Datanode",
-{NAME => "dn", VERSIONS => 65535},
-{NAME => "jvm", VERSIONS => 65535},
-{NAME => "rpc", VERSIONS => 65535}
-
-
-
+{NAME=>"t"},
+{NAME=>"a"}
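
For reference, a minimal sketch (not part of this commit) of what a single data point looks like under the consolidated schema, using the HBase client classes referenced elsewhere in this change. The row key comes from HBaseUtil.buildKey(day, metric, source); the 8-byte timestamp qualifier and the text-encoded value are inferred from the read path in ChukwaHBaseStore.getSeries() below, and the class, metric, and host names are illustrative only.

import java.nio.ByteBuffer;

import org.apache.hadoop.chukwa.util.HBaseUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

public class ChukwaCellSketch {
  public static void main(String[] args) throws Exception {
    long now = System.currentTimeMillis();
    // Row key encodes day + metric + source, so one row holds a day of one series.
    byte[] rowKey = HBaseUtil.buildKey(now, "SystemMetrics.cpu.combined", "host1.example.com");
    Put put = new Put(rowKey);
    put.add("t".getBytes("UTF-8"),                        // "t" = time series column family
            ByteBuffer.allocate(8).putLong(now).array(),  // qualifier: timestamp as 8 bytes
            "12.5".getBytes("UTF-8"));                    // value: metric reading as text
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Table chukwa = conn.getTable(TableName.valueOf("chukwa"))) {
      chukwa.put(put);
    }
  }
}
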
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
index 8344318..34c82e1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
@@ -19,49 +19,42 @@
package org.apache.hadoop.chukwa.datacollection.writer.hbase;
import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
-import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
-import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
-import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.UnknownRecordTypeException;
-import org.apache.hadoop.chukwa.extraction.demux.Demux;
-import org.apache.hadoop.chukwa.util.ClassUtils;
-import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.chukwa.extraction.hbase.AbstractProcessor;
+import org.apache.hadoop.chukwa.extraction.hbase.ProcessorFactory;
+import org.apache.hadoop.chukwa.extraction.hbase.UnknownRecordTypeException;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.Logger;
public class HBaseWriter extends PipelineableWriter {
static Logger log = Logger.getLogger(HBaseWriter.class);
+ private static final String CHUKWA_TABLE = "chukwa";
+ private static final String CHUKWA_META_TABLE = "chukwa_meta";
boolean reportStats;
volatile long dataSize = 0;
final Timer statTimer;
- private OutputCollector output;
+ private ArrayList<Put> output;
private Reporter reporter;
private ChukwaConfiguration conf;
String defaultProcessor;
private HConnection connection;
- private Configuration hconf;
private class StatReportingTask extends TimerTask {
private long lastTs = System.currentTimeMillis();
@@ -98,20 +91,19 @@ public class HBaseWriter extends PipelineableWriter {
private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) throws IOException {
this.reportStats = reportStats;
this.conf = conf;
- this.hconf = hconf;
this.statTimer = new Timer();
this.defaultProcessor = conf.get(
"chukwa.demux.mapper.default.processor",
"org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
- Demux.jobConf = conf;
log.info("hbase.zookeeper.quorum: " + hconf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + hconf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
if (reportStats) {
statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
}
- output = new OutputCollector();
- reporter = new Reporter();
- if(conf.getBoolean("hbase.writer.verify.schema", false)) {
- verifyHbaseSchema();
+ output = new ArrayList<Put>();
+ try {
+ reporter = new Reporter();
+ } catch (NoSuchAlgorithmException e) {
+ throw new IOException("Can not register hashing algorithm.");
}
connection = HConnectionManager.createConnection(hconf);
}
@@ -125,83 +117,21 @@ public class HBaseWriter extends PipelineableWriter {
public void init(Configuration conf) throws WriterException {
}
- private boolean verifyHbaseTable(HBaseAdmin admin, Table table) {
- boolean status = false;
- try {
- if(admin.tableExists(table.name())) {
- HTableDescriptor descriptor = admin.getTableDescriptor(table.name().getBytes());
- HColumnDescriptor[] columnDescriptors = descriptor.getColumnFamilies();
- for(HColumnDescriptor cd : columnDescriptors) {
- if(cd.getNameAsString().equals(table.columnFamily())) {
- log.info("Verified schema - table: "+table.name()+" column family: "+table.columnFamily());
- status = true;
- }
- }
- } else {
- throw new Exception("HBase table: "+table.name()+ " does not exist.");
- }
- } catch(Exception e) {
- log.error(ExceptionUtil.getStackTrace(e));
- status = false;
- }
- return status;
- }
-
- private void verifyHbaseSchema() {
- log.debug("Verify Demux parser with HBase schema");
- boolean schemaVerified = true;
- try {
- HBaseAdmin admin = new HBaseAdmin(hconf);
- List<Class> demuxParsers = ClassUtils.getClassesForPackage(conf.get("hbase.demux.package"));
- for(Class<?> x : demuxParsers) {
- if(x.isAnnotationPresent(Tables.class)) {
- Tables list = x.getAnnotation(Tables.class);
- for(Table table : list.annotations()) {
- if(!verifyHbaseTable(admin, table)) {
- schemaVerified = false;
- log.warn("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist.");
- }
- }
- } else if(x.isAnnotationPresent(Table.class)) {
- Table table = x.getAnnotation(Table.class);
- if(!verifyHbaseTable(admin, table)) {
- schemaVerified = false;
- log.warn("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist.");
- }
- }
- }
- } catch (Exception e) {
- schemaVerified = false;
- log.error(ExceptionUtil.getStackTrace(e));
- }
- if(!schemaVerified) {
- log.error("Hbase schema mismatch with demux parser.");
- if(conf.getBoolean("hbase.writer.halt.on.schema.mismatch", true)) {
- log.error("Exiting...");
- DaemonWatcher.bailout(-1);
- }
- }
- }
-
@Override
public CommitStatus add(List<Chunk> chunks) throws WriterException {
CommitStatus rv = ChukwaWriter.COMMIT_OK;
try {
+ HTableInterface hbase = connection.getTable(CHUKWA_TABLE);
+ HTableInterface meta = connection.getTable(CHUKWA_META_TABLE);
for(Chunk chunk : chunks) {
synchronized (this) {
try {
- Table table = findHBaseTable(chunk.getDataType());
-
- if(table!=null) {
- HTableInterface hbase = connection.getTable(table.name());
- MapProcessor processor = getProcessor(chunk.getDataType());
- processor.process(new ChukwaArchiveKey(), chunk, output, reporter);
- hbase.put(output.getKeyValues());
- } else {
- log.warn("Error finding HBase table for data type:"+chunk.getDataType());
- }
- } catch (Exception e) {
- log.warn(output.getKeyValues());
+ AbstractProcessor processor = getProcessor(chunk.getDataType());
+ processor.process(chunk, output, reporter);
+ hbase.put(output);
+ meta.put(reporter.getInfo());
+ } catch (Throwable e) {
+ log.warn(output);
log.warn(ExceptionUtil.getStackTrace(e));
}
dataSize += chunk.getData().length;
@@ -209,6 +139,7 @@ public class HBaseWriter extends PipelineableWriter {
reporter.clear();
}
}
+ hbase.close();
} catch (Exception e) {
log.error(ExceptionUtil.getStackTrace(e));
throw new WriterException("Failed to store data to HBase.");
@@ -219,31 +150,9 @@ public class HBaseWriter extends PipelineableWriter {
return rv;
}
- public Table findHBaseTable(String dataType) throws UnknownRecordTypeException {
- MapProcessor processor = getProcessor(dataType);
-
- Table table = null;
- if(processor.getClass().isAnnotationPresent(Table.class)) {
- return processor.getClass().getAnnotation(Table.class);
- } else if(processor.getClass().isAnnotationPresent(Tables.class)) {
- Tables tables = processor.getClass().getAnnotation(Tables.class);
- for(Table t : tables.annotations()) {
- table = t;
- }
- }
-
- return table;
- }
-
- public String findHBaseColumnFamilyName(String dataType)
- throws UnknownRecordTypeException {
- Table table = findHBaseTable(dataType);
- return table.columnFamily();
- }
-
- private MapProcessor getProcessor(String dataType) throws UnknownRecordTypeException {
+ private AbstractProcessor getProcessor(String dataType) throws UnknownRecordTypeException {
String processorClass = findProcessor(conf.get(dataType, defaultProcessor), defaultProcessor);
- return MapProcessorFactory.getProcessor(processorClass);
+ return ProcessorFactory.getProcessor(processorClass);
}
/**
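
A minimal sketch (not part of this commit) of the per-chunk write path introduced above: one processor renders a Chunk into Puts for the "chukwa" table while the Reporter collects metadata Puts for "chukwa_meta". The surrounding class and method names are hypothetical; the SystemMetrics processor class comes from chukwa-demux-conf.xml above.

import java.util.ArrayList;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
import org.apache.hadoop.chukwa.extraction.hbase.AbstractProcessor;
import org.apache.hadoop.chukwa.extraction.hbase.ProcessorFactory;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;

public class WritePathSketch {
  static void writeChunk(Chunk chunk, HTableInterface chukwa, HTableInterface chukwaMeta)
      throws Exception {
    ArrayList<Put> output = new ArrayList<Put>();
    Reporter reporter = new Reporter();
    AbstractProcessor processor = ProcessorFactory.getProcessor(
        "org.apache.hadoop.chukwa.extraction.hbase.SystemMetrics");
    processor.process(chunk, output, reporter);  // processor emits data cells as Puts
    chukwa.put(output);                          // time series cells -> "chukwa"
    chukwaMeta.put(reporter.getInfo());          // metric/source/cluster names -> "chukwa_meta"
    output.clear();
    reporter.clear();
  }
}
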
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java
deleted file mode 100644
index 352ca14..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.datacollection.writer.hbase;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.hbase.client.Put;
-
-public class OutputCollector implements
- org.apache.hadoop.mapred.OutputCollector<ChukwaRecordKey, ChukwaRecord> {
-
- private List<Put> buffers;
- private StringBuffer s = new StringBuffer();
- private byte[] rowKey = null;
- private byte[] cf = null;
- private long now = 0L;
-
- public OutputCollector() {
- buffers = new ArrayList<Put>();
- }
-
- @Override
- public void collect(ChukwaRecordKey key, ChukwaRecord value) throws IOException {
- String[] keyParts = key.getKey().split("/");
- s.setLength(0);
- s.append(keyParts[2]);
- s.append("-");
- s.append(keyParts[1]);
-
- rowKey = s.toString().getBytes();
-
- cf = key.getReduceType().getBytes();
- now = value.getTime();
-
- Put kv = new Put(rowKey);
- for(String field : value.getFields()) {
- kv.add(cf, field.getBytes(), now , value.getValue(field).getBytes());
- }
- buffers.add(kv);
- }
-
- public List<Put> getKeyValues() {
- return buffers;
- }
-
- public void clear() {
- s.setLength(0);
- rowKey = null;
- cf = null;
- buffers.clear();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
index ab1ce82..8b1674d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
@@ -18,62 +18,91 @@
package org.apache.hadoop.chukwa.datacollection.writer.hbase;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Counters.Counter;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
-public class Reporter implements org.apache.hadoop.mapred.Reporter {
+import org.apache.hadoop.hbase.client.Put;
+import org.json.simple.JSONObject;
+import org.mortbay.log.Log;
- @Override
- public Counter getCounter(Enum<?> arg0) {
- // TODO Auto-generated method stub
- return null;
+public class Reporter {
+ private ArrayList<Put> meta = new ArrayList<Put>();
+ private MessageDigest md5 = null;
+
+ public Reporter() throws NoSuchAlgorithmException {
+ md5 = MessageDigest.getInstance("md5");
}
- @Override
- public Counter getCounter(String arg0, String arg1) {
- // TODO Auto-generated method stub
- return null;
+ public void putSource(String type, String source) {
+ byte[] value = getHash(source);
+ JSONObject json = new JSONObject();
+
+ try {
+ json.put("sig", new String(value, "UTF-8"));
+ json.put("type", "source");
+ } catch (UnsupportedEncodingException e) {
+ Log.warn("Error encoding metadata.");
+ Log.warn(e);
+ }
+ put(type.getBytes(), source.getBytes(), json.toString().getBytes());
+
}
- @Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- // TODO Auto-generated method stub
- return null;
+ public void putMetric(String type, String metric) {
+ String buf = new StringBuilder(type).append(".").append(metric).toString();
+ byte[] pk = getHash(buf);
+
+ JSONObject json = new JSONObject();
+ try {
+ json.put("sig", new String(pk, "UTF-8"));
+ json.put("type", "metric");
+ } catch (UnsupportedEncodingException e) {
+ Log.warn("Error encoding metadata.");
+ Log.warn(e);
+ }
+ put(type.getBytes(), metric.getBytes(), json.toString().getBytes());
+
}
- @Override
- public void incrCounter(Enum<?> arg0, long arg1) {
- // TODO Auto-generated method stub
-
+ public void put(String key, String source, String info) {
+ put(key.getBytes(), source.getBytes(), info.getBytes());
}
- @Override
- public void incrCounter(String arg0, String arg1, long arg2) {
- // TODO Auto-generated method stub
-
+ public void put(byte[] key, byte[] source, byte[] info) {
+ Put put = new Put(key);
+ put.add("k".getBytes(), source, info);
+ meta.add(put);
}
- @Override
- public void setStatus(String arg0) {
- // TODO Auto-generated method stub
-
+ public void clear() {
+ meta.clear();
}
- @Override
- public void progress() {
- // TODO Auto-generated method stub
-
+ public List<Put> getInfo() {
+ return meta;
}
- @Override
- public float getProgress() {
- // TODO Auto-generated method stub
- return 0.0f;
+ private byte[] getHash(String key) {
+ byte[] hash = new byte[3];
+ System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 3);
+ return hash;
}
- public void clear() {
- // TODO Auto-generated method stub
-
+ public void putClusterName(String type, String clusterName) {
+ byte[] value = getHash(clusterName);
+ JSONObject json = new JSONObject();
+
+ try {
+ json.put("sig", new String(value, "UTF-8"));
+ json.put("type", "cluster");
+ } catch (UnsupportedEncodingException e) {
+ Log.warn("Error encoding metadata.");
+ Log.warn(e);
+ }
+ put(type.getBytes(), clusterName.getBytes(), json.toString().getBytes());
}
}
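
A minimal sketch (not part of this commit) of how the rewritten Reporter acts as a metadata collector: each put*() call records a qualifier under the "k" column family of "chukwa_meta" with a small JSON value carrying the entry type and a truncated MD5 signature. The surrounding class and the table variable are hypothetical.

import java.util.List;

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;

public class ReporterSketch {
  static void recordMetadata(HTableInterface chukwaMeta) throws Exception {
    Reporter reporter = new Reporter();                        // throws if MD5 is unavailable
    reporter.putSource("SystemMetrics", "host1.example.com");  // {"type":"source","sig":...}
    reporter.putMetric("SystemMetrics", "cpu.combined");       // {"type":"metric","sig":...}
    reporter.putClusterName("SystemMetrics", "demo");          // {"type":"cluster","sig":...}
    List<Put> metadata = reporter.getInfo();                   // Puts against column family "k"
    chukwaMeta.put(metadata);                                  // chukwaMeta = table "chukwa_meta"
    reporter.clear();
  }
}
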
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
index 82a5e57..d9c32d6 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
@@ -18,15 +18,18 @@
package org.apache.hadoop.chukwa.datastore;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
import java.util.Set;
+import java.util.TimeZone;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -35,330 +38,293 @@ import org.apache.hadoop.chukwa.hicc.bean.HeatMapPoint;
import org.apache.hadoop.chukwa.hicc.bean.Heatmap;
import org.apache.hadoop.chukwa.hicc.bean.Series;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.chukwa.util.HBaseUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
public class ChukwaHBaseStore {
private static Configuration hconf = HBaseConfiguration.create();
- private static HConnection connection = null;
- private static final int POOL_SIZE = 60;
- static Logger log = Logger.getLogger(ChukwaHBaseStore.class);
-
- public static Series getSeries(String tableName, String rkey, String family, String column,
- long startTime, long endTime, boolean filterByRowKey) {
- StringBuilder seriesName = new StringBuilder();
- seriesName.append(rkey);
- seriesName.append(":");
- seriesName.append(family);
- seriesName.append(":");
- seriesName.append(column);
+ static Logger LOG = Logger.getLogger(ChukwaHBaseStore.class);
+ static byte[] COLUMN_FAMILY = "t".getBytes();
+ static byte[] ANNOTATION_FAMILY = "a".getBytes();
+ static byte[] KEY_NAMES = "k".getBytes();
+ private static final String CHUKWA = "chukwa";
+ private static final String CHUKWA_META = "chukwa_meta";
+ private static long MILLISECONDS_IN_DAY = 86400000L;
- Series series = new Series(seriesName.toString());
+ /**
+ * Scan chukwa table for a particular metric group and metric name based on
+ * time ranges.
+ *
+ * @param metricGroup
+ * @param metric
+ * @param source
+ * @param startTime
+ * @param endTime
+ * @return
+ */
+ public static Series getSeries(String metricGroup, String metric,
+ String source, long startTime, long endTime) {
+ String fullMetricName = new StringBuilder(metricGroup).append(".")
+ .append(metric).toString();
+ return getSeries(fullMetricName, source, startTime, endTime);
+ }
+
+ /**
+ * Scan chukwa table for a full metric name based on time ranges.
+ *
+ * @param metric
+ * @param source
+ * @param startTime
+ * @param endTime
+ * @return
+ */
+ public static Series getSeries(String metric, String source, long startTime,
+ long endTime) {
+ String seriesName = new StringBuilder(metric).append(":").append(source).toString();
+ Series series = new Series(seriesName);
try {
- HTableInterface table = getHTableConnection().getTable(tableName);
- Calendar c = Calendar.getInstance();
- c.setTimeInMillis(startTime);
- c.set(Calendar.MINUTE, 0);
- c.set(Calendar.SECOND, 0);
- c.set(Calendar.MILLISECOND, 0);
- String startRow = c.getTimeInMillis()+rkey;
- Scan scan = new Scan();
- scan.addColumn(family.getBytes(), column.getBytes());
- scan.setStartRow(startRow.getBytes());
- scan.setTimeRange(startTime, endTime);
- scan.setMaxVersions();
- if(filterByRowKey) {
- RowFilter rf = new RowFilter(CompareOp.EQUAL, new
- RegexStringComparator("[0-9]+-"+rkey+"$"));
- scan.setFilter(rf);
+ // Swap start and end if the values are inverted.
+ if (startTime > endTime) {
+ long temp = endTime;
+ startTime = endTime;
+ endTime = temp;
}
- ResultScanner results = table.getScanner(scan);
- Iterator<Result> it = results.iterator();
- // TODO: Apply discrete wavelet transformation to limit the output
- // size to 1000 data points for graphing optimization. (i.e jwave)
- while(it.hasNext()) {
- Result result = it.next();
- String temp = new String(result.getValue(family.getBytes(), column.getBytes()));
- double value = Double.parseDouble(temp);
- // TODO: Pig Store function does not honor HBase timestamp, hence need to parse rowKey for timestamp.
- String buf = new String(result.getRow());
- Long timestamp = Long.parseLong(buf.split("-")[0]);
- // If Pig Store function can honor HBase timestamp, use the following line is better.
- // series.add(result.getCellValue().getTimestamp(), value);
- series.add(timestamp, value);
+ Connection connection = ConnectionFactory.createConnection(hconf);
+ Table table = connection.getTable(TableName.valueOf(CHUKWA));
+ Scan scan = new Scan();
+ Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ c.setTimeInMillis(startTime);
+ int startDay = c.get(Calendar.DAY_OF_YEAR);
+ c.setTimeInMillis(endTime);
+ int endDay = c.get(Calendar.DAY_OF_YEAR);
+ long currentDay = startTime;
+ for (int i = startDay; i <= endDay; i++) {
+ byte[] rowKey = HBaseUtil.buildKey(currentDay, metric, source);
+ // ColumnRangeFilter crf = new
+ // ColumnRangeFilter(Long.valueOf(startTime).toString().getBytes(),
+ // true, Long.valueOf(endTime).toString().getBytes(), true);
+ // scan.setFilter(crf);
+ scan.addFamily(COLUMN_FAMILY);
+ scan.setStartRow(rowKey);
+ scan.setStopRow(rowKey);
+ scan.setTimeRange(startTime, endTime);
+ scan.setBatch(10000);
+
+ ResultScanner results = table.getScanner(scan);
+ Iterator<Result> it = results.iterator();
+ // TODO: Apply discrete wavelet transformation to limit the output
+ // size to 1000 data points for graphing optimization. (i.e jwave)
+ while (it.hasNext()) {
+ Result result = it.next();
+ for (KeyValue kv : result.raw()) {
+ byte[] key = kv.getQualifier();
+ long timestamp = ByteBuffer.wrap(key).getLong();
+ double value = Double
+ .parseDouble(new String(kv.getValue(), "UTF-8"));
+ series.add(timestamp, value);
+ }
+ }
+ results.close();
+ currentDay = currentDay + (i * MILLISECONDS_IN_DAY);
}
- results.close();
table.close();
- } catch(Exception e) {
- log.error(ExceptionUtil.getStackTrace(e));
+ } catch (Exception e) {
+ LOG.error(ExceptionUtil.getStackTrace(e));
}
return series;
}
- public static Set<String> getFamilyNames(String tableName) {
+ public static Set<String> getMetricNames(String metricGroup) {
Set<String> familyNames = new CopyOnWriteArraySet<String>();
try {
- HTableInterface table = getHTableConnection().getTable(tableName);
- Set<byte[]> families = table.getTableDescriptor().getFamiliesKeys();
- for(byte[] name : families) {
- familyNames.add(new String(name));
+ Connection connection = ConnectionFactory.createConnection(hconf);
+ Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
+ Get get = new Get(metricGroup.getBytes());
+ Result result = table.get(get);
+ for (KeyValue kv : result.raw()) {
+ JSONObject json = (JSONObject) JSONValue.parse(new String(kv.getValue(), "UTF-8"));
+ if (json.get("type").equals("metric")) {
+ familyNames.add(new String(kv.getQualifier(), "UTF-8"));
+ }
}
table.close();
- } catch(Exception e) {
- log.error(ExceptionUtil.getStackTrace(e));
+ connection.close();
+ } catch (Exception e) {
+ LOG.error(ExceptionUtil.getStackTrace(e));
}
return familyNames;
-
+
}
-
- public static Set<String> getTableNames() {
- Set<String> tableNames = new CopyOnWriteArraySet<String>();
+
+ public static Set<String> getMetricGroups() {
+ Set<String> metricGroups = new CopyOnWriteArraySet<String>();
try {
- HBaseAdmin admin = new HBaseAdmin(hconf);
- HTableDescriptor[] td = admin.listTables();
- for(HTableDescriptor table : td) {
- tableNames.add(new String(table.getName()));
+ Connection connection = ConnectionFactory.createConnection(hconf);
+ Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
+ Scan scan = new Scan();
+ scan.addFamily(KEY_NAMES);
+ ResultScanner rs = table.getScanner(scan);
+ Iterator<Result> it = rs.iterator();
+ while (it.hasNext()) {
+ Result result = it.next();
+ metricGroups.add(new String(result.getRow(), "UTF-8"));
}
- } catch(Exception e) {
- log.error(ExceptionUtil.getStackTrace(e));
+ table.close();
+ connection.close();
+ } catch (Exception e) {
+ LOG.error(ExceptionUtil.getStackTrace(e));
}
- return tableNames;
+ return metricGroups;
}
- public static void getColumnNamesHelper(Set<String>columnNames, Iterator<Result> it) {
- Result result = it.next();
- if(result!=null) {
- List<Cell> cList = result.listCells();
- for(Cell cell : cList) {
- columnNames.add(new String(CellUtil.cloneQualifier(cell)));
- }
- }
- }
-
- public static Set<String> getColumnNames(String tableName, String family, long startTime, long endTime, boolean fullScan) {
- Set<String> columnNames = new CopyOnWriteArraySet<String>();
+ public static Set<String> getSourceNames(String dataType) {
+ Set<String> pk = new HashSet<String>();
try {
- HTableInterface table = getHTableConnection().getTable(tableName);
+ Connection connection = ConnectionFactory.createConnection(hconf);
+ Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Scan scan = new Scan();
- if(!fullScan) {
- // Take sample columns of the recent time.
- StringBuilder temp = new StringBuilder();
- temp.append(endTime-300000L);
- scan.setStartRow(temp.toString().getBytes());
- temp.setLength(0);
- temp.append(endTime);
- scan.setStopRow(temp.toString().getBytes());
- } else {
- StringBuilder temp = new StringBuilder();
- temp.append(startTime);
- scan.setStartRow(temp.toString().getBytes());
- temp.setLength(0);
- temp.append(endTime);
- scan.setStopRow(temp.toString().getBytes());
- }
- scan.addFamily(family.getBytes());
- ResultScanner results = table.getScanner(scan);
- Iterator<Result> it = results.iterator();
- if(fullScan) {
- while(it.hasNext()) {
- getColumnNamesHelper(columnNames, it);
- }
- } else {
- getColumnNamesHelper(columnNames, it);
+ scan.addFamily(KEY_NAMES);
+ ResultScanner rs = table.getScanner(scan);
+ Iterator<Result> it = rs.iterator();
+ while (it.hasNext()) {
+ Result result = it.next();
+ for (Cell cell : result.rawCells()) {
+ JSONObject json = (JSONObject) JSONValue.parse(new String(cell.getValue(), "UTF-8"));
+ if (json.get("type").equals("source")) {
+ pk.add(new String(cell.getQualifier(), "UTF-8"));
+ }
+ }
}
- results.close();
table.close();
- } catch(Exception e) {
- log.error(ExceptionUtil.getStackTrace(e));
+ connection.close();
+ } catch (Exception e) {
+ LOG.error(ExceptionUtil.getStackTrace(e));
}
- return columnNames;
+ return pk;
}
-
- public static Set<String> getRowNames(String tableName, String family, String qualifier, long startTime, long endTime, boolean fullScan) {
- Set<String> rows = new HashSet<String>();
+
+ public static Heatmap getHeatmap(String metricGroup, String metric,
+ long startTime, long endTime, double max, double scale, int height) {
+ final long MINUTE = TimeUnit.MINUTES.toMillis(1);
+ Heatmap heatmap = new Heatmap();
+ Set<String> sources = getSourceNames(metricGroup);
+ Set<String> metrics = getMetricNames(metricGroup);
+ List<Get> series = new ArrayList<Get>();
+ String fullName = new StringBuilder(metricGroup).append(".").append(metric).toString();
try {
- HTableInterface table = getHTableConnection().getTable(tableName);
- Scan scan = new Scan();
- scan.addColumn(family.getBytes(), qualifier.getBytes());
- if(!fullScan) {
- // Take sample columns of the recent time.
- StringBuilder temp = new StringBuilder();
- temp.append(endTime-300000L);
- scan.setStartRow(temp.toString().getBytes());
- temp.setLength(0);
- temp.append(endTime);
- scan.setStopRow(temp.toString().getBytes());
- } else {
- StringBuilder temp = new StringBuilder();
- temp.append(startTime);
- scan.setStartRow(temp.toString().getBytes());
- temp.setLength(0);
- temp.append(endTime);
- scan.setStopRow(temp.toString().getBytes());
+ Connection connection = ConnectionFactory.createConnection(hconf);
+ Table table = connection.getTable(TableName.valueOf(CHUKWA));
+ Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ c.setTimeInMillis(startTime);
+ int startDay = c.get(Calendar.DAY_OF_YEAR);
+ c.setTimeInMillis(endTime);
+ int endDay = c.get(Calendar.DAY_OF_YEAR);
+ long currentDay = startTime;
+ for (int i = startDay; i <= endDay; i++) {
+ for (String m : metrics) {
+ if (m.startsWith(fullName)) {
+ for (String source : sources) {
+ byte[] rowKey = HBaseUtil.buildKey(currentDay, m, source);
+ Get serie = new Get(rowKey);
+ serie.addFamily(COLUMN_FAMILY);
+ serie.setTimeRange(startTime, endTime);
+ series.add(serie);
+ }
+ }
+ }
+ currentDay = currentDay + (i * MILLISECONDS_IN_DAY);
}
- ResultScanner results = table.getScanner(scan);
- Iterator<Result> it = results.iterator();
- while(it.hasNext()) {
- Result result = it.next();
- String buffer = new String(result.getRow());
- String[] parts = buffer.split("-", 2);
- if(!rows.contains(parts[1])) {
- rows.add(parts[1]);
- }
+ Result[] rs = table.get(series);
+ int index = 0;
+ // Series display in y axis
+ int y = 0;
+ HashMap<String, Integer> keyMap = new HashMap<String, Integer>();
+ for (Result result : rs) {
+ for(Cell cell : result.rawCells()) {
+ byte[] dest = new byte[5];
+ System.arraycopy(cell.getRow(), 3, dest, 0, 5);
+ String source = new String(dest);
+ long time = cell.getTimestamp();
+ // Time display in x axis
+ int x = (int) ((time - startTime) / MINUTE);
+ if (keyMap.containsKey(source)) {
+ y = keyMap.get(source);
+ } else {
+ keyMap.put(source, new Integer(index));
+ y = index;
+ index++;
+ }
+ double v = Double.parseDouble(new String(CellUtil.cloneValue(cell)));
+ heatmap.put(x, y, v);
+ if (v > max) {
+ max = v;
+ }
+ }
}
- results.close();
table.close();
- } catch(Exception e) {
- log.error(ExceptionUtil.getStackTrace(e));
+ int radius = height / index;
+ // Usually scale max from 0 to 100 for visualization
+ heatmap.putMax(scale);
+ for (HeatMapPoint point : heatmap.getHeatmap()) {
+ double round = point.count / max * scale;
+ round = Math.round(round * 100.0) / 100.0;
+ point.put(point.x, point.y * radius, round);
+ }
+ heatmap.putRadius(radius);
+ heatmap.putSeries(index);
+ } catch (IOException e) {
+ LOG.error(ExceptionUtil.getStackTrace(e));
}
- return rows;
+ return heatmap;
}
-
- public static Set<String> getHostnames(String cluster, long startTime, long endTime, boolean fullScan) {
- return getRowNames("SystemMetrics","system", "csource", startTime, endTime, fullScan);
- }
-
+
+ /**
+ * Scan chukwa table and find cluster tag from annotation column family from a
+ * range of entries.
+ *
+ * @param startTime
+ * @param endTime
+ * @return
+ */
public static Set<String> getClusterNames(long startTime, long endTime) {
- String tableName = "SystemMetrics";
- String family = "system";
- String column = "ctags";
Set<String> clusters = new HashSet<String>();
- Pattern p = Pattern.compile("\\s*cluster=\"(.*?)\"");
try {
- HTableInterface table = getHTableConnection().getTable(tableName);
+ Connection connection = ConnectionFactory.createConnection(hconf);
+ Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
Scan scan = new Scan();
- scan.addColumn(family.getBytes(), column.getBytes());
- scan.setTimeRange(startTime, endTime);
- ResultScanner results = table.getScanner(scan);
- Iterator<Result> it = results.iterator();
- while(it.hasNext()) {
+ scan.addFamily(KEY_NAMES);
+ ResultScanner rs = table.getScanner(scan);
+ Iterator<Result> it = rs.iterator();
+ while (it.hasNext()) {
Result result = it.next();
- String buffer = new String(result.getValue(family.getBytes(), column.getBytes()));
- Matcher m = p.matcher(buffer);
- if(m.matches()) {
- clusters.add(m.group(1));
+ for (Cell cell : result.rawCells()) {
+ JSONObject json = (JSONObject) JSONValue.parse(new String(cell.getValue(), "UTF-8"));
+ if (json.get("type").equals("cluster")) {
+ clusters.add(new String(cell.getQualifier(), "UTF-8"));
+ }
}
}
- results.close();
table.close();
- } catch(Exception e) {
- log.error(ExceptionUtil.getStackTrace(e));
+ connection.close();
+ } catch (Exception e) {
+ LOG.error(ExceptionUtil.getStackTrace(e));
}
return clusters;
}
-
- public static Heatmap getHeatmap(String tableName, String family, String column,
- long startTime, long endTime, double max, double scale, int height) {
- final long MINUTE = TimeUnit.MINUTES.toMillis(1);
- Heatmap heatmap = new Heatmap();
-
- try {
- HTableInterface table = getHTableConnection().getTable(tableName);
- Scan scan = new Scan();
- ColumnPrefixFilter cpf = new ColumnPrefixFilter(column.getBytes());
- scan.addFamily(family.getBytes());
- scan.setFilter(cpf);
- scan.setTimeRange(startTime, endTime);
- scan.setBatch(10000);
- ResultScanner results = table.getScanner(scan);
- Iterator<Result> it = results.iterator();
- int index = 0;
- // Series display in y axis
- int y = 0;
- HashMap<String, Integer> keyMap = new HashMap<String, Integer>();
- while(it.hasNext()) {
- Result result = it.next();
- List<Cell> cList = result.listCells();
- for(Cell cell : cList) {
- String key = parseRowKey(result.getRow());
- StringBuilder tmp = new StringBuilder();
- tmp.append(key);
- tmp.append(":");
- tmp.append(new String(CellUtil.cloneQualifier(cell)));
- String seriesName = tmp.toString();
- long time = parseTime(result.getRow());
- // Time display in x axis
- int x = (int) ((time - startTime) / MINUTE);
- if(keyMap.containsKey(seriesName)) {
- y = keyMap.get(seriesName);
- } else {
- keyMap.put(seriesName, new Integer(index));
- y = index;
- index++;
- }
- double v = Double.parseDouble(new String(CellUtil.cloneValue(cell)));
- heatmap.put(x, y, v);
- if(v > max) {
- max = v;
- }
- }
- }
- results.close();
- table.close();
- int radius = height / index;
- // Usually scale max from 0 to 100 for visualization
- heatmap.putMax(scale);
- for(HeatMapPoint point : heatmap.getHeatmap()) {
- double round = point.count / max * scale;
- round = Math.round(round * 100.0) / 100.0;
- point.put(point.x, point.y * radius, round);
- }
- heatmap.putRadius(radius);
- heatmap.putSeries(index);
- } catch (IOException e) {
- log.error(ExceptionUtil.getStackTrace(e));
- }
- return heatmap;
- }
-
- private static String parseRowKey(byte[] row) {
- String key = new String(row);
- String[] parts = key.split("-", 2);
- return parts[1];
- }
-
- private static long parseTime(byte[] row) {
- String key = new String(row);
- String[] parts = key.split("-", 2);
- long time = Long.parseLong(parts[0]);
- return time;
- }
-
- private static HConnection getHTableConnection() {
- if(connection == null) {
- synchronized(ChukwaHBaseStore.class) {
- try {
- ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
- /* Set the hbase client properties to unblock immediately in case
- * hbase goes down. This will ensure we timeout on socket connection to
- * hbase early.
- */
- hconf.setInt("hbase.client.operation.timeout", 60000);
- hconf.setLong("hbase.client.pause", 1000);
- hconf.setInt("hbase.client.retries.number", 1);
- connection = HConnectionManager.createConnection(hconf, pool);
- }catch(IOException e) {
- log.error("Unable to obtain connection to HBase " + e.getMessage());
- e.printStackTrace();
- }
- }
- }
- return connection;
- }
}
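
A minimal sketch (not part of this commit) of reading data back through the reworked ChukwaHBaseStore API: metric groups, metrics, and sources are discovered from "chukwa_meta", and getSeries() scans the day-bucketed rows in "chukwa". The metric group, metric, and host names are illustrative only.

import java.util.Set;

import org.apache.hadoop.chukwa.datastore.ChukwaHBaseStore;
import org.apache.hadoop.chukwa.hicc.bean.Series;

public class QuerySketch {
  public static void main(String[] args) {
    long end = System.currentTimeMillis();
    long start = end - 60 * 60 * 1000L;  // last hour
    Set<String> groups = ChukwaHBaseStore.getMetricGroups();                // rows of "chukwa_meta"
    Set<String> metrics = ChukwaHBaseStore.getMetricNames("SystemMetrics"); // entries tagged "metric"
    Set<String> sources = ChukwaHBaseStore.getSourceNames("SystemMetrics"); // entries tagged "source"
    Series cpu = ChukwaHBaseStore.getSeries("SystemMetrics", "cpu.combined",
        "host1.example.com", start, end);  // one series: metric + source over [start, end]
    System.out.println(groups.size() + " groups, " + metrics.size()
        + " metrics, " + sources.size() + " sources");
  }
}
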
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java
deleted file mode 100644
index 6b60b3a..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-
-public class DFInvalidRecord extends Exception {
-
- /**
- *
- */
- private static final long serialVersionUID = 1254238125122522523L;
-
- public DFInvalidRecord() {
- }
-
- public DFInvalidRecord(String arg0) {
- super(arg0);
- }
-
- public DFInvalidRecord(Throwable arg0) {
- super(arg0);
- }
-
- public DFInvalidRecord(String arg0, Throwable arg1) {
- super(arg0, arg1);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java
deleted file mode 100644
index ef2bddd..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.Logger;
-
-@Table(name="SystemMetrics",columnFamily="Disk")
-public class Df extends AbstractProcessor {
- static Logger log = Logger.getLogger(Df.class);
-
- private static final String[] headerSplitCols = { "Filesystem", "1K-blocks",
- "Used", "Available", "Use%", "Mounted", "on" };
- private static final String[] headerCols = { "Filesystem", "1K-blocks",
- "Used", "Available", "Use%", "Mounted on" };
- private SimpleDateFormat sdf = null;
-
- public Df() {
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
- }
-
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
- throws Throwable {
-
- try {
- String dStr = recordEntry.substring(0, 23);
- int start = 24;
- int idx = recordEntry.indexOf(' ', start);
- // String level = recordEntry.substring(start, idx);
- start = idx + 1;
- idx = recordEntry.indexOf(' ', start);
- // String className = recordEntry.substring(start, idx-1);
- String body = recordEntry.substring(idx + 1);
-
- Date d = sdf.parse(dStr);
- String[] lines = body.split("\n");
-
- String[] outputCols = lines[0].substring(lines[0].indexOf("Filesystem")).split("[\\s]++");
-
- if (outputCols.length != headerSplitCols.length
- || outputCols[0].intern() != headerSplitCols[0].intern()
- || outputCols[1].intern() != headerSplitCols[1].intern()
- || outputCols[2].intern() != headerSplitCols[2].intern()
- || outputCols[3].intern() != headerSplitCols[3].intern()
- || outputCols[4].intern() != headerSplitCols[4].intern()
- || outputCols[5].intern() != headerSplitCols[5].intern()
- || outputCols[6].intern() != headerSplitCols[6].intern()) {
- throw new DFInvalidRecord("Wrong output format (header) ["
- + recordEntry + "]");
- }
-
- String[] values = null;
-
- // Data
- ChukwaRecord record = null;
-
- for (int i = 1; i < lines.length; i++) {
- values = lines[i].split("[\\s]++");
- key = new ChukwaRecordKey();
- record = new ChukwaRecord();
- this.buildGenericRecord(record, null, d.getTime(), "Df");
-
- record.add(headerCols[0], values[0]);
- record.add(headerCols[1], values[1]);
- record.add(headerCols[2], values[2]);
- record.add(headerCols[3], values[3]);
- record.add(headerCols[4], values[4]
- .substring(0, values[4].length() - 1)); // Remove %
- record.add(headerCols[5], values[5]);
-
- output.collect(key, record);
- }
-
- // log.info("DFProcessor output 1 DF record");
- } catch (ParseException e) {
- e.printStackTrace();
- log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
- throw e;
- } catch (IOException e) {
- e.printStackTrace();
- log.warn("Unable to collect output in DFProcessor [" + recordEntry + "]",
- e);
- throw e;
- } catch (DFInvalidRecord e) {
- e.printStackTrace();
- log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
- throw e;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
index 769c054..f671049 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
@@ -50,22 +50,9 @@ import org.json.simple.JSONValue;
@Table(name="Hadoop",columnFamily="mapred_tasktracker"),
@Table(name="Hadoop",columnFamily="rpc_metrics")
})
-public class HadoopMetricsProcessor extends AbstractProcessor {
-// public static final String jvm = "jvm_metrics";
-// public static final String mapred = "mapred_metrics";
-// public static final String dfs = "dfs_metrics";
-// public static final String namenode = "dfs_namenode";
-// public static final String fsdir = "dfs_FSDirectory";
-// public static final String fsname = "dfs_FSNamesystem";
-// public static final String datanode = "dfs_datanode";
-// public static final String jobtracker = "mapred_jobtracker";
-// public static final String shuffleIn = "mapred_shuffleInput";
-// public static final String shuffleOut = "mapred_shuffleOutput";
-// public static final String tasktracker = "mapred_tasktracker";
-// public static final String mr = "mapred_job";
-
+public class HadoopMetricsProcessor extends AbstractProcessor {
static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);
- static final String chukwaTimestampField = "chukwa_timestamp";
+ static final String chukwaTimestampField = "timestamp";
static final String contextNameField = "contextName";
static final String recordNameField = "recordName";
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java
deleted file mode 100644
index 452ddb8..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.Logger;
-
-@Table(name="SystemMetrics",columnFamily="SystemMetrics")
-public class Iostat extends AbstractProcessor {
- static Logger log = Logger.getLogger(Iostat.class);
- public final String recordType = this.getClass().getName();
-
- private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
- private static Pattern p = null;
-
- private Matcher matcher = null;
- private SimpleDateFormat sdf = null;
-
- public Iostat() {
- // TODO move that to config
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
- p = Pattern.compile(regex);
- }
-
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
- throws Throwable {
-
- log.debug("Iostat record: [" + recordEntry + "] type["
- + chunk.getDataType() + "]");
- int i = 0;
-
- matcher = p.matcher(recordEntry);
- while (matcher.find()) {
- log.debug("Iostat Processor Matches");
-
- try {
- Date d = sdf.parse(matcher.group(1).trim());
-
- String[] lines = recordEntry.split("\n");
- String[] headers = null;
- for (int skip = 0; skip < 2; skip++) {
- i++;
- while (i < lines.length && lines[i].indexOf("avg-cpu") < 0) {
- // Skip the first output because the numbers are averaged from
- // system boot up
- log.debug("skip line:" + lines[i]);
- i++;
- }
- }
- while (i < lines.length) {
- ChukwaRecord record = null;
-
- if (lines[i].indexOf("avg-cpu") >= 0
- || lines[i].indexOf("Device") >= 0) {
- headers = parseHeader(lines[i]);
- i++;
- }
- String data[] = parseData(lines[i]);
- if (headers[0].equals("avg-cpu:")) {
- log.debug("Matched CPU-Utilization");
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
- } else if (headers[0].equals("Device:")) {
- log.debug("Matched Iostat");
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
- } else {
- log.debug("No match:" + headers[0]);
- }
- if (record != null) {
- int j = 0;
- log.debug("Data Length: " + data.length);
- while (j < data.length) {
- log.debug("header:" + headers[j] + " data:" + data[j]);
- if (!headers[j].equals("avg-cpu:")) {
- try {
- // Filter out overflow values for older linux systems
- long x=Long.parseLong(data[j]);
- if(x<100000000000L) {
- record.add(headers[j],data[j]);
- }
- } catch(NumberFormatException ex) {
- record.add(headers[j],data[j]);
- }
- }
- j++;
- }
- record.setTime(d.getTime());
- if (data.length > 3) {
- output.collect(key, record);
- }
- }
- i++;
- }
- // End of parsing
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
- }
-
- public String[] parseHeader(String header) {
- String[] headers = header.split("\\s+");
- return headers;
- }
-
- public String[] parseData(String dataLine) {
- String[] data = dataLine.split("\\s+");
- return data;
- }
-
- public String getDataType() {
- return recordType;
- }
-}
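The Iostat processor removed above guarded against counter overflow on older Linux kernels by dropping numeric values of 100000000000 (10^11) or more before adding them to a record. A standalone sketch of just that guard, using a hypothetical class name for illustration only:

// Sketch of the overflow guard from the removed Iostat processor:
// values parseable as a long and >= 10^11 are treated as overflowed
// counters and skipped; anything else (decimals, device names) is kept.
public class IostatOverflowGuard {
  private static final long OVERFLOW_THRESHOLD = 100000000000L;

  static boolean keep(String value) {
    try {
      return Long.parseLong(value) < OVERFLOW_THRESHOLD;
    } catch (NumberFormatException e) {
      return true;   // not a plain integer, e.g. "12.34" or "sda1"
    }
  }

  public static void main(String[] args) {
    System.out.println(keep("4096"));                // true  - normal counter
    System.out.println(keep("184467440737095516"));  // false - overflowed counter
    System.out.println(keep("12.34"));               // true  - non-integer field
  }
}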
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java
index 0be8615..dbf4baa 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java
@@ -39,7 +39,7 @@ import org.apache.log4j.Logger;
})
public class JobSummary extends AbstractProcessor {
static Logger log = Logger.getLogger(JobSummary.class);
- static final String chukwaTimestampField = "chukwa_timestamp";
+ static final String chukwaTimestampField = "timestamp";
static final String contextNameField = "contextName";
static final String recordNameField = "recordName";
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
index 9c8a3ae..16b12f3 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
@@ -53,7 +53,7 @@ public class Log4JMetricsContextProcessor extends AbstractProcessor {
JSONObject json = new JSONObject(log.getBody());
// round timestamp
- timestamp = json.getLong("chukwa_timestamp");
+ timestamp = json.getLong("timestamp");
timestamp = (timestamp / 60000) * 60000;
// get record type
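The unchanged context line above, timestamp = (timestamp / 60000) * 60000;, truncates the metric timestamp to the start of its minute via integer division. A minimal sketch of that arithmetic, with a hypothetical class name and a made-up sample value:

// Illustration only: integer division by 60000 ms drops the seconds and
// milliseconds; multiplying back yields a timestamp aligned to the minute.
public class MinuteRounding {
  static long roundToMinute(long timestampMs) {
    return (timestampMs / 60000L) * 60000L;
  }

  public static void main(String[] args) {
    long ts = 1428822123456L;               // sample epoch millis, mid-minute
    System.out.println(roundToMinute(ts));  // prints 1428822120000
  }
}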
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java
deleted file mode 100644
index 93252d4..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-
-public class PbsInvalidEntry extends Exception {
-
- /**
- *
- */
- private static final long serialVersionUID = 9154096600390233023L;
-
- public PbsInvalidEntry() {
- }
-
- public PbsInvalidEntry(String message) {
- super(message);
- }
-
- public PbsInvalidEntry(Throwable cause) {
- super(cause);
- }
-
- public PbsInvalidEntry(String message, Throwable cause) {
- super(message, cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java
deleted file mode 100644
index 23dc74c..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.Logger;
-
-public class PbsNodes extends AbstractProcessor {
- static Logger log = Logger.getLogger(PbsNodes.class);
-
- private static final String rawPBSRecordType = "PbsNodes";
- private static final String machinePBSRecordType = "MachinePbsNodes";
- private SimpleDateFormat sdf = null;
-
- public PbsNodes() {
- // TODO move that to config
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
- }
-
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
- throws Throwable {
-
- // log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" +
- // chunk.getDataType() + "]");
-
- StringBuilder sb = new StringBuilder();
- int i = 0;
- String nodeActivityStatus = null;
- StringBuilder sbFreeMachines = new StringBuilder();
- StringBuilder sbUsedMachines = new StringBuilder();
- StringBuilder sbDownMachines = new StringBuilder();
-
- int totalFreeNode = 0;
- int totalUsedNode = 0;
- int totalDownNode = 0;
-
- String body = null;
- ChukwaRecord record = null;
-
- try {
-
- String dStr = recordEntry.substring(0, 23);
- int start = 24;
- int idx = recordEntry.indexOf(' ', start);
- // String level = recordEntry.substring(start, idx);
- start = idx + 1;
- idx = recordEntry.indexOf(' ', start);
- // String className = recordEntry.substring(start, idx-1);
- body = recordEntry.substring(idx + 1);
-
- Date d = sdf.parse(dStr);
-
- String[] lines = body.split("\n");
- while (i < lines.length) {
- while ((i < lines.length) && (lines[i].trim().length() > 0)) {
- sb.append(lines[i].trim()).append("\n");
- i++;
- }
-
- if ((i < lines.length) && (lines[i].trim().length() > 0)) {
- throw new PbsInvalidEntry(recordEntry);
- }
-
- // Empty line
- i++;
-
- if (sb.length() > 0) {
- body = sb.toString();
- // Process all entries for a machine
- // System.out.println("=========>>> Record [" + body+ "]");
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
-
- buildGenericRecord(record, null, d.getTime(), machinePBSRecordType);
- parsePbsRecord(body, record);
-
- // Output PbsNode record for 1 machine
- output.collect(key, record);
- // log.info("PbsNodeProcessor output 1 sub-record");
-
- // compute Node Activity information
- nodeActivityStatus = record.getValue("state");
- if (nodeActivityStatus != null) {
- if (nodeActivityStatus.equals("free")) {
- totalFreeNode++;
- sbFreeMachines.append(record.getValue("Machine")).append(",");
- } else if (nodeActivityStatus.equals("job-exclusive")) {
- totalUsedNode++;
- sbUsedMachines.append(record.getValue("Machine")).append(",");
- } else {
- totalDownNode++;
- sbDownMachines.append(record.getValue("Machine")).append(",");
- }
- }
- sb = new StringBuilder();
- }
- }
-
- // End of parsing
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- buildGenericRecord(record, null, d.getTime(), "NodeActivity");
-
- record.setTime(d.getTime());
- record.add("used", "" + totalUsedNode);
- record.add("free", "" + totalFreeNode);
- record.add("down", "" + totalDownNode);
- record.add("usedMachines", sbUsedMachines.toString());
- record.add("freeMachines", sbFreeMachines.toString());
- record.add("downMachines", sbDownMachines.toString());
-
- output.collect(key, record);
- // log.info("PbsNodeProcessor output 1 NodeActivity");
- } catch (ParseException e) {
- e.printStackTrace();
- log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
- throw e;
- } catch (IOException e) {
- log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry
- + "]", e);
- e.printStackTrace();
- throw e;
- } catch (PbsInvalidEntry e) {
- log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
- e.printStackTrace();
- throw e;
- }
-
- }
-
- protected static void parsePbsRecord(String recordLine, ChukwaRecord record) {
- int i = 0;
- String[] lines = recordLine.split("\n");
- record.add("Machine", lines[0]);
-
- i++;
- String[] data = null;
- while (i < lines.length) {
- data = extractFields(lines[i]);
- record.add(data[0].trim(), data[1].trim());
- if (data[0].trim().equalsIgnoreCase("status")) {
- parseStatusField(data[1].trim(), record);
- }
- i++;
- }
- }
-
- protected static void parseStatusField(String statusField, ChukwaRecord record) {
- String[] data = null;
- String[] subFields = statusField.trim().split(",");
- for (String subflied : subFields) {
- data = extractFields(subflied);
- record.add("status-" + data[0].trim(), data[1].trim());
- }
- }
-
- static String[] extractFields(String line) {
- String[] args = new String[2];
- int index = line.indexOf("=");
- args[0] = line.substring(0, index);
- args[1] = line.substring(index + 1);
-
- return args;
- }
-
- public String getDataType() {
- return PbsNodes.rawPBSRecordType;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
deleted file mode 100644
index 7b10edd..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.Logger;
-
-@Table(name="SystemMetrics",columnFamily="Ps")
-public class Ps extends AbstractProcessor {
- static Logger log = Logger.getLogger(Ps.class);
- public static final String reduceType = "Ps";
-
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
- throws Throwable {
- LogEntry log = new LogEntry(recordEntry);
- PsOutput ps = new PsOutput(log.getBody());
- for (HashMap<String, String> processInfo : ps.getProcessList()) {
- key = new ChukwaRecordKey();
- ChukwaRecord record = new ChukwaRecord();
- this.buildGenericRecord(record, null, log.getDate().getTime(), reduceType);
- for (Entry<String, String> entry : processInfo.entrySet()) {
- record.add(entry.getKey(), entry.getValue());
- }
- output.collect(key, record);
- }
- }
-
- public static class PsOutput {
-
- // processes info
- private ArrayList<HashMap<String, String>> recordList = new ArrayList<HashMap<String, String>>();
-
- public PsOutput(String psCmdOutput) throws InvalidPsRecord {
- if (psCmdOutput == null || psCmdOutput.length() == 0)
- return;
-
- String[] lines = psCmdOutput.split("[\n\r]+");
-
- // at least two lines
- if (lines.length < 2)
- return;
-
- // header
- ArrayList<String> header = new ArrayList<String>();
- Matcher matcher = Pattern.compile("[^ ^\t]+").matcher(lines[0]);
- while (matcher.find()) {
- header.add(matcher.group(0));
- }
- if (!header.get(header.size() - 1).equals("CMD")) {
- throw new InvalidPsRecord("CMD must be the last column");
- }
-
- // records
- boolean foundInitCmd = false;
- for (int line = 1; line < lines.length; line++) {
- HashMap<String, String> record = new HashMap<String, String>();
- recordList.add(record);
-
- matcher = Pattern.compile("[^ ^\t]+").matcher(lines[line]);
- for (int index = 0; index < header.size(); index++) {
- String key = header.get(index);
- matcher.find();
- if (!key.equals("CMD")) {
- String value = matcher.group(0);
- /**
- * For STARTED column, it could be in two formats: "MMM dd" or
- * "hh:mm:ss". If we use ' ' as the delimiter, we must read twice to
- * the date if it's with "MMM dd" format.
- */
- if (key.equals("STARTED")) {
- char c = value.charAt(0);
- if (c < '0' || c > '9') {
- matcher.find();
- value += matcher.group(0);
- }
- }
- record.put(key, value);
- } else {
- // reached the cmd part. all remains should be put
- // together as the command
- String value = lines[line].substring(matcher.start());
- record.put(key, value);
- if (!foundInitCmd)
- foundInitCmd = value.startsWith("init");
- break;
- }
- }
- }
- if (!foundInitCmd)
- throw new InvalidPsRecord("Did not find 'init' cmd");
- }
-
- public ArrayList<HashMap<String, String>> getProcessList() {
- return recordList;
- }
- }
-
- public static class InvalidPsRecord extends Exception {
- private static final long serialVersionUID = 1L;
-
- public InvalidPsRecord() {
- }
-
- public InvalidPsRecord(String arg0) {
- super(arg0);
- }
-
- public InvalidPsRecord(Throwable arg0) {
- super(arg0);
- }
-
- public InvalidPsRecord(String arg0, Throwable arg1) {
- super(arg0, arg1);
- }
- }
-}
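The PsOutput comment above notes that the STARTED column of ps output comes in two shapes, "MMM dd" (two whitespace-separated tokens) or "hh:mm:ss" (one token), so the parser pulls in a second token whenever the first character is not a digit. A self-contained sketch of just that tokenization rule, with a hypothetical class name and sample inputs:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of the STARTED-column rule from the removed PsOutput parser:
// a value starting with a non-digit ("Apr 12") spans two tokens, which the
// original code concatenated; "08:01:32" is taken as a single token.
public class StartedColumnDemo {
  static String readStarted(Matcher matcher) {
    matcher.find();
    String value = matcher.group(0);
    char c = value.charAt(0);
    if (c < '0' || c > '9') {
      matcher.find();              // month name seen: also consume the day
      value += matcher.group(0);
    }
    return value;
  }

  public static void main(String[] args) {
    Pattern token = Pattern.compile("[^ \t]+");
    System.out.println(readStarted(token.matcher("Apr 12 /sbin/init")));   // Apr12
    System.out.println(readStarted(token.matcher("08:01:32 /sbin/init"))); // 08:01:32
  }
}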
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java
deleted file mode 100644
index 3573b4c..0000000
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.Logger;
-
-@Table(name="SystemMetrics",columnFamily="SystemMetrics")
-public class Sar extends AbstractProcessor {
- static Logger log = Logger.getLogger(Sar.class);
- public static final String reduceType = "SystemMetrics";
- public final String recordType = this.getClass().getName();
-
- private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
- private static Pattern p = null;
-
- private Matcher matcher = null;
- private SimpleDateFormat sdf = null;
-
- public Sar() {
- // TODO move that to config
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
- p = Pattern.compile(regex);
- }
-
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
- throws Throwable {
-
- log.debug("Sar record: [" + recordEntry + "] type[" + chunk.getDataType()
- + "]");
- int i = 0;
-
- // String logLevel = null;
- // String className = null;
-
- matcher = p.matcher(recordEntry);
- while (matcher.find()) {
- log.debug("Sar Processor Matches");
-
- try {
- Date d = sdf.parse(matcher.group(1).trim());
-
- // logLevel = matcher.group(2);
- // className = matcher.group(3);
-
- // TODO create a more specific key structure
- // part of ChukwaArchiveKey + record index if needed
- key.setKey("" + d.getTime());
-
- String[] lines = recordEntry.split("\n");
-
- String[] headers = null;
- while (i < (lines.length - 1) && lines[i + 1].indexOf("Average:") < 0) {
- // Skip to the average lines
- log.debug("skip:" + lines[i]);
- i++;
- }
- while (i < lines.length) {
- ChukwaRecord record = null;
- if (lines[i].equals("")) {
- i++;
- headers = parseHeader(lines[i]);
- i++;
- }
- String data[] = parseData(lines[i]);
-
- // FIXME please validate this
- if (headers[1].equals("IFACE") && headers[2].equals("rxpck/s")) {
- log.debug("Matched Sar-Network");
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record, null, d.getTime(), reduceType);
- } else if (headers[1].equals("IFACE") && headers[2].equals("rxerr/s")) {
- log.debug("Matched Sar-Network");
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record, null, d.getTime(), reduceType);
- } else if (headers[1].equals("kbmemfree")) {
- log.debug("Matched Sar-Memory");
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record, null, d.getTime(), reduceType);
- } else if (headers[1].equals("totsck")) {
- log.debug("Matched Sar-NetworkSockets");
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record, null, d.getTime(), reduceType);
- } else if (headers[1].equals("runq-sz")) {
- log.debug("Matched Sar-LoadAverage");
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record, null, d.getTime(), reduceType);
- } else {
- log.debug("No match:" + headers[1] + " " + headers[2]);
- }
- if (record != null) {
- int j = 0;
-
- log.debug("Data Length: " + data.length);
- while (j < data.length) {
- log.debug("header:" + headers[j] + " data:" + data[j]);
-
- //special case code to work around peculiar versions of Sar
- if(headers[j].equals("rxkB/s")) {
- record.add("rxbyt/s", Double.toString(Double.parseDouble(data[j]) * 1000));
- } else if(headers[j].equals("txkB/s")){
- record.add("txbyt/s", Double.toString(Double.parseDouble(data[j]) * 1000));
- } else if (!headers[j].equals("Average:")) { //common case
- record.add(headers[j], data[j]);
- }
- j++;
- }
-
- output.collect(key, record);
- }
- i++;
- }
- // End of parsing
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
- }
-
- public String[] parseHeader(String header) {
- String[] headers = header.split("\\s+");
- return headers;
- }
-
- public String[] parseData(String dataLine) {
- String[] data = dataLine.split("\\s+");
- return data;
- }
-
- public String getDataType() {
- return recordType;
- }
-}
\ No newline at end of file