You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/02/04 06:16:55 UTC
[33/39] kylin git commit: KYLIN-2374 code review
KYLIN-2374 code review
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5eae37ef
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5eae37ef
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5eae37ef
Branch: refs/heads/spark-it
Commit: 5eae37ef18ca51027c6bb2cfd3410fefc7982f2a
Parents: a2a59c4
Author: shaofengshi <sh...@apache.org>
Authored: Thu Jan 26 09:55:48 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Jan 26 09:55:48 2017 +0800
----------------------------------------------------------------------
build/conf/kylin.properties | 3 +-
build/deploy/spark-defaults.conf | 1 -
.../apache/kylin/common/KylinConfigBase.java | 8 --
.../kylin/common/persistence/ResourceStore.java | 3 +
.../org/apache/kylin/cube/model/CubeDesc.java | 2 +-
.../ExtendedColumnMeasureType.java | 8 +-
.../storage/hdfs/ITHDFSResourceStoreTest.java | 36 +++++++-
.../kylin/storage/hbase/HBaseResourceStore.java | 3 +-
.../kylin/storage/hdfs/HDFSResourceStore.java | 90 +++++++++++---------
9 files changed, 97 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index eceb886..43ea17d 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -211,8 +211,9 @@ kylin.engine.spark-conf.spark.executor.memory=4G
kylin.engine.spark-conf.spark.executor.cores=4
kylin.engine.spark-conf.spark.executor.instances=8
kylin.engine.spark-conf.spark.storage.memoryFraction=0.3
-kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.eventLog.enabled=true
kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
## manually uploading the spark-assembly jar to HDFS and then setting this property avoids repeatedly uploading the jar at runtime
#kylin.engine.spark-conf.spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
#kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/build/deploy/spark-defaults.conf
----------------------------------------------------------------------
diff --git a/build/deploy/spark-defaults.conf b/build/deploy/spark-defaults.conf
index 36c0ab3..78a4bc9 100644
--- a/build/deploy/spark-defaults.conf
+++ b/build/deploy/spark-defaults.conf
@@ -1,5 +1,4 @@
spark.yarn.submit.file.replication=1
-spark.eventLog.enabled=true
spark.yarn.max.executor.failures=3
spark.driver.extraJavaOptions=-Dhdp.version=current
spark.yarn.am.extraJavaOptions=-Dhdp.version=current
http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5932197..b1acbbf 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -193,14 +193,6 @@ abstract public class KylinConfigBase implements Serializable {
return new StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).append("/").toString();
}
- public String getRawHdfsWorkingDirectory() {
- String root = getRequired("kylin.env.hdfs-working-dir");
- if (!root.endsWith("/")) {
- root += "/";
- }
- return root;
- }
-
// ============================================================================
// METADATA
// ============================================================================
http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 25a0801..c441618 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -63,6 +63,9 @@ abstract public class ResourceStore {
public static final String CUBE_STATISTICS_ROOT = "/cube_statistics";
public static final String BAD_QUERY_RESOURCE_ROOT = "/bad_query";
+
+ protected static final String DEFAULT_STORE_NAME = "kylin_metadata";
+
private static final ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>();
private static final ArrayList<Class<? extends ResourceStore>> knownImpl = new ArrayList<Class<? extends ResourceStore>>();
http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 7e599da..5e970bf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -891,7 +891,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
func.init(model);
allColumns.addAll(func.getParameter().getColRefs());
- if (ExtendedColumnMeasureType.FUNC_RAW.equalsIgnoreCase(m.getFunction().getExpression())) {
+ if (ExtendedColumnMeasureType.FUNC_EXTENDED_COLUMN.equalsIgnoreCase(m.getFunction().getExpression())) {
FunctionDesc functionDesc = m.getFunction();
List<TblColRef> hosts = ExtendedColumnMeasureType.getExtendedColumnHosts(functionDesc);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
index 1b2cda3..de5ee25 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
@@ -47,8 +47,8 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> {
private static final Logger logger = LoggerFactory.getLogger(ExtendedColumnMeasureType.class);
- public static final String FUNC_RAW = "EXTENDED_COLUMN";
- public static final String DATATYPE_RAW = "extendedcolumn";
+ public static final String FUNC_EXTENDED_COLUMN = "EXTENDED_COLUMN";
+ public static final String DATATYPE_EXTENDED_COLUMN = "extendedcolumn";
private final DataType dataType;
public static class Factory extends MeasureTypeFactory<ByteArray> {
@@ -60,12 +60,12 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> {
@Override
public String getAggrFunctionName() {
- return FUNC_RAW;
+ return FUNC_EXTENDED_COLUMN;
}
@Override
public String getAggrDataTypeName() {
- return DATATYPE_RAW;
+ return DATATYPE_EXTENDED_COLUMN;
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
index ff66048..ec12722 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -18,21 +18,28 @@
package org.apache.kylin.storage.hdfs;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStoreTest;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HadoopUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import static junit.framework.TestCase.assertTrue;
+
public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
KylinConfig kylinConfig;
+ FileSystem fs;
@Before
public void setup() throws Exception {
this.createTestMetadata();
kylinConfig = KylinConfig.getInstanceFromEnv();
+ fs = HadoopUtil.getWorkingFileSystem();
}
@After
@@ -41,12 +48,37 @@ public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
}
@Test
- public void testResourceStoreBasic() throws Exception {
+ public void testBasic() throws Exception {
+ String oldUrl = kylinConfig.getMetadataUrl();
+ String path = "/kylin/kylin_metadata/metadata";
+ kylinConfig.setProperty("kylin.metadata.url", path + "@hdfs");
+ HDFSResourceStore store = new HDFSResourceStore(kylinConfig);
+ ResourceStoreTest.testAStore(store);
+ kylinConfig.setProperty("kylin.metadata.url", oldUrl);
+ assertTrue(fs.exists(new Path(path)));
+ }
+
+ @Test
+ public void testQalifiedName() throws Exception {
String oldUrl = kylinConfig.getMetadataUrl();
- kylinConfig.setProperty("kylin.metadata.url", "kylin_metadata@hdfs");
+ String path = "hdfs:///kylin/kylin_metadata/metadata_test1";
+ kylinConfig.setProperty("kylin.metadata.url", path + "@hdfs");
HDFSResourceStore store = new HDFSResourceStore(kylinConfig);
ResourceStoreTest.testAStore(store);
kylinConfig.setProperty("kylin.metadata.url", oldUrl);
+ assertTrue(fs.exists(new Path(path)));
}
+ @Test
+ public void testFullQalifiedName() throws Exception {
+ String oldUrl = kylinConfig.getMetadataUrl();
+ String path = "hdfs://sandbox.hortonworks.com:8020/kylin/kylin_metadata/metadata_test2";
+ kylinConfig.setProperty("kylin.metadata.url", path + "@hdfs");
+ HDFSResourceStore store = new HDFSResourceStore(kylinConfig);
+ ResourceStoreTest.testAStore(store);
+ kylinConfig.setProperty("kylin.metadata.url", oldUrl);
+ assertTrue(fs.exists(new Path(path)));
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 0901b54..501f1e4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -59,7 +59,6 @@ public class HBaseResourceStore extends ResourceStore {
private static final Logger logger = LoggerFactory.getLogger(HBaseResourceStore.class);
- private static final String DEFAULT_TABLE_NAME = "kylin_metadata";
private static final String FAMILY = "f";
private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY);
private static final String COLUMN = "c";
@@ -80,7 +79,7 @@ public class HBaseResourceStore extends ResourceStore {
String metadataUrl = kylinConfig.getMetadataUrl();
// split TABLE@HBASE_URL
int cut = metadataUrl.indexOf('@');
- tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+ tableNameBase = cut < 0 ? DEFAULT_STORE_NAME : metadataUrl.substring(0, cut);
hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
if (!hbaseUrl.equals("hbase"))
throw new IOException("Can not create HBaseResourceStore. Url not match. Url:" + hbaseUrl);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
index 38acfb0..d24d3b4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -46,11 +46,7 @@ public class HDFSResourceStore extends ResourceStore {
private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
- private static final long DEFAULT_ACQUIRE_LOCK_TIMEOUT = 10;
-
- private static final String DEFAULT_FOLDER_NAME = "kylin_metadata";
-
- private static final String DEFAULT_METADATA_FOLDER_NAME = "hdfs_metadata";
+ private static final long DEFAULT_ACQUIRE_LOCK_TIMEOUT = 2;
private Path hdfsMetaPath;
@@ -62,42 +58,43 @@ public class HDFSResourceStore extends ResourceStore {
super(kylinConfig);
String metadataUrl = kylinConfig.getMetadataUrl();
int cut = metadataUrl.indexOf('@');
- String metaDirName = cut < 0 ? DEFAULT_FOLDER_NAME : metadataUrl.substring(0, cut);
- String hdfsUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
- if (!hdfsUrl.equals("hdfs"))
- throw new IOException("Can not create HDFSResourceStore. Url not match. Url:" + hdfsUrl);
- metaDirName += "/" + DEFAULT_METADATA_FOLDER_NAME;
- logger.info("meta dir name :" + metaDirName);
- createMetaFolder(metaDirName, kylinConfig);
- }
-
- private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws Exception {
- String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
- fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
- logger.info("hdfs working dir : " + hdfsWorkingDir);
- Path hdfsWorkingPath = new Path(hdfsWorkingDir);
- if (!fs.exists(hdfsWorkingPath)) {
- throw new IOException("HDFS working dir not exist");
+ if (cut < 0) {
+ throw new IOException("kylin.metadata.url not recognized for HDFSResourceStore: " + metadataUrl);
}
+ String suffix = metadataUrl.substring(cut + 1);
+ if (!suffix.equals("hdfs"))
+ throw new IOException("kylin.metadata.url not recognized for HDFSResourceStore:" + suffix);
+
+ String path = metadataUrl.substring(0, cut);
+ fs = HadoopUtil.getFileSystem(path);
+ Path metadataPath = new Path(path);
//create lock manager
- this.lockManager = new LockManager(kylinConfig, kylinConfig.getRawHdfsWorkingDirectory() + metaDirName);
+ this.lockManager = new LockManager(kylinConfig, getRelativePath(metadataPath));
+ if (fs.exists(metadataPath) == false) {
+ logger.warn("Path not exist in HDFS, create it: " + path);
+ createMetaFolder(metadataPath, kylinConfig);
+ }
+
+ hdfsMetaPath = metadataPath;
+ logger.info("hdfs meta path : " + hdfsMetaPath.toString());
+
+ }
+
+
+
+ private void createMetaFolder(Path metaDirName, KylinConfig kylinConfig) throws Exception {
//create hdfs meta path
- hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
- if (!fs.exists(hdfsMetaPath)) {
- ResourceLock lock = lockManager.getLock(lockManager.getLockPath("/"));
- try {
- if (lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES)) {
- logger.info("get root lock successfully");
- if (!fs.exists(hdfsMetaPath)) {
- fs.mkdirs(hdfsMetaPath);
- logger.info("create hdfs meta path");
- }
+ ResourceLock lock = lockManager.getLock(getRelativePath(metaDirName));
+ try {
+ if (lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS)) {
+ if (!fs.exists(metaDirName)) {
+ fs.mkdirs(metaDirName);
}
- } finally {
- lockManager.releaseLock(lock);
}
+ } finally {
+ lockManager.releaseLock(lock);
}
- logger.info("hdfs meta path : " + hdfsMetaPath.toString());
+ logger.info("hdfs meta path created: " + metaDirName.toString());
}
@Override
@@ -170,7 +167,7 @@ public class HDFSResourceStore extends ResourceStore {
ResourceLock lock = null;
try {
lock = lockManager.getLock(resPath);
- lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES);
+ lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS);
in = fs.open(p);
long t = in.readLong();
return t;
@@ -192,7 +189,7 @@ public class HDFSResourceStore extends ResourceStore {
ResourceLock lock = null;
try {
lock = lockManager.getLock(resPath);
- lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES);
+ lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS);
out = fs.create(p, true);
out.writeLong(ts);
IOUtils.copy(content, out);
@@ -228,7 +225,7 @@ public class HDFSResourceStore extends ResourceStore {
ResourceLock lock = null;
try {
lock = lockManager.getLock(resPath);
- lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES);
+ lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS);
Path p = getRealHDFSPath(resPath);
if (fs.exists(p)) {
fs.delete(p, true);
@@ -253,4 +250,21 @@ public class HDFSResourceStore extends ResourceStore {
return new Path(this.hdfsMetaPath, resourcePath);
}
+ private static String getRelativePath(Path hdfsPath) {
+ String path = hdfsPath.toString();
+ int index = path.indexOf("://");
+ if (index > 0) {
+ path = path.substring(index + 3);
+ }
+
+ if (path.startsWith("/") == false) {
+ if (path.indexOf("/") > 0) {
+ path = path.substring(path.indexOf("/"));
+ } else {
+ path = "/" + path;
+ }
+ }
+ return path;
+ }
+
}