You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2017/02/04 04:44:01 UTC
[32/47] kylin git commit: KYLIN-2374 Allow kylin to store metadata in
HDFS instead of HBase
KYLIN-2374 Allow kylin to store metadata in HDFS instead of HBase
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d23bf930
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d23bf930
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d23bf930
Branch: refs/heads/KYLIN-2361
Commit: d23bf930da0b542d0e6981917e6bde055839577a
Parents: db85d66
Author: xiefan46 <95...@qq.com>
Authored: Wed Jan 11 10:00:19 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jan 23 16:23:56 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 5 +
.../common/persistence/ResourceStoreTest.java | 3 +-
.../test_case_data/sandbox/kylin.properties | 1 +
.../storage/hdfs/ITHDFSResourceStoreTest.java | 117 +++++++++++
.../org/apache/kylin/storage/hdfs/HDFSLock.java | 41 ++++
.../kylin/storage/hdfs/HDFSLockManager.java | 45 +++++
.../kylin/storage/hdfs/HDFSResourceStore.java | 198 +++++++++++++++++++
7 files changed, 409 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 05df177..44d636d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -201,6 +201,11 @@ abstract public class KylinConfigBase implements Serializable {
return getOptional("kylin.metadata.url");
}
+ /**
+  * Metadata URL for the HDFS resource store.
+  *
+  * Looks up the "kylin.metadata.hdfs.url" property through getOptional, passing
+  * "kylin_default_instance_hdfs@hdfs" as the second argument (presumably the value
+  * returned when the property is not set -- confirm against getOptional).
+  */
+ public String getHDFSMetadataUrl() {
+ return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs@hdfs");
+ }
+
// for test only
public void setMetadataUrl(String metadataUrl) {
setProperty("kylin.metadata.url", metadataUrl);
http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
index 4c31a15..ddaf481 100644
--- a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
@@ -110,9 +110,10 @@ public class ResourceStoreTest {
}
// list
- NavigableSet<String> list;
+ NavigableSet<String> list = null;
list = store.listResources(dir1);
+ System.out.println(list);
assertTrue(list.contains(path1));
assertTrue(list.contains(path2) == false);
http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 6c512dc..b01c377 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -41,6 +41,7 @@ kylin.source.hive.client=cli
# The metadata store in hbase
kylin.metadata.url=kylin_default_instance@hbase
+
# The storage for final cube file in hbase
kylin.storage.url=hbase
http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
new file mode 100644
index 0000000..ef04957
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hdfs;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceStoreTest;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests for the HDFS-backed metadata store.
+ *
+ * Created by xiefan on 17-1-10.
+ */
+public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
+
+    KylinConfig kylinConfig;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    /** Checks the default HDFS metadata URL and prints the configured HDFS working directory. */
+    @Ignore
+    @Test
+    public void testHDFSUrl() throws Exception {
+        assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl());
+        System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory());
+    }
+
+    /**
+     * Exercises a lock-file protocol on HDFS: the main thread creates the file (takes the lock),
+     * several workers spin on createNewFile until they obtain it, hold it for a second and
+     * delete it again.
+     */
+    @Ignore
+    @Test
+    public void testMultiThreadWriteHDFS() throws Exception {
+        final Path testDir = new Path("hdfs:///test123");
+        final FileSystem fs = HadoopUtil.getFileSystem(testDir);
+        final String fileName = "test.json";
+        final Path p = new Path(testDir, fileName);
+        int threadNum = 3;
+        ExecutorService service = Executors.newFixedThreadPool(threadNum);
+        final CountDownLatch latch = new CountDownLatch(threadNum);
+        try {
+            fs.deleteOnExit(p);
+            fs.createNewFile(p);
+            for (int i = 0; i < threadNum; i++) {
+                service.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        long id = Thread.currentThread().getId();
+                        try {
+                            while (!fs.createNewFile(p)) {
+                                System.out.println("Thread id : " + id + " can not get lock,sleep a while");
+                                Thread.sleep(1000);
+                            }
+                            System.out.println("Thread id : " + id + " get lock, sleep a while");
+                            Thread.sleep(1000);
+                            fs.delete(p, true);
+                            System.out.println("Thread id : " + id + " release lock");
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        } finally {
+                            // Always count down, otherwise a failing worker leaves
+                            // the main thread blocked in latch.await() forever.
+                            latch.countDown();
+                        }
+                    }
+                });
+            }
+            Thread.sleep(1000);
+            fs.delete(p, true);
+            System.out.println("main thread release lock.Waiting threads down");
+            System.out.println("file still exist : " + fs.exists(p));
+            latch.await();
+        } finally {
+            // Release the pool threads even when the test body throws.
+            service.shutdown();
+        }
+    }
+
+    /** Runs the shared ResourceStore test suite against an HDFSResourceStore. */
+    @Test
+    public void testHDFSStore() throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        ResourceStore store = new HDFSResourceStore(config);
+        ResourceStoreTest.testAStore(store);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
new file mode 100644
index 0000000..8710edf
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
@@ -0,0 +1,41 @@
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * A lock represented by a marker file on HDFS: the lock is taken by creating the file
+ * and released by deleting it.
+ *
+ * Created by xiefan on 17-1-17.
+ */
+public class HDFSLock {
+
+    private final Path rawLock;
+
+    private static final Logger logger = LoggerFactory.getLogger(HDFSLock.class);
+
+    protected HDFSLock(String resourceFullPath) {
+        this.rawLock = new Path(resourceFullPath);
+    }
+
+    /**
+     * Acquires the lock, polling once per second until the marker file can be created.
+     *
+     * @param fs file system that holds the marker file
+     * @return true once the marker file has been created by this call; false if the
+     *         lock path is a directory (directory locks are not supported)
+     * @throws IOException on file system errors
+     * @throws InterruptedException if interrupted while waiting between attempts
+     */
+    public boolean init(FileSystem fs) throws IOException, InterruptedException {
+        // Only directories are rejected. The previous "!isFile" test also rejected paths
+        // that do not exist yet (i.e. every free lock) and let existing files fall into a
+        // createNewFile loop that could never succeed.
+        if (fs.isDirectory(rawLock)) {
+            logger.info("Not support directory lock yet");
+            return false;
+        }
+        while (!fs.createNewFile(rawLock)) {
+            Thread.sleep(1000);
+        }
+        return true;
+    }
+
+    /**
+     * Releases the lock by deleting the marker file, retrying once per second while the
+     * file still exists and the delete is refused.
+     *
+     * @param fs file system that holds the marker file
+     * @return always true; the method returns only once the marker file is gone
+     * @throws IOException on file system errors
+     * @throws InterruptedException if interrupted while waiting between attempts
+     */
+    public boolean release(FileSystem fs) throws IOException, InterruptedException {
+        // The exists() guard keeps a release of an already-removed lock from spinning
+        // forever on delete() returning false.
+        while (fs.exists(rawLock) && !fs.delete(rawLock, false)) {
+            Thread.sleep(1000);
+        }
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
new file mode 100644
index 0000000..1cd0800
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
@@ -0,0 +1,45 @@
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.engine.mr.HadoopUtil;
+
+import java.io.IOException;
+
+/**
+ * Hands out and releases {@link HDFSLock}s on one file system.
+ *
+ * Created by xiefan on 17-1-17.
+ */
+public class HDFSLockManager {
+
+    private static final String LOCK_HOME = "LOCK_HOME";
+
+    private Path lockPath;
+
+    private FileSystem fs;
+
+    /**
+     * Resolves the file system of the working directory and makes sure the LOCK_HOME
+     * directory exists beneath it.
+     *
+     * @param hdfsWorkingDir Kylin's HDFS working directory
+     * @throws IOException if the file system cannot be reached or the directory cannot be created
+     */
+    public HDFSLockManager(String hdfsWorkingDir) throws IOException {
+        this.lockPath = new Path(hdfsWorkingDir, LOCK_HOME);
+        this.fs = HadoopUtil.getFileSystem(lockPath);
+        // mkdirs, not create: create() would make a regular file and hand back an
+        // output stream that nobody closes.
+        if (!fs.exists(lockPath) && !fs.mkdirs(lockPath)) {
+            throw new IOException("Failed to create lock home : " + lockPath);
+        }
+    }
+
+    /**
+     * Creates a lock for the given resource and blocks until it is acquired.
+     *
+     * @param resourceFullPath path of the lock's marker file
+     * @return the acquired lock
+     * @throws IllegalStateException if the lock reports that it could not be initialised
+     */
+    public HDFSLock getLock(String resourceFullPath) throws IOException, InterruptedException, IllegalStateException {
+        HDFSLock lock = new HDFSLock(resourceFullPath);
+        if (!lock.init(fs)) {
+            throw new IllegalStateException("Try get lock fail. Resourse path : " + resourceFullPath);
+        }
+        return lock;
+    }
+
+    /**
+     * Releases a lock previously obtained from {@link #getLock(String)}.
+     *
+     * @param lock the lock to release
+     * @throws IllegalStateException if the lock reports that the release failed
+     */
+    public void releaseLock(HDFSLock lock) throws IOException, InterruptedException, IllegalStateException {
+        if (!lock.release(fs)) {
+            throw new IllegalStateException("Release lock fail");
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
new file mode 100644
index 0000000..c7f0f25
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hdfs;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * A {@link ResourceStore} that keeps every resource as a file below a metadata folder
+ * inside Kylin's HDFS working directory.
+ *
+ * Created by xiefan on 17-1-10.
+ */
+public class HDFSResourceStore extends ResourceStore {
+
+    private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs";
+
+    private Path hdfsMetaPath;
+
+    private FileSystem fs;
+
+    private HDFSLockManager lockManager;
+
+    private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
+
+    //public for test. Normal should be protected
+    public HDFSResourceStore(KylinConfig kylinConfig) throws IOException {
+        super(kylinConfig);
+        String metadataUrl = kylinConfig.getHDFSMetadataUrl();
+        // The URL has the form FOLDER@hdfs; the part before '@' names the metadata folder.
+        int cut = metadataUrl.indexOf('@');
+        String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+        createMetaFolder(metaDirName, kylinConfig);
+    }
+
+    /**
+     * Resolves the file system, makes sure the metadata folder exists under the HDFS
+     * working directory and creates the lock manager.
+     *
+     * @throws IOException if the working directory is missing or the folder cannot be created
+     */
+    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws IOException {
+        String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
+        fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
+        Path hdfsWorkingPath = new Path(hdfsWorkingDir);
+        if (!fs.exists(hdfsWorkingPath)) {
+            throw new IOException("HDFS working dir not exist");
+        }
+        hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
+        // mkdirs, not create: create() would produce a regular file (so nothing could be
+        // stored beneath it) and return an output stream that was never closed.
+        if (!fs.exists(hdfsMetaPath) && !fs.mkdirs(hdfsMetaPath)) {
+            throw new IOException("Failed to create HDFS metadata dir " + hdfsMetaPath);
+        }
+        lockManager = new HDFSLockManager(hdfsWorkingDir);
+    }
+
+    /**
+     * Lists the direct children of a folder as resource paths.
+     *
+     * @return the child paths, or null when the folder does not exist or is not a directory
+     */
+    @Override
+    protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException {
+        Path p = getRealHDFSPath(folderPath);
+        if (!fs.exists(p) || !fs.isDirectory(p)) {
+            return null;
+        }
+        TreeSet<String> r = new TreeSet<>();
+        String prefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";
+        for (FileStatus status : fs.listStatus(p)) {
+            r.add(prefix + status.getPath().getName());
+        }
+        return r;
+    }
+
+    /** Reports whether the resource exists as a regular file. */
+    @Override
+    protected boolean existsImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        return fs.exists(p) && fs.isFile(p);
+    }
+
+    /**
+     * Opens every resource directly under the folder whose timestamp lies in
+     * [timeStart, timeEndExclusive). Streams opened so far are closed if an IOException
+     * interrupts the scan.
+     */
+    @Override
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException {
+        NavigableSet<String> resources = listResources(folderPath);
+        if (resources == null)
+            return Collections.emptyList();
+        List<RawResource> result = Lists.newArrayListWithCapacity(resources.size());
+        try {
+            for (String res : resources) {
+                long ts = getResourceTimestampImpl(res);
+                if (timeStart <= ts && ts < timeEndExclusive) {
+                    RawResource resource = getResourceImpl(res);
+                    if (resource != null) // can be null if is a sub-folder
+                        result.add(resource);
+                }
+            }
+        } catch (IOException ex) {
+            for (RawResource rawResource : result) {
+                IOUtils.closeQuietly(rawResource.inputStream);
+            }
+            throw ex;
+        }
+        return result;
+    }
+
+    /**
+     * Opens a resource for reading.
+     *
+     * @return the open stream with its timestamp, or null when the path is not an existing file
+     */
+    @Override
+    protected RawResource getResourceImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        if (!fs.exists(p) || !fs.isFile(p)) {
+            return null;
+        }
+        if (fs.getFileStatus(p).getLen() == 0) {
+            logger.warn("Zero length file: " + p.toString());
+        }
+        // Read the timestamp before opening, and open exactly once, so that no stream is
+        // left behind unclosed.
+        long ts = getResourceTimestamp(resPath);
+        return new RawResource(fs.open(p), ts);
+    }
+
+    /** Returns the file's modification time, or 0 when the path is not an existing file. */
+    @Override
+    protected long getResourceTimestampImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        if (!fs.exists(p) || !fs.isFile(p)) {
+            return 0;
+        }
+        return fs.getFileStatus(p).getModificationTime();
+    }
+
+    /**
+     * Writes (or overwrites) a resource and stamps the file with the given timestamp.
+     *
+     * @param ts modification time to record for the file, in epoch milliseconds
+     */
+    @Override
+    protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        // try-with-resources so that a failing flush/close surfaces instead of being swallowed.
+        try (FSDataOutputStream out = fs.create(p, true)) {
+            IOUtils.copy(content, out);
+        }
+        // Record the caller's timestamp as the modification time (-1 leaves the access time
+        // untouched); otherwise the value returned by checkAndPutResourceImpl would never
+        // match what getResourceTimestampImpl reads back.
+        fs.setTimes(p, ts, -1);
+    }
+
+    /**
+     * Writes the resource only if its current timestamp equals oldTS (0 for a resource
+     * that does not exist yet).
+     *
+     * @return newTS
+     * @throws IllegalStateException if the stored timestamp differs from oldTS
+     */
+    @Override
+    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
+        Path p = getRealHDFSPath(resPath);
+        if (!fs.exists(p)) {
+            if (oldTS != 0) {
+                throw new IllegalStateException("For not exist file. OldTS have to be 0. but Actual oldTS is : " + oldTS);
+            }
+        } else {
+            long realLastModify = getResourceTimestamp(resPath);
+            if (realLastModify != oldTS) {
+                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but found " + realLastModify);
+            }
+        }
+        putResourceImpl(resPath, new ByteArrayInputStream(content), newTS);
+        return newTS;
+    }
+
+    /** Deletes the resource (recursively, if it is a folder) when it exists. */
+    @Override
+    protected void deleteResourceImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        if (fs.exists(p)) {
+            fs.delete(p, true);
+        }
+    }
+
+    @Override
+    protected String getReadableResourcePathImpl(String resPath) {
+        return getRealHDFSPath(resPath).toString();
+    }
+
+    /**
+     * Maps a resource path onto the metadata folder.
+     *
+     * Leading slashes are stripped first: an absolute child passed to
+     * {@code new Path(parent, child)} replaces the parent's path, which would place
+     * "/cube/x" at the file system root instead of below the metadata folder.
+     */
+    private Path getRealHDFSPath(String resourcePath) {
+        String relative = resourcePath;
+        while (relative.startsWith("/")) {
+            relative = relative.substring(1);
+        }
+        if (relative.isEmpty()) {
+            return this.hdfsMetaPath;
+        }
+        return new Path(this.hdfsMetaPath, relative);
+    }
+
+}