You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@chukwa.apache.org by ey...@apache.org on 2015/05/26 02:12:37 UTC
[02/19] chukwa git commit: CHUKWA-747. Update Widget API to store
data in HBase. (Eric Yang)
CHUKWA-747. Update Widget API to store data in HBase. (Eric Yang)
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/4134fc32
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/4134fc32
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/4134fc32
Branch: refs/heads/master
Commit: 4134fc32c095e83570bd36ec571b2f1904348149
Parents: 1c4ed7b
Author: Eric Yang <ey...@apache.org>
Authored: Sun May 10 16:30:22 2015 -0700
Committer: Eric Yang <ey...@apache.org>
Committed: Sun May 10 16:30:22 2015 -0700
----------------------------------------------------------------------
.../chukwa/datastore/ChukwaHBaseStore.java | 330 +++++++++++++++++--
1 file changed, 295 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/chukwa/blob/4134fc32/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
index 7494aa8..476bd79 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
@@ -18,44 +18,46 @@
package org.apache.hadoop.chukwa.datastore;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Calendar;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.hadoop.chukwa.hicc.Chart;
+import org.apache.hadoop.chukwa.hicc.bean.Chart;
import org.apache.hadoop.chukwa.hicc.bean.HeatMapPoint;
import org.apache.hadoop.chukwa.hicc.bean.Heatmap;
+import org.apache.hadoop.chukwa.hicc.bean.LineOptions;
import org.apache.hadoop.chukwa.hicc.bean.Series;
+import org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData;
+import org.apache.hadoop.chukwa.hicc.bean.Widget;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.chukwa.util.HBaseUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -63,25 +65,30 @@ import org.json.simple.JSONValue;
import com.google.gson.Gson;
public class ChukwaHBaseStore {
- private static Configuration hconf = HBaseConfiguration.create();
static Logger LOG = Logger.getLogger(ChukwaHBaseStore.class);
static byte[] COLUMN_FAMILY = "t".getBytes();
static byte[] ANNOTATION_FAMILY = "a".getBytes();
static byte[] KEY_NAMES = "k".getBytes();
static byte[] CHART_TYPE = "chart_meta".getBytes();
static byte[] CHART_FAMILY = "c".getBytes();
+ static byte[] COMMON_FAMILY = "c".getBytes();
+ static byte[] WIDGET_TYPE = "widget_meta".getBytes();
private static final String CHUKWA = "chukwa";
private static final String CHUKWA_META = "chukwa_meta";
private static long MILLISECONDS_IN_DAY = 86400000L;
- private static Connection connection = null;
+ protected static Connection connection = null;
- public static void getHBaseConnection() throws IOException {
+ public ChukwaHBaseStore() {
+ super();
+ }
+
+ public static synchronized void getHBaseConnection() throws IOException {
if (connection == null || connection.isClosed()) {
- connection = ConnectionFactory.createConnection(hconf);
+ connection = ConnectionFactory.createConnection();
}
}
- public static void closeHBase() {
+ public static synchronized void closeHBase() {
try {
if(connection != null) {
connection.close();
@@ -236,6 +243,7 @@ public class ChukwaHBaseStore {
}
}
}
+ rs.close();
table.close();
} catch (Exception e) {
closeHBase();
@@ -354,6 +362,12 @@ public class ChukwaHBaseStore {
return clusters;
}
+ /**
+ * Get a chart from HBase by ID.
+ *
+ * @param id
+ * @return
+ */
public static Chart getChart(String id) {
Chart chart = null;
try {
@@ -374,6 +388,12 @@ public class ChukwaHBaseStore {
return chart;
}
+ /**
+ * Update a chart in HBase by ID.
+ *
+ * @param id
+ * @param chart
+ */
public static void putChart(String id, Chart chart) {
try {
getHBaseConnection();
@@ -391,33 +411,54 @@ public class ChukwaHBaseStore {
}
- public static String createChart(Chart chart) throws IOException {
- getHBaseConnection();
+ /**
+ * Create a chart in HBase.
+ *
+ * @param chart
+ * @return id of newly created chart
+ * @throws IOException
+ */
+ public static synchronized String createChart(Chart chart) {
String id = chart.getId();
- if(id!=null) {
- // Check if there is existing chart with same id.
- Chart test = getChart(id);
- if(test!=null) {
- // If id already exists, randomly generate an id.
+ try {
+ getHBaseConnection();
+ if (id != null) {
+ // Check if there is existing chart with same id.
+ Chart test = getChart(id);
+ if (test != null) {
+ // If id already exists, randomly generate an id.
+ id = String.valueOf(UUID.randomUUID());
+ }
+ } else {
+ // If id is not provided, randomly generate an id.
id = String.valueOf(UUID.randomUUID());
}
- } else {
- // If id is not provided, randomly generate an id.
- id = String.valueOf(UUID.randomUUID());
+ chart.setId(id);
+ Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
+ Put put = new Put(CHART_TYPE);
+ Gson gson = new Gson();
+ String buffer = gson.toJson(chart);
+ put.add(CHART_FAMILY, id.getBytes(), buffer.getBytes());
+ table.put(put);
+ table.close();
+ } catch (Exception e) {
+ closeHBase();
+ LOG.error(ExceptionUtil.getStackTrace(e));
+ id = null;
}
- chart.setId(id);
- Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
- Put put = new Put(CHART_TYPE);
- Gson gson = new Gson();
- String buffer = gson.toJson(chart);
- put.add(CHART_FAMILY, id.getBytes(), buffer.getBytes());
- table.put(put);
- table.close();
return id;
}
- public static synchronized ArrayList<org.apache.hadoop.chukwa.hicc.Series> getChartSeries(ArrayList<org.apache.hadoop.chukwa.hicc.Series> series, long startTime, long endTime) {
- ArrayList<org.apache.hadoop.chukwa.hicc.Series> list = new ArrayList<org.apache.hadoop.chukwa.hicc.Series>();
+ /**
+ * Return data for multiple series of metrics stored in HBase.
+ *
+ * @param series
+ * @param startTime
+ * @param endTime
+ * @return
+ */
+ public static synchronized ArrayList<org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData> getChartSeries(ArrayList<org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData> series, long startTime, long endTime) {
+ ArrayList<org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData> list = new ArrayList<org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData>();
try {
// Swap start and end if the values are inverted.
if (startTime > endTime) {
@@ -433,8 +474,8 @@ public class ChukwaHBaseStore {
int startDay = c.get(Calendar.DAY_OF_YEAR);
c.setTimeInMillis(endTime);
int endDay = c.get(Calendar.DAY_OF_YEAR);
- for (org.apache.hadoop.chukwa.hicc.Series s : series) {
- org.apache.hadoop.chukwa.hicc.Series clone = (org.apache.hadoop.chukwa.hicc.Series) s.clone();
+ for (org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData s : series) {
+ org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData clone = (org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData) s.clone();
long currentDay = startTime;
String[] parts = s.getUrl().toString().split("/");
String metric = parts[5];
@@ -479,4 +520,223 @@ public class ChukwaHBaseStore {
return list;
}
+ /**
+ * List widgets stored in HBase.
+ *
+ * @param limit
+ * @param offset
+ * @return
+ */
+ public static synchronized List<Widget> listWidget(int limit, int offset) {
+ ArrayList<Widget> list = new ArrayList<Widget>();
+ try {
+ getHBaseConnection();
+ Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
+ Scan scan = new Scan();
+ scan.setStartRow(WIDGET_TYPE);
+ scan.setStopRow(WIDGET_TYPE);
+ ResultScanner rs = table.getScanner(scan);
+ Iterator<Result> it = rs.iterator();
+ int c = 0;
+ while(it.hasNext()) {
+ Result result = it.next();
+ for(KeyValue kv : result.raw()) {
+ if(c > limit) {
+ break;
+ }
+ if(c < offset) {
+ continue;
+ }
+ Gson gson = new Gson();
+ Widget widget = gson.fromJson(new String(kv.getValue(), "UTF-8"), Widget.class);
+ list.add(widget);
+ c++;
+ }
+ }
+ rs.close();
+ table.close();
+ } catch (Exception e) {
+ closeHBase();
+ LOG.error(ExceptionUtil.getStackTrace(e));
+ }
+ return list;
+ }
+
+ /**
+ * Find widget by title prefix in HBase.
+ *
+ * @param query - Prefix query of widget title.
+ * @return
+ */
+ public static synchronized List<Widget> searchWidget(String query) {
+ ArrayList<Widget> list = new ArrayList<Widget>();
+ try {
+ getHBaseConnection();
+ Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
+ Filter filter = new ColumnPrefixFilter(Bytes.toBytes(query));
+ Scan scan = new Scan();
+ scan.setStartRow(WIDGET_TYPE);
+ scan.setStopRow(WIDGET_TYPE);
+ scan.setFilter(filter);
+ ResultScanner rs = table.getScanner(scan);
+ Iterator<Result> it = rs.iterator();
+ while(it.hasNext()) {
+ Result result = it.next();
+ for(KeyValue kv : result.raw()) {
+ Gson gson = new Gson();
+ Widget widget = gson.fromJson(new String(kv.getValue(), "UTF-8"), Widget.class);
+ list.add(widget);
+ }
+ }
+ rs.close();
+ table.close();
+ } catch (Exception e) {
+ closeHBase();
+ LOG.error(ExceptionUtil.getStackTrace(e));
+ }
+ return list;
+ }
+
+ /**
+ * View a widget information in HBase.
+ *
+ * @param title - Title of the widget.
+ * @return
+ */
+ public static synchronized Widget viewWidget(String title) {
+ Widget w = null;
+ try {
+ getHBaseConnection();
+ Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
+ Get widget = new Get(WIDGET_TYPE);
+ widget.addColumn(COMMON_FAMILY, title.getBytes());
+ Result rs = table.get(widget);
+ byte[] buffer = rs.getValue(COMMON_FAMILY, title.getBytes());
+ Gson gson = new Gson();
+ w = gson.fromJson(new String(buffer), Widget.class);
+ table.close();
+ } catch (Exception e) {
+ closeHBase();
+ LOG.error(ExceptionUtil.getStackTrace(e));
+ }
+ return w;
+ }
+
+ /**
+ * Create a widget in HBase.
+ *
+ * @param widget
+ */
+ public static synchronized boolean createWidget(Widget widget) {
+ boolean created = false;
+ try {
+ widget.tokenize();
+ getHBaseConnection();
+ Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
+ Get widgetTest = new Get(WIDGET_TYPE);
+ widgetTest.addColumn(COMMON_FAMILY, widget.getTitle().getBytes());
+ if (table.exists(widgetTest)) {
+ LOG.warn("Widget: " + widget.getTitle() + " already exists.");
+ created = false;
+ } else {
+ Put put = new Put(WIDGET_TYPE);
+ Gson gson = new Gson();
+ String buffer = gson.toJson(widget);
+ put.add(COMMON_FAMILY, widget.getTitle().getBytes(), buffer.getBytes());
+ table.put(put);
+ created = true;
+ }
+ table.close();
+ } catch (Exception e) {
+ closeHBase();
+ LOG.error(ExceptionUtil.getStackTrace(e));
+ }
+ return created;
+ }
+
+ /**
+ * Update a widget in HBase.
+ *
+ * @param title
+ * @param widget
+ * @throws IOException
+ */
+ public static synchronized boolean updateWidget(String title, Widget widget) {
+ boolean result = false;
+ try {
+ getHBaseConnection();
+ Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
+ Delete oldWidget = new Delete(WIDGET_TYPE);
+ oldWidget.addColumn(COMMON_FAMILY, title.getBytes());
+ table.delete(oldWidget);
+ Put put = new Put(WIDGET_TYPE);
+ Gson gson = new Gson();
+ String buffer = gson.toJson(widget);
+ put.add(COMMON_FAMILY, title.getBytes(), buffer.getBytes());
+ table.put(put);
+ table.close();
+ result = true;
+ } catch (Exception e) {
+ closeHBase();
+ LOG.error(ExceptionUtil.getStackTrace(e));
+ LOG.error("Error in updating widget, original title: " +
+ title + " new title:" + widget.getTitle());
+ }
+ return result;
+ }
+
+ /**
+ * Delete a widget in HBase.
+ *
+ * @param title
+ * @param widget
+ * @throws IOException
+ */
+ public static synchronized boolean deleteWidget(String title) {
+ boolean result = false;
+ try {
+ getHBaseConnection();
+ Table table = connection.getTable(TableName.valueOf(CHUKWA_META));
+ Delete oldWidget = new Delete(WIDGET_TYPE);
+ oldWidget.addColumn(COMMON_FAMILY, title.getBytes());
+ table.delete(oldWidget);
+ table.close();
+ result = true;
+ } catch (Exception e) {
+ closeHBase();
+ LOG.error(ExceptionUtil.getStackTrace(e));
+ LOG.error("Error in deleting widget: "+ title);
+ }
+ return result;
+ }
+
+ public static void populateDefaults() {
+ try {
+ String hostname = InetAddress.getLocalHost().getHostName();
+ // Populate default widgets
+ Widget widget = new Widget();
+ widget.setTitle("System Load Average");
+ widget.setUrl(new URI("/hicc/v1/chart/draw/1"));
+ createWidget(widget);
+
+ // Populate example chart widgets
+ Chart chart = new Chart("1");
+ chart.setYUnitType("");
+ chart.setTitle("Load Average");
+ ArrayList<SeriesMetaData> series = new ArrayList<SeriesMetaData>();
+
+ SeriesMetaData s = new SeriesMetaData();
+ s.setLabel("SystemMetrics.LoadAverage.1/" + hostname);
+ s.setUrl(new URI("/hicc/v1/metrics/series/SystemMetrics.LoadAverage.1/"
+ + hostname));
+ LineOptions l = new LineOptions();
+ s.setLineOptions(l);
+ series.add(s);
+
+ chart.SetSeries(series);
+ createChart(chart);
+ } catch (Throwable ex) {
+ LOG.error(ExceptionUtil.getStackTrace(ex));
+ }
+ }
}