You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/06/05 05:23:37 UTC
[49/67] [abbrv] kylin git commit: KYLIN-2535 Use ResourceStore to
manage ACL and saved queries
http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 61ddbb0..5130e55 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -20,6 +20,8 @@ package org.apache.kylin.rest.service;
import static org.apache.kylin.common.util.CheckUtil.checkCondition;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -31,7 +33,6 @@ import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -46,21 +47,17 @@ import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import org.apache.calcite.avatica.ColumnMetaData.Rep;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.util.DBUtils;
+import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -92,9 +89,7 @@ import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.kylin.rest.util.AclUtil;
import org.apache.kylin.rest.util.AdHocUtil;
-import org.apache.kylin.rest.util.Serializer;
import org.apache.kylin.rest.util.TableauInterceptor;
-import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hybrid.HybridInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,6 +100,7 @@ import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.CharMatcher;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
@@ -123,18 +119,12 @@ public class QueryService extends BasicService {
private static final Logger logger = LoggerFactory.getLogger(QueryService.class);
- public static final String USER_QUERY_FAMILY = "q";
- private static final String USER_TABLE_NAME = "_user";
- private static final String USER_QUERY_COLUMN = "c";
-
public static final String SUCCESS_QUERY_CACHE = "StorageCache";
public static final String EXCEPTION_QUERY_CACHE = "ExceptionQueryCache";
+ public static final String QUERY_STORE_PATH_PREFIX = "/query/";
- private final Serializer<Query[]> querySerializer = new Serializer<Query[]>(Query[].class);
- protected final BadQueryDetector badQueryDetector = new BadQueryDetector();
-
- private final StorageURL hbaseUrl;
- private final String userTableName;
+ final BadQueryDetector badQueryDetector = new BadQueryDetector();
+ final ResourceStore queryStore;
@Autowired
protected CacheManager cacheManager;
@@ -156,10 +146,7 @@ public class QueryService extends BasicService {
}
public QueryService() {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- hbaseUrl = kylinConfig.getMetadataUrl();
- userTableName = hbaseUrl.getIdentifier() + USER_TABLE_NAME;
-
+ queryStore = ResourceStore.getStore(getConfig());
badQueryDetector.start();
}
@@ -183,18 +170,10 @@ public class QueryService extends BasicService {
List<Query> queries = getQueries(creator);
queries.add(query);
Query[] queryArray = new Query[queries.size()];
-
- byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
- Table htable = null;
- try {
- htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
- Put put = new Put(Bytes.toBytes(creator));
- put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
-
- htable.put(put);
- } finally {
- IOUtils.closeQuietly(htable);
- }
+ QueryRecord record = new QueryRecord(queries.toArray(queryArray));
+ queryStore.deleteResource(getQueryKeyById(creator));
+ queryStore.putResource(getQueryKeyById(creator), record, 0, QueryRecordSerializer.getInstance());
+ return;
}
public void removeQuery(final String creator, final String id) throws IOException {
@@ -214,45 +193,24 @@ public class QueryService extends BasicService {
if (!changed) {
return;
}
-
Query[] queryArray = new Query[queries.size()];
- byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
- Table htable = null;
- try {
- htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
- Put put = new Put(Bytes.toBytes(creator));
- put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
-
- htable.put(put);
- } finally {
- IOUtils.closeQuietly(htable);
- }
+ QueryRecord record = new QueryRecord(queries.toArray(queryArray));
+ queryStore.deleteResource(getQueryKeyById(creator));
+ queryStore.putResource(getQueryKeyById(creator), record, 0, QueryRecordSerializer.getInstance());
+ return;
}
public List<Query> getQueries(final String creator) throws IOException {
if (null == creator) {
return null;
}
-
List<Query> queries = new ArrayList<Query>();
- Table htable = null;
- try {
- org.apache.hadoop.hbase.client.Connection conn = HBaseConnection.get(hbaseUrl);
- HBaseConnection.createHTableIfNeeded(conn, userTableName, USER_QUERY_FAMILY);
-
- htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
- Get get = new Get(Bytes.toBytes(creator));
- get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
- Result result = htable.get(get);
- Query[] query = querySerializer.deserialize(result.getValue(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN)));
-
- if (null != query) {
- queries.addAll(Arrays.asList(query));
+ QueryRecord record = queryStore.getResource(getQueryKeyById(creator), QueryRecord.class, QueryRecordSerializer.getInstance());
+ if (record != null) {
+ for (Query query : record.getQueries()) {
+ queries.add(query);
}
- } finally {
- IOUtils.closeQuietly(htable);
}
-
return queries;
}
@@ -892,4 +850,58 @@ public class QueryService extends BasicService {
public void setCacheManager(CacheManager cacheManager) {
this.cacheManager = cacheManager;
}
+
+ private static String getQueryKeyById(String creator) {
+ return QUERY_STORE_PATH_PREFIX + creator;
+ }
+
+ private static class QueryRecordSerializer implements Serializer<QueryRecord> {
+
+ private static final QueryRecordSerializer serializer = new QueryRecordSerializer();
+
+ QueryRecordSerializer() {
+
+ }
+
+ public static QueryRecordSerializer getInstance() {
+ return serializer;
+ }
+
+ @Override
+ public void serialize(QueryRecord record, DataOutputStream out) throws IOException {
+ String jsonStr = JsonUtil.writeValueAsString(record);
+ out.writeUTF(jsonStr);
+ }
+
+ @Override
+ public QueryRecord deserialize(DataInputStream in) throws IOException {
+ String jsonStr = in.readUTF();
+ return JsonUtil.readValue(jsonStr, QueryRecord.class);
+ }
+ }
+
+}
+
+@SuppressWarnings("serial")
+class QueryRecord extends RootPersistentEntity {
+
+ @JsonProperty()
+ private Query[] queries;
+
+ public QueryRecord() {
+
+ }
+
+ public QueryRecord(Query[] queries) {
+ this.queries = queries;
+ }
+
+ public Query[] getQueries() {
+ return queries;
+ }
+
+ public void setQueries(Query[] queries) {
+ this.queries = queries;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/server/src/main/resources/applicationContext.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/applicationContext.xml b/server/src/main/resources/applicationContext.xml
index 100b202..8416f25 100644
--- a/server/src/main/resources/applicationContext.xml
+++ b/server/src/main/resources/applicationContext.xml
@@ -112,11 +112,4 @@
p:configLocation="classpath:ehcache-test.xml" p:shared="true"/>
</beans>
- <!-- hbase storage/global lock Config -->
- <beans profile="ldap,saml">
- <bean id="aclHBaseStorage" class="org.apache.kylin.rest.security.RealAclHBaseStorage"/>
- </beans>
- <beans profile="testing">
- <bean id="aclHBaseStorage" class="org.apache.kylin.rest.security.MockAclHBaseStorage"/>
- </beans>
</beans>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 81349ef..615c845 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -67,10 +67,15 @@ public class HBaseResourceStore extends ResourceStore {
private static final Logger logger = LoggerFactory.getLogger(HBaseResourceStore.class);
private static final String FAMILY = "f";
+
private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY);
+
private static final String COLUMN = "c";
+
private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN);
+
private static final String COLUMN_TS = "t";
+
private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS);
final String tableName;
@@ -82,10 +87,9 @@ public class HBaseResourceStore extends ResourceStore {
public HBaseResourceStore(KylinConfig kylinConfig) throws IOException {
super(kylinConfig);
-
metadataUrl = buildMetadataUrl(kylinConfig);
tableName = metadataUrl.getIdentifier();
- createHTableIfNeeded(getAllInOneTableName());
+ createHTableIfNeeded(tableName);
}
private StorageURL buildMetadataUrl(KylinConfig kylinConfig) throws IOException {
@@ -107,10 +111,6 @@ public class HBaseResourceStore extends ResourceStore {
HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY);
}
- private String getAllInOneTableName() {
- return tableName;
- }
-
@Override
protected boolean existsImpl(String resPath) throws IOException {
Result r = getFromHTable(resPath, false, false);
@@ -164,7 +164,7 @@ public class HBaseResourceStore extends ResourceStore {
byte[] endRow = Bytes.toBytes(lookForPrefix);
endRow[endRow.length - 1]++;
- Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+ Table table = getConnection().getTable(TableName.valueOf(tableName));
Scan scan = new Scan(startRow, endRow);
if ((filter != null && filter instanceof KeyOnlyFilter) == false) {
scan.addColumn(B_FAMILY, B_COLUMN_TS);
@@ -288,7 +288,7 @@ public class HBaseResourceStore extends ResourceStore {
IOUtils.copy(content, bout);
bout.close();
- Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+ Table table = getConnection().getTable(TableName.valueOf(tableName));
try {
byte[] row = Bytes.toBytes(resPath);
Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
@@ -302,7 +302,7 @@ public class HBaseResourceStore extends ResourceStore {
@Override
protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS)
throws IOException, IllegalStateException {
- Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+ Table table = getConnection().getTable(TableName.valueOf(tableName));
try {
byte[] row = Bytes.toBytes(resPath);
byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
@@ -325,7 +325,7 @@ public class HBaseResourceStore extends ResourceStore {
@Override
protected void deleteResourceImpl(String resPath) throws IOException {
- Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+ Table table = getConnection().getTable(TableName.valueOf(tableName));
try {
boolean hdfsResourceExist = false;
Result result = internalGetFromHTable(table, resPath, true, false);
@@ -354,11 +354,11 @@ public class HBaseResourceStore extends ResourceStore {
@Override
protected String getReadableResourcePathImpl(String resPath) {
- return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
+ return tableName + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
}
private Result getFromHTable(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
- Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+ Table table = getConnection().getTable(TableName.valueOf(tableName));
try {
return internalGetFromHTable(table, path, fetchContent, fetchTimestamp);
} finally {
@@ -429,6 +429,6 @@ public class HBaseResourceStore extends ResourceStore {
@Override
public String toString() {
- return getAllInOneTableName() + "@hbase";
+ return tableName + "@hbase";
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
index 991a750..6e7890b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
@@ -30,7 +30,7 @@ import org.apache.kylin.job.lock.JobLock;
public class ZookeeperJobLock implements DistributedLock, JobLock {
private ZookeeperDistributedLock lock = (ZookeeperDistributedLock) new ZookeeperDistributedLock.Factory().lockForCurrentProcess();
-
+
@Override
public String getClient() {
return lock.getClient();
@@ -60,7 +60,7 @@ public class ZookeeperJobLock implements DistributedLock, JobLock {
public boolean isLockedByMe(String lockPath) {
return lock.isLockedByMe(lockPath);
}
-
+
@Override
public void unlock(String lockPath) {
lock.unlock(lockPath);
@@ -70,6 +70,7 @@ public class ZookeeperJobLock implements DistributedLock, JobLock {
public void purgeLocks(String lockPathRoot) {
lock.purgeLocks(lockPathRoot);
}
+
@Override
public Closeable watchLocks(String lockPathRoot, Executor executor, Watcher watcher) {
return lock.watchLocks(lockPathRoot, executor, watcher);
http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
index b5ebe89..20569d3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
@@ -18,35 +18,17 @@
package org.apache.kylin.storage.hbase.util;
-import java.util.Arrays;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import org.apache.kylin.common.KylinConfig;
public class ZookeeperUtil {
+ public static String ZOOKEEPER_UTIL_HBASE_CLASSNAME = "org.apache.kylin.storage.hbase.util.ZooKeeperUtilHbase";
+
/**
- * Get zookeeper connection string from HBase Configuration
- *
- * @return Zookeeper Connection string
+ * Get zookeeper connection string from HBase Configuration or from kylin.properties
*/
public static String getZKConnectString() {
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
- final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
- return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
- @Nullable
- @Override
- public String apply(String input) {
- return input + ":" + port;
- }
- }), ",");
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ return config.getZookeeperConnectString();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
index fe1ad4e..d185f4e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -40,6 +40,7 @@ import org.apache.kylin.common.util.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public class HDFSResourceStore extends ResourceStore {
@@ -50,13 +51,13 @@ public class HDFSResourceStore extends ResourceStore {
private FileSystem fs;
+ private static final String HDFS_SCHEME = "hdfs";
+
public HDFSResourceStore(KylinConfig kylinConfig) throws Exception {
super(kylinConfig);
StorageURL metadataUrl = kylinConfig.getMetadataUrl();
+ Preconditions.checkState(HDFS_SCHEME.equals(metadataUrl.getScheme()));
- if (!metadataUrl.getScheme().equals("hdfs"))
- throw new IOException("kylin.metadata.url not recognized for HDFSResourceStore:" + metadataUrl);
-
String path = metadataUrl.getIdentifier();
fs = HadoopUtil.getFileSystem(path);
Path metadataPath = new Path(path);
http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
index e1f994f..0fdc740 100644
--- a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
@@ -20,6 +20,7 @@ package org.apache.kylin.tool;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -31,6 +32,7 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
@@ -159,11 +161,13 @@ public class DiagnosisInfoCLI extends AbstractInfoExtractor {
public void run() {
logger.info("Start to extract HBase usage.");
try {
+ // use reflection to isolate NoClassDef errors when HBase is not available
String[] hbaseArgs = { "-destDir", new File(exportDir, "hbase").getAbsolutePath(), "-project", projectNames, "-compress", "false", "-submodule", "true" };
- HBaseUsageExtractor hBaseUsageExtractor = new HBaseUsageExtractor();
logger.info("HBaseUsageExtractor args: " + Arrays.toString(hbaseArgs));
- hBaseUsageExtractor.execute(hbaseArgs);
- } catch (Exception e) {
+ Object extractor = ClassUtil.newInstance("org.apache.kylin.tool.HBaseUsageExtractor");
+ Method execute = extractor.getClass().getDeclaredMethod("execute", String[].class);
+ execute.invoke(extractor, (Object) hbaseArgs);
+ } catch (Throwable e) {
logger.error("Error in export HBase usage.", e);
}
}