You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/06/14 08:43:22 UTC

kylin git commit: KYLIN-1760 auto create HTable and column family before saving query

Repository: kylin
Updated Branches:
  refs/heads/master 9a3c96bfe -> 0a3541254


KYLIN-1760 auto create HTable and column family before saving query


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0a354125
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0a354125
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0a354125

Branch: refs/heads/master
Commit: 0a354125455e8bd34dfa934134543b851f4c09d2
Parents: 9a3c96b
Author: Li Yang <li...@apache.org>
Authored: Tue Jun 14 16:43:11 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Tue Jun 14 16:43:11 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/rest/service/QueryService.java |  8 ++-
 .../kylin/storage/hbase/HBaseConnection.java    | 57 ++++++++++++++++----
 2 files changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0a354125/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 08b338c..3f8b9d7 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -45,6 +45,7 @@ import org.apache.calcite.avatica.ColumnMetaData.Rep;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -180,11 +181,14 @@ public class QueryService extends BasicService {
         if (null == creator) {
             return null;
         }
-
+        
         List<Query> queries = new ArrayList<Query>();
         HTableInterface htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            HConnection conn = HBaseConnection.get(hbaseUrl);
+            HBaseConnection.createHTableIfNeeded(conn, userTableName, USER_QUERY_FAMILY);
+            
+            htable = conn.getTable(userTableName);
             Get get = new Get(Bytes.toBytes(creator));
             get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
             Result result = htable.get(get);

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a354125/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index c1fd1ce..d4dd3ae 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -19,7 +19,10 @@
 package org.apache.kylin.storage.hbase;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -46,6 +49,8 @@ import org.apache.kylin.engine.mr.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 /**
  * @author yangli9
  * 
@@ -101,7 +106,7 @@ public class HBaseConnection {
             tpe.allowCoreThreadTimeOut(true);
 
             logger.info("Creating coprocessor thread pool with max of {}, core of {}", maxThreads, coreThreads);
-            
+
             coprocessorPool = tpe;
             return coprocessorPool;
         }
@@ -230,23 +235,39 @@ public class HBaseConnection {
         deleteTable(HBaseConnection.get(hbaseUrl), tableName);
     }
 
-    public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException {
+    public static void createHTableIfNeeded(HConnection conn, String table, String... families) throws IOException {
         HBaseAdmin hbase = new HBaseAdmin(conn);
 
         try {
-            if (tableExists(conn, tableName)) {
-                logger.debug("HTable '" + tableName + "' already exists");
+            if (tableExists(conn, table)) {
+                logger.debug("HTable '" + table + "' already exists");
+                Set<String> existingFamilies = getFamilyNames(hbase.getTableDescriptor(TableName.valueOf(table)));
+                boolean wait = false;
+                for (String family : families) {
+                    if (existingFamilies.contains(family) == false) {
+                        logger.debug("Adding family '" + family + "' to HTable '" + table + "'");
+                        hbase.addColumn(table, newFamilyDescriptor(family));
+                        // addColumn() is async, is there a way to wait it finish?
+                        wait = true;
+                    }
+                }
+                if (wait) {
+                    try {
+                        Thread.sleep(10000);
+                    } catch (InterruptedException e) {
+                        logger.warn("", e);
+                    }
+                }
                 return;
             }
 
-            logger.debug("Creating HTable '" + tableName + "'");
+            logger.debug("Creating HTable '" + table + "'");
 
-            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(table));
 
             if (null != families && families.length > 0) {
                 for (String family : families) {
-                    HColumnDescriptor fd = new HColumnDescriptor(family);
-                    fd.setInMemory(true); // metadata tables are best in memory
+                    HColumnDescriptor fd = newFamilyDescriptor(family);
                     desc.addFamily(fd);
                 }
             }
@@ -254,12 +275,30 @@ public class HBaseConnection {
             desc.setValue(HTABLE_UUID_TAG, UUID.randomUUID().toString());
             hbase.createTable(desc);
 
-            logger.debug("HTable '" + tableName + "' created");
+            logger.debug("HTable '" + table + "' created");
         } finally {
             hbase.close();
         }
     }
 
+    private static Set<String> getFamilyNames(HTableDescriptor desc) {
+        HashSet<String> result = Sets.newHashSet();
+        for (byte[] bytes : desc.getFamiliesKeys()) {
+            try {
+                result.add(new String(bytes, "UTF-8"));
+            } catch (UnsupportedEncodingException e) {
+                logger.error(e.toString());
+            }
+        }
+        return result;
+    }
+
+    private static HColumnDescriptor newFamilyDescriptor(String family) {
+        HColumnDescriptor fd = new HColumnDescriptor(family);
+        fd.setInMemory(true); // metadata tables are best in memory
+        return fd;
+    }
+
     public static void deleteTable(HConnection conn, String tableName) throws IOException {
         HBaseAdmin hbase = new HBaseAdmin(conn);