You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2015/01/11 20:57:46 UTC
svn commit: r1650958 - in /chukwa/trunk: CHANGES.txt
src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
Author: eyang
Date: Sun Jan 11 19:57:45 2015
New Revision: 1650958
URL: http://svn.apache.org/r1650958
Log:
CHUKWA-723. Update Chukwa code to use new HBase HConnection API. (Sreepathi Prasanna via Eric Yang)
Modified:
chukwa/trunk/CHANGES.txt
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
Modified: chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/chukwa/trunk/CHANGES.txt?rev=1650958&r1=1650957&r2=1650958&view=diff
==============================================================================
--- chukwa/trunk/CHANGES.txt (original)
+++ chukwa/trunk/CHANGES.txt Sun Jan 11 19:57:45 2015
@@ -6,6 +6,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ CHUKWA-723. Update Chukwa code to use new HBase HConnection API. (Sreepathi Prasanna via Eric Yang)
+
BUGS
Release 0.6 - 09/28/2014
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java?rev=1650958&r1=1650957&r2=1650958&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java Sun Jan 11 19:57:45 2015
@@ -25,6 +25,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -33,14 +35,15 @@ import org.apache.hadoop.chukwa.hicc.bea
import org.apache.hadoop.chukwa.hicc.bean.Heatmap;
import org.apache.hadoop.chukwa.hicc.bean.Series;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
-
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -52,7 +55,8 @@ import org.apache.log4j.Logger;
public class ChukwaHBaseStore {
private static Configuration hconf = HBaseConfiguration.create();
- private static HTablePool pool = new HTablePool(hconf, 60);
+ private static HConnection connection = null;
+ private static final int POOL_SIZE = 60;
static Logger log = Logger.getLogger(ChukwaHBaseStore.class);
public static Series getSeries(String tableName, String rkey, String family, String column,
@@ -66,7 +70,7 @@ public class ChukwaHBaseStore {
Series series = new Series(seriesName.toString());
try {
- HTableInterface table = pool.getTable(tableName);
+ HTableInterface table = getHTableConnection().getTable(tableName);
Calendar c = Calendar.getInstance();
c.setTimeInMillis(startTime);
c.set(Calendar.MINUTE, 0);
@@ -109,7 +113,7 @@ public class ChukwaHBaseStore {
public static Set<String> getFamilyNames(String tableName) {
Set<String> familyNames = new CopyOnWriteArraySet<String>();
try {
- HTableInterface table = pool.getTable(tableName);
+ HTableInterface table = getHTableConnection().getTable(tableName);
Set<byte[]> families = table.getTableDescriptor().getFamiliesKeys();
for(byte[] name : families) {
familyNames.add(new String(name));
@@ -139,9 +143,9 @@ public class ChukwaHBaseStore {
public static void getColumnNamesHelper(Set<String>columnNames, Iterator<Result> it) {
Result result = it.next();
if(result!=null) {
- List<KeyValue> kvList = result.list();
- for(KeyValue kv : kvList) {
- columnNames.add(new String(kv.getQualifier()));
+ List<Cell> cList = result.listCells();
+ for(Cell cell : cList) {
+ columnNames.add(new String(CellUtil.cloneQualifier(cell)));
}
}
}
@@ -149,7 +153,7 @@ public class ChukwaHBaseStore {
public static Set<String> getColumnNames(String tableName, String family, long startTime, long endTime, boolean fullScan) {
Set<String> columnNames = new CopyOnWriteArraySet<String>();
try {
- HTableInterface table = pool.getTable(tableName);
+ HTableInterface table = getHTableConnection().getTable(tableName);
Scan scan = new Scan();
if(!fullScan) {
// Take sample columns of the recent time.
@@ -187,8 +191,8 @@ public class ChukwaHBaseStore {
public static Set<String> getRowNames(String tableName, String family, String qualifier, long startTime, long endTime, boolean fullScan) {
Set<String> rows = new HashSet<String>();
- HTableInterface table = pool.getTable(tableName);
try {
+ HTableInterface table = getHTableConnection().getTable(tableName);
Scan scan = new Scan();
scan.addColumn(family.getBytes(), qualifier.getBytes());
if(!fullScan) {
@@ -234,9 +238,9 @@ public class ChukwaHBaseStore {
String family = "system";
String column = "ctags";
Set<String> clusters = new HashSet<String>();
- HTableInterface table = pool.getTable(tableName);
Pattern p = Pattern.compile("\\s*cluster=\"(.*?)\"");
try {
+ HTableInterface table = getHTableConnection().getTable(tableName);
Scan scan = new Scan();
scan.addColumn(family.getBytes(), column.getBytes());
scan.setTimeRange(startTime, endTime);
@@ -262,8 +266,9 @@ public class ChukwaHBaseStore {
long startTime, long endTime, double max, double scale, int height) {
final long MINUTE = TimeUnit.MINUTES.toMillis(1);
Heatmap heatmap = new Heatmap();
- HTableInterface table = pool.getTable(tableName);
+
try {
+ HTableInterface table = getHTableConnection().getTable(tableName);
Scan scan = new Scan();
ColumnPrefixFilter cpf = new ColumnPrefixFilter(column.getBytes());
scan.addFamily(family.getBytes());
@@ -278,13 +283,13 @@ public class ChukwaHBaseStore {
HashMap<String, Integer> keyMap = new HashMap<String, Integer>();
while(it.hasNext()) {
Result result = it.next();
- List<KeyValue> kvList = result.list();
- for(KeyValue kv : kvList) {
+ List<Cell> cList = result.listCells();
+ for(Cell cell : cList) {
String key = parseRowKey(result.getRow());
StringBuilder tmp = new StringBuilder();
tmp.append(key);
tmp.append(":");
- tmp.append(new String(kv.getQualifier()));
+ tmp.append(new String(CellUtil.cloneQualifier(cell)));
String seriesName = tmp.toString();
long time = parseTime(result.getRow());
// Time display in x axis
@@ -296,7 +301,7 @@ public class ChukwaHBaseStore {
y = index;
index++;
}
- double v = Double.parseDouble(new String(kv.getValue()));
+ double v = Double.parseDouble(new String(CellUtil.cloneValue(cell)));
heatmap.put(x, y, v);
if(v > max) {
max = v;
@@ -333,5 +338,27 @@ public class ChukwaHBaseStore {
long time = Long.parseLong(parts[0]);
return time;
}
+
+ private static HConnection getHTableConnection() {
+ if(connection == null) {
+ synchronized(ChukwaHBaseStore.class) {
+ try {
+ ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
+ /* Set the hbase client properties to unblock immediately in case
+ * hbase goes down. This will ensure we timeout on socket connection to
+ * hbase early.
+ */
+ hconf.setInt("hbase.client.operation.timeout", 60000);
+ hconf.setLong("hbase.client.pause", 1000);
+ hconf.setInt("hbase.client.retries.number", 1);
+ connection = HConnectionManager.createConnection(hconf, pool);
+ }catch(IOException e) {
+ log.error("Unable to obtain connection to HBase " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ }
+ return connection;
+ }
}