You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2015/01/11 20:57:46 UTC

svn commit: r1650958 - in /chukwa/trunk: CHANGES.txt src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java

Author: eyang
Date: Sun Jan 11 19:57:45 2015
New Revision: 1650958

URL: http://svn.apache.org/r1650958
Log:
CHUKWA-723. Update Chukwa code to use new HBase HConnection API.  (Sreepathi Prasanna via Eric Yang)

Modified:
    chukwa/trunk/CHANGES.txt
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java

Modified: chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/chukwa/trunk/CHANGES.txt?rev=1650958&r1=1650957&r2=1650958&view=diff
==============================================================================
--- chukwa/trunk/CHANGES.txt (original)
+++ chukwa/trunk/CHANGES.txt Sun Jan 11 19:57:45 2015
@@ -6,6 +6,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+  CHUKWA-723. Update Chukwa code to use new HBase HConnection API.  (Sreepathi Prasanna via Eric Yang)
+
   BUGS
 
 Release 0.6 - 09/28/2014

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java?rev=1650958&r1=1650957&r2=1650958&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java Sun Jan 11 19:57:45 2015
@@ -25,6 +25,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -33,14 +35,15 @@ import org.apache.hadoop.chukwa.hicc.bea
 import org.apache.hadoop.chukwa.hicc.bean.Heatmap;
 import org.apache.hadoop.chukwa.hicc.bean.Series;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
-
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -52,7 +55,8 @@ import org.apache.log4j.Logger;
 
 public class ChukwaHBaseStore {
   private static Configuration hconf = HBaseConfiguration.create();
-  private static HTablePool pool = new HTablePool(hconf, 60);
+  private static HConnection connection = null;
+  private static final int POOL_SIZE = 60;
   static Logger log = Logger.getLogger(ChukwaHBaseStore.class);
   
   public static Series getSeries(String tableName, String rkey, String family, String column,
@@ -66,7 +70,7 @@ public class ChukwaHBaseStore {
 
     Series series = new Series(seriesName.toString());
     try {
-      HTableInterface table = pool.getTable(tableName);
+      HTableInterface table = getHTableConnection().getTable(tableName);
       Calendar c = Calendar.getInstance();
       c.setTimeInMillis(startTime);
       c.set(Calendar.MINUTE, 0);
@@ -109,7 +113,7 @@ public class ChukwaHBaseStore {
   public static Set<String> getFamilyNames(String tableName) {
     Set<String> familyNames = new CopyOnWriteArraySet<String>();
     try {
-      HTableInterface table = pool.getTable(tableName);
+      HTableInterface table = getHTableConnection().getTable(tableName);
       Set<byte[]> families = table.getTableDescriptor().getFamiliesKeys();
       for(byte[] name : families) {
         familyNames.add(new String(name));
@@ -139,9 +143,9 @@ public class ChukwaHBaseStore {
   public static void getColumnNamesHelper(Set<String>columnNames, Iterator<Result> it) {
     Result result = it.next();
     if(result!=null) {
-      List<KeyValue> kvList = result.list();
-      for(KeyValue kv : kvList) {
-        columnNames.add(new String(kv.getQualifier()));
+      List<Cell> cList = result.listCells();
+      for(Cell cell : cList) {
+        columnNames.add(new String(CellUtil.cloneQualifier(cell)));
       }
     }
   }
@@ -149,7 +153,7 @@ public class ChukwaHBaseStore {
   public static Set<String> getColumnNames(String tableName, String family, long startTime, long endTime, boolean fullScan) {
     Set<String> columnNames = new CopyOnWriteArraySet<String>();
     try {
-      HTableInterface table = pool.getTable(tableName);
+      HTableInterface table = getHTableConnection().getTable(tableName);
       Scan scan = new Scan();
       if(!fullScan) {
         // Take sample columns of the recent time.
@@ -187,8 +191,8 @@ public class ChukwaHBaseStore {
   
   public static Set<String> getRowNames(String tableName, String family, String qualifier, long startTime, long endTime, boolean fullScan) {
     Set<String> rows = new HashSet<String>();
-    HTableInterface table = pool.getTable(tableName);
     try {
+      HTableInterface table = getHTableConnection().getTable(tableName);
       Scan scan = new Scan();
       scan.addColumn(family.getBytes(), qualifier.getBytes());
       if(!fullScan) {
@@ -234,9 +238,9 @@ public class ChukwaHBaseStore {
     String family = "system";
     String column = "ctags";
     Set<String> clusters = new HashSet<String>();
-    HTableInterface table = pool.getTable(tableName);
     Pattern p = Pattern.compile("\\s*cluster=\"(.*?)\"");
     try {
+      HTableInterface table = getHTableConnection().getTable(tableName);
       Scan scan = new Scan();
       scan.addColumn(family.getBytes(), column.getBytes());
       scan.setTimeRange(startTime, endTime);
@@ -262,8 +266,9 @@ public class ChukwaHBaseStore {
 		  long startTime, long endTime, double max, double scale, int height) {
 	final long MINUTE = TimeUnit.MINUTES.toMillis(1);
 	Heatmap heatmap = new Heatmap();
-    HTableInterface table = pool.getTable(tableName);
+    
     try {
+        HTableInterface table = getHTableConnection().getTable(tableName);
         Scan scan = new Scan();
         ColumnPrefixFilter cpf = new ColumnPrefixFilter(column.getBytes());
         scan.addFamily(family.getBytes());
@@ -278,13 +283,13 @@ public class ChukwaHBaseStore {
 		HashMap<String, Integer> keyMap = new HashMap<String, Integer>();
 	    while(it.hasNext()) {
 		  Result result = it.next();
-		  List<KeyValue> kvList = result.list();
-	      for(KeyValue kv : kvList) {
+		  List<Cell> cList = result.listCells();
+	      for(Cell cell : cList) {
 			String key = parseRowKey(result.getRow());
 			StringBuilder tmp = new StringBuilder();
 			tmp.append(key);
 			tmp.append(":");
-			tmp.append(new String(kv.getQualifier()));
+			tmp.append(new String(CellUtil.cloneQualifier(cell)));
 			String seriesName = tmp.toString();
 			long time = parseTime(result.getRow());
 			// Time display in x axis
@@ -296,7 +301,7 @@ public class ChukwaHBaseStore {
 		      y = index;
 		      index++;
 			}
-			double v = Double.parseDouble(new String(kv.getValue()));
+			double v = Double.parseDouble(new String(CellUtil.cloneValue(cell)));
 			heatmap.put(x, y, v);
 			if(v > max) {
 				max = v;
@@ -333,5 +338,27 @@ public class ChukwaHBaseStore {
 	  long time = Long.parseLong(parts[0]);
 	  return time;
   }
+  
+  /** Returns the shared HBase connection, creating it on first use; null if creation fails. */
+  private static HConnection getHTableConnection() {
+    // Check and create under one lock: the field is not volatile, and an
+    // unguarded check let racing threads each build (and leak) a connection.
+    synchronized(ChukwaHBaseStore.class) {
+      if(connection == null) {
+        try {
+          ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
+          // Fail fast when HBase is down: short operation timeout, one retry.
+          hconf.setInt("hbase.client.operation.timeout", 60000);
+          hconf.setLong("hbase.client.pause", 1000);
+          hconf.setInt("hbase.client.retries.number", 1);
+          connection = HConnectionManager.createConnection(hconf, pool);
+        }catch(IOException e) {
+          // Pass the exception to the logger so the stack trace lands in the log.
+          log.error("Unable to obtain connection to HBase " + e.getMessage(), e);
+        }
+      }
+      return connection;
+    }
+  }
 
 }