You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/06/05 05:23:37 UTC

[49/67] [abbrv] kylin git commit: KYLIN-2535 Use ResourceStore to manage ACL and saved queries

http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 61ddbb0..5130e55 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -20,6 +20,8 @@ package org.apache.kylin.rest.service;
 
 import static org.apache.kylin.common.util.CheckUtil.checkCondition;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -31,7 +33,6 @@ import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -46,21 +47,17 @@ import javax.annotation.PostConstruct;
 import javax.sql.DataSource;
 
 import org.apache.calcite.avatica.ColumnMetaData.Rep;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.DBUtils;
+import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -92,9 +89,7 @@ import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.util.AclUtil;
 import org.apache.kylin.rest.util.AdHocUtil;
-import org.apache.kylin.rest.util.Serializer;
 import org.apache.kylin.rest.util.TableauInterceptor;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,6 +100,7 @@ import org.springframework.security.core.GrantedAuthority;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Component;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -123,18 +119,12 @@ public class QueryService extends BasicService {
 
     private static final Logger logger = LoggerFactory.getLogger(QueryService.class);
 
-    public static final String USER_QUERY_FAMILY = "q";
-    private static final String USER_TABLE_NAME = "_user";
-    private static final String USER_QUERY_COLUMN = "c";
-
     public static final String SUCCESS_QUERY_CACHE = "StorageCache";
     public static final String EXCEPTION_QUERY_CACHE = "ExceptionQueryCache";
+    public static final String QUERY_STORE_PATH_PREFIX = "/query/";
 
-    private final Serializer<Query[]> querySerializer = new Serializer<Query[]>(Query[].class);
-    protected final BadQueryDetector badQueryDetector = new BadQueryDetector();
-
-    private final StorageURL hbaseUrl;
-    private final String userTableName;
+    final BadQueryDetector badQueryDetector = new BadQueryDetector();
+    final ResourceStore queryStore;
 
     @Autowired
     protected CacheManager cacheManager;
@@ -156,10 +146,7 @@ public class QueryService extends BasicService {
     }
 
     public QueryService() {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        hbaseUrl = kylinConfig.getMetadataUrl();
-        userTableName = hbaseUrl.getIdentifier() + USER_TABLE_NAME;
-
+        queryStore = ResourceStore.getStore(getConfig());
         badQueryDetector.start();
     }
 
@@ -183,18 +170,10 @@ public class QueryService extends BasicService {
         List<Query> queries = getQueries(creator);
         queries.add(query);
         Query[] queryArray = new Query[queries.size()];
-
-        byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
-        Table htable = null;
-        try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
-            Put put = new Put(Bytes.toBytes(creator));
-            put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
-
-            htable.put(put);
-        } finally {
-            IOUtils.closeQuietly(htable);
-        }
+        QueryRecord record = new QueryRecord(queries.toArray(queryArray));
+        queryStore.deleteResource(getQueryKeyById(creator));
+        queryStore.putResource(getQueryKeyById(creator), record, 0, QueryRecordSerializer.getInstance());
+        return;
     }
 
     public void removeQuery(final String creator, final String id) throws IOException {
@@ -214,45 +193,24 @@ public class QueryService extends BasicService {
         if (!changed) {
             return;
         }
-
         Query[] queryArray = new Query[queries.size()];
-        byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
-        Table htable = null;
-        try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
-            Put put = new Put(Bytes.toBytes(creator));
-            put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
-
-            htable.put(put);
-        } finally {
-            IOUtils.closeQuietly(htable);
-        }
+        QueryRecord record = new QueryRecord(queries.toArray(queryArray));
+        queryStore.deleteResource(getQueryKeyById(creator));
+        queryStore.putResource(getQueryKeyById(creator), record, 0, QueryRecordSerializer.getInstance());
+        return;
     }
 
     public List<Query> getQueries(final String creator) throws IOException {
         if (null == creator) {
             return null;
         }
-
         List<Query> queries = new ArrayList<Query>();
-        Table htable = null;
-        try {
-            org.apache.hadoop.hbase.client.Connection conn = HBaseConnection.get(hbaseUrl);
-            HBaseConnection.createHTableIfNeeded(conn, userTableName, USER_QUERY_FAMILY);
-
-            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
-            Get get = new Get(Bytes.toBytes(creator));
-            get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
-            Result result = htable.get(get);
-            Query[] query = querySerializer.deserialize(result.getValue(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN)));
-
-            if (null != query) {
-                queries.addAll(Arrays.asList(query));
+        QueryRecord record = queryStore.getResource(getQueryKeyById(creator), QueryRecord.class, QueryRecordSerializer.getInstance());
+        if (record != null) {
+            for (Query query : record.getQueries()) {
+                queries.add(query);
             }
-        } finally {
-            IOUtils.closeQuietly(htable);
         }
-
         return queries;
     }
 
@@ -892,4 +850,58 @@ public class QueryService extends BasicService {
     public void setCacheManager(CacheManager cacheManager) {
         this.cacheManager = cacheManager;
     }
+
+    private static String getQueryKeyById(String creator) {
+        return QUERY_STORE_PATH_PREFIX + creator;
+    }
+
+    private static class QueryRecordSerializer implements Serializer<QueryRecord> {
+
+        private static final QueryRecordSerializer serializer = new QueryRecordSerializer();
+
+        QueryRecordSerializer() {
+
+        }
+
+        public static QueryRecordSerializer getInstance() {
+            return serializer;
+        }
+
+        @Override
+        public void serialize(QueryRecord record, DataOutputStream out) throws IOException {
+            String jsonStr = JsonUtil.writeValueAsString(record);
+            out.writeUTF(jsonStr);
+        }
+
+        @Override
+        public QueryRecord deserialize(DataInputStream in) throws IOException {
+            String jsonStr = in.readUTF();
+            return JsonUtil.readValue(jsonStr, QueryRecord.class);
+        }
+    }
+
+}
+
+@SuppressWarnings("serial")
+class QueryRecord extends RootPersistentEntity {
+
+    @JsonProperty()
+    private Query[] queries;
+
+    public QueryRecord() {
+
+    }
+
+    public QueryRecord(Query[] queries) {
+        this.queries = queries;
+    }
+
+    public Query[] getQueries() {
+        return queries;
+    }
+
+    public void setQueries(Query[] queries) {
+        this.queries = queries;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/server/src/main/resources/applicationContext.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/applicationContext.xml b/server/src/main/resources/applicationContext.xml
index 100b202..8416f25 100644
--- a/server/src/main/resources/applicationContext.xml
+++ b/server/src/main/resources/applicationContext.xml
@@ -112,11 +112,4 @@
               p:configLocation="classpath:ehcache-test.xml" p:shared="true"/>
     </beans>
 
-    <!-- hbase storage/global lock Config -->
-    <beans profile="ldap,saml">
-        <bean id="aclHBaseStorage" class="org.apache.kylin.rest.security.RealAclHBaseStorage"/>
-    </beans>
-    <beans profile="testing">
-        <bean id="aclHBaseStorage" class="org.apache.kylin.rest.security.MockAclHBaseStorage"/>
-    </beans>
 </beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 81349ef..615c845 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -67,10 +67,15 @@ public class HBaseResourceStore extends ResourceStore {
     private static final Logger logger = LoggerFactory.getLogger(HBaseResourceStore.class);
 
     private static final String FAMILY = "f";
+
     private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY);
+
     private static final String COLUMN = "c";
+
     private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN);
+
     private static final String COLUMN_TS = "t";
+
     private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS);
 
     final String tableName;
@@ -82,10 +87,9 @@ public class HBaseResourceStore extends ResourceStore {
 
     public HBaseResourceStore(KylinConfig kylinConfig) throws IOException {
         super(kylinConfig);
-
         metadataUrl = buildMetadataUrl(kylinConfig);
         tableName = metadataUrl.getIdentifier();
-        createHTableIfNeeded(getAllInOneTableName());
+        createHTableIfNeeded(tableName);
     }
 
     private StorageURL buildMetadataUrl(KylinConfig kylinConfig) throws IOException {
@@ -107,10 +111,6 @@ public class HBaseResourceStore extends ResourceStore {
         HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY);
     }
 
-    private String getAllInOneTableName() {
-        return tableName;
-    }
-
     @Override
     protected boolean existsImpl(String resPath) throws IOException {
         Result r = getFromHTable(resPath, false, false);
@@ -164,7 +164,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] endRow = Bytes.toBytes(lookForPrefix);
         endRow[endRow.length - 1]++;
 
-        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+        Table table = getConnection().getTable(TableName.valueOf(tableName));
         Scan scan = new Scan(startRow, endRow);
         if ((filter != null && filter instanceof KeyOnlyFilter) == false) {
             scan.addColumn(B_FAMILY, B_COLUMN_TS);
@@ -288,7 +288,7 @@ public class HBaseResourceStore extends ResourceStore {
         IOUtils.copy(content, bout);
         bout.close();
 
-        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+        Table table = getConnection().getTable(TableName.valueOf(tableName));
         try {
             byte[] row = Bytes.toBytes(resPath);
             Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
@@ -302,7 +302,7 @@ public class HBaseResourceStore extends ResourceStore {
     @Override
     protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS)
             throws IOException, IllegalStateException {
-        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+        Table table = getConnection().getTable(TableName.valueOf(tableName));
         try {
             byte[] row = Bytes.toBytes(resPath);
             byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
@@ -325,7 +325,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     protected void deleteResourceImpl(String resPath) throws IOException {
-        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+        Table table = getConnection().getTable(TableName.valueOf(tableName));
         try {
             boolean hdfsResourceExist = false;
             Result result = internalGetFromHTable(table, resPath, true, false);
@@ -354,11 +354,11 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     protected String getReadableResourcePathImpl(String resPath) {
-        return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
+        return tableName + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
     }
 
     private Result getFromHTable(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
-        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+        Table table = getConnection().getTable(TableName.valueOf(tableName));
         try {
             return internalGetFromHTable(table, path, fetchContent, fetchTimestamp);
         } finally {
@@ -429,6 +429,6 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     public String toString() {
-        return getAllInOneTableName() + "@hbase";
+        return tableName + "@hbase";
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
index 991a750..6e7890b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
@@ -30,7 +30,7 @@ import org.apache.kylin.job.lock.JobLock;
 public class ZookeeperJobLock implements DistributedLock, JobLock {
 
     private ZookeeperDistributedLock lock = (ZookeeperDistributedLock) new ZookeeperDistributedLock.Factory().lockForCurrentProcess();
-    
+
     @Override
     public String getClient() {
         return lock.getClient();
@@ -60,7 +60,7 @@ public class ZookeeperJobLock implements DistributedLock, JobLock {
     public boolean isLockedByMe(String lockPath) {
         return lock.isLockedByMe(lockPath);
     }
-    
+
     @Override
     public void unlock(String lockPath) {
         lock.unlock(lockPath);
@@ -70,6 +70,7 @@ public class ZookeeperJobLock implements DistributedLock, JobLock {
     public void purgeLocks(String lockPathRoot) {
         lock.purgeLocks(lockPathRoot);
     }
+
     @Override
     public Closeable watchLocks(String lockPathRoot, Executor executor, Watcher watcher) {
         return lock.watchLocks(lockPathRoot, executor, watcher);

http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
index b5ebe89..20569d3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
@@ -18,35 +18,17 @@
 
 package org.apache.kylin.storage.hbase.util;
 
-import java.util.Arrays;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import org.apache.kylin.common.KylinConfig;
 
 public class ZookeeperUtil {
 
+    public static String ZOOKEEPER_UTIL_HBASE_CLASSNAME = "org.apache.kylin.storage.hbase.util.ZooKeeperUtilHbase";
+
     /**
-     * Get zookeeper connection string from HBase Configuration
-     *
-     * @return Zookeeper Connection string
+     * Get zookeeper connection string from HBase Configuration or from kylin.properties
      */
     public static String getZKConnectString() {
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
-        final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
-            @Nullable
-            @Override
-            public String apply(String input) {
-                return input + ":" + port;
-            }
-        }), ",");
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        return config.getZookeeperConnectString();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
index fe1ad4e..d185f4e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -40,6 +40,7 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public class HDFSResourceStore extends ResourceStore {
@@ -50,13 +51,13 @@ public class HDFSResourceStore extends ResourceStore {
 
     private FileSystem fs;
 
+    private static final String HDFS_SCHEME = "hdfs";
+
     public HDFSResourceStore(KylinConfig kylinConfig) throws Exception {
         super(kylinConfig);
         StorageURL metadataUrl = kylinConfig.getMetadataUrl();
+        Preconditions.checkState(HDFS_SCHEME.equals(metadataUrl.getScheme()));
         
-        if (!metadataUrl.getScheme().equals("hdfs"))
-            throw new IOException("kylin.metadata.url not recognized for HDFSResourceStore:" + metadataUrl);
-
         String path = metadataUrl.getIdentifier();
         fs = HadoopUtil.getFileSystem(path);
         Path metadataPath = new Path(path);

http://git-wip-us.apache.org/repos/asf/kylin/blob/afaa95a0/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
index e1f994f..0fdc740 100644
--- a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
@@ -20,6 +20,7 @@ package org.apache.kylin.tool;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -31,6 +32,7 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
@@ -159,11 +161,13 @@ public class DiagnosisInfoCLI extends AbstractInfoExtractor {
                 public void run() {
                     logger.info("Start to extract HBase usage.");
                     try {
+                        // use reflection to isolate NoClassDef errors when HBase is not available
                         String[] hbaseArgs = { "-destDir", new File(exportDir, "hbase").getAbsolutePath(), "-project", projectNames, "-compress", "false", "-submodule", "true" };
-                        HBaseUsageExtractor hBaseUsageExtractor = new HBaseUsageExtractor();
                         logger.info("HBaseUsageExtractor args: " + Arrays.toString(hbaseArgs));
-                        hBaseUsageExtractor.execute(hbaseArgs);
-                    } catch (Exception e) {
+                        Object extractor = ClassUtil.newInstance("org.apache.kylin.tool.HBaseUsageExtractor");
+                        Method execute = extractor.getClass().getDeclaredMethod("execute", String[].class);
+                        execute.invoke(extractor, (Object) hbaseArgs);
+                    } catch (Throwable e) {
                         logger.error("Error in export HBase usage.", e);
                     }
                 }