You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/06/14 08:43:22 UTC
kylin git commit: KYLIN-1760 auto create HTable and column family before saving query
Repository: kylin
Updated Branches:
refs/heads/master 9a3c96bfe -> 0a3541254
KYLIN-1760 auto create HTable and column family before saving query
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0a354125
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0a354125
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0a354125
Branch: refs/heads/master
Commit: 0a354125455e8bd34dfa934134543b851f4c09d2
Parents: 9a3c96b
Author: Li Yang <li...@apache.org>
Authored: Tue Jun 14 16:43:11 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Tue Jun 14 16:43:11 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/rest/service/QueryService.java | 8 ++-
.../kylin/storage/hbase/HBaseConnection.java | 57 ++++++++++++++++----
2 files changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/0a354125/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 08b338c..3f8b9d7 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -45,6 +45,7 @@ import org.apache.calcite.avatica.ColumnMetaData.Rep;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -180,11 +181,14 @@ public class QueryService extends BasicService {
if (null == creator) {
return null;
}
-
+
List<Query> queries = new ArrayList<Query>();
HTableInterface htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ HConnection conn = HBaseConnection.get(hbaseUrl);
+ HBaseConnection.createHTableIfNeeded(conn, userTableName, USER_QUERY_FAMILY);
+
+ htable = conn.getTable(userTableName);
Get get = new Get(Bytes.toBytes(creator));
get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
Result result = htable.get(get);
http://git-wip-us.apache.org/repos/asf/kylin/blob/0a354125/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index c1fd1ce..d4dd3ae 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -19,7 +19,10 @@
package org.apache.kylin.storage.hbase;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -46,6 +49,8 @@ import org.apache.kylin.engine.mr.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Sets;
+
/**
* @author yangli9
*
@@ -101,7 +106,7 @@ public class HBaseConnection {
tpe.allowCoreThreadTimeOut(true);
logger.info("Creating coprocessor thread pool with max of {}, core of {}", maxThreads, coreThreads);
-
+
coprocessorPool = tpe;
return coprocessorPool;
}
@@ -230,23 +235,39 @@ public class HBaseConnection {
deleteTable(HBaseConnection.get(hbaseUrl), tableName);
}
- public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException {
+ public static void createHTableIfNeeded(HConnection conn, String table, String... families) throws IOException {
HBaseAdmin hbase = new HBaseAdmin(conn);
try {
- if (tableExists(conn, tableName)) {
- logger.debug("HTable '" + tableName + "' already exists");
+ if (tableExists(conn, table)) {
+ logger.debug("HTable '" + table + "' already exists");
+ Set<String> existingFamilies = getFamilyNames(hbase.getTableDescriptor(TableName.valueOf(table)));
+ boolean wait = false;
+ for (String family : families) {
+ if (existingFamilies.contains(family) == false) {
+ logger.debug("Adding family '" + family + "' to HTable '" + table + "'");
+ hbase.addColumn(table, newFamilyDescriptor(family));
+ // addColumn() is async, is there a way to wait it finish?
+ wait = true;
+ }
+ }
+ if (wait) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ logger.warn("", e);
+ }
+ }
return;
}
- logger.debug("Creating HTable '" + tableName + "'");
+ logger.debug("Creating HTable '" + table + "'");
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(table));
if (null != families && families.length > 0) {
for (String family : families) {
- HColumnDescriptor fd = new HColumnDescriptor(family);
- fd.setInMemory(true); // metadata tables are best in memory
+ HColumnDescriptor fd = newFamilyDescriptor(family);
desc.addFamily(fd);
}
}
@@ -254,12 +275,30 @@ public class HBaseConnection {
desc.setValue(HTABLE_UUID_TAG, UUID.randomUUID().toString());
hbase.createTable(desc);
- logger.debug("HTable '" + tableName + "' created");
+ logger.debug("HTable '" + table + "' created");
} finally {
hbase.close();
}
}
+ private static Set<String> getFamilyNames(HTableDescriptor desc) {
+ HashSet<String> result = Sets.newHashSet();
+ for (byte[] bytes : desc.getFamiliesKeys()) {
+ try {
+ result.add(new String(bytes, "UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ logger.error(e.toString());
+ }
+ }
+ return result;
+ }
+
+ private static HColumnDescriptor newFamilyDescriptor(String family) {
+ HColumnDescriptor fd = new HColumnDescriptor(family);
+ fd.setInMemory(true); // metadata tables are best in memory
+ return fd;
+ }
+
public static void deleteTable(HConnection conn, String tableName) throws IOException {
HBaseAdmin hbase = new HBaseAdmin(conn);