You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/01/23 08:24:10 UTC

[3/6] kylin git commit: KYLIN-2374 Allow kylin to store metadata in HDFS instead of HBase

KYLIN-2374 Allow kylin to store metadata in HDFS instead of HBase

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d23bf930
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d23bf930
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d23bf930

Branch: refs/heads/master
Commit: d23bf930da0b542d0e6981917e6bde055839577a
Parents: db85d66
Author: xiefan46 <95...@qq.com>
Authored: Wed Jan 11 10:00:19 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jan 23 16:23:56 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   5 +
 .../common/persistence/ResourceStoreTest.java   |   3 +-
 .../test_case_data/sandbox/kylin.properties     |   1 +
 .../storage/hdfs/ITHDFSResourceStoreTest.java   | 117 +++++++++++
 .../org/apache/kylin/storage/hdfs/HDFSLock.java |  41 ++++
 .../kylin/storage/hdfs/HDFSLockManager.java     |  45 +++++
 .../kylin/storage/hdfs/HDFSResourceStore.java   | 198 +++++++++++++++++++
 7 files changed, 409 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 05df177..44d636d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -201,6 +201,11 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.metadata.url");
     }
 
+    // For the HDFS resource store: reads the optional property
+    // "kylin.metadata.hdfs.url". The second argument passed to getOptional is
+    // presumably the value returned when the property is unset -- confirm
+    // against getOptional's definition.
+    public String getHDFSMetadataUrl() {
+        return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs@hdfs");
+    }
+
     // for test only
     public void setMetadataUrl(String metadataUrl) {
         setProperty("kylin.metadata.url", metadataUrl);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
index 4c31a15..ddaf481 100644
--- a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
@@ -110,9 +110,10 @@ public class ResourceStoreTest {
         }
 
         // list
-        NavigableSet<String> list;
+        NavigableSet<String> list = null;
 
         list = store.listResources(dir1);
+        System.out.println(list);
         assertTrue(list.contains(path1));
         assertTrue(list.contains(path2) == false);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 6c512dc..b01c377 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -41,6 +41,7 @@ kylin.source.hive.client=cli
 # The metadata store in hbase
 kylin.metadata.url=kylin_default_instance@hbase
 
+
 # The storage for final cube file in hbase
 kylin.storage.url=hbase
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
new file mode 100644
index 0000000..ef04957
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hdfs;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceStoreTest;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests for {@link HDFSResourceStore}. Test metadata is created
+ * before and cleaned up after each test via the {@link HBaseMetadataTestCase}
+ * helpers.
+ *
+ * Created by xiefan on 17-1-10.
+ */
+public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
+
+    KylinConfig kylinConfig;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    /**
+     * Asserts the value returned by {@code getHDFSMetadataUrl()} and prints the
+     * configured HDFS working directory.
+     */
+    @Ignore
+    @Test
+    public void testHDFSUrl() throws Exception {
+        assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl());
+        System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory());
+    }
+
+    /**
+     * Exploratory test: several threads compete to create the same HDFS file,
+     * using {@code createNewFile} as a lock primitive. The main thread holds
+     * the file first, releases it after one second, and then waits for every
+     * worker to take and release it in turn.
+     */
+    @Ignore
+    @Test
+    public void testMultiThreadWriteHDFS() throws Exception {
+        final Path testDir = new Path("hdfs:///test123");
+        final FileSystem fs = HadoopUtil.getFileSystem(testDir);
+        final String fileName = "test.json";
+        int threadNum = 3;
+        ExecutorService service = Executors.newFixedThreadPool(threadNum);
+        final CountDownLatch latch = new CountDownLatch(threadNum);
+        Path p = new Path(testDir, fileName);
+        fs.createNewFile(p);
+        // deleteOnExit only registers paths that already exist, so it has to
+        // come after the file has been created.
+        fs.deleteOnExit(p);
+        try {
+            for (int i = 0; i < threadNum; i++) {
+                service.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            long id = Thread.currentThread().getId();
+                            Path p = new Path(testDir, fileName);
+                            while (!fs.createNewFile(p)) {
+                                System.out.println("Thread id : " + id + " can not get lock,sleep a while");
+                                Thread.sleep(1000);
+                            }
+                            System.out.println("Thread id : " + id + " get lock, sleep a while");
+                            Thread.sleep(1000);
+                            fs.delete(p, true);
+                            System.out.println("Thread id : " + id + " release lock");
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        } finally {
+                            // Always count down, otherwise a failing worker
+                            // leaves the main thread blocked in await() forever.
+                            latch.countDown();
+                        }
+                    }
+                });
+            }
+            Thread.sleep(1000);
+            fs.delete(p, true);
+            System.out.println("main thread release lock.Waiting threads down");
+            System.out.println("file still exist : " + fs.exists(p));
+            latch.await();
+        } finally {
+            // Release the pool threads so they do not outlive the test.
+            service.shutdown();
+        }
+    }
+
+    /** Runs the shared resource-store test suite against an HDFS-backed store. */
+    @Test
+    public void testHDFSStore() throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        ResourceStore store = new HDFSResourceStore(config);
+        ResourceStoreTest.testAStore(store);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
new file mode 100644
index 0000000..8710edf
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
@@ -0,0 +1,41 @@
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * A lock represented by a marker file on a Hadoop {@link FileSystem}: the lock
+ * is held while the file exists and is free once the file has been deleted.
+ * The marker file lives at exactly the path passed to the constructor.
+ *
+ * Created by xiefan on 17-1-17.
+ */
+public class HDFSLock {
+
+    private static final Logger logger = LoggerFactory.getLogger(HDFSLock.class);
+
+    /** Pause between two attempts to acquire or release the lock. */
+    private static final long RETRY_INTERVAL_MS = 1000;
+
+    private final Path rawLock;
+
+    /**
+     * @param resourceFullPath full path of the file that serves as the lock marker
+     */
+    protected HDFSLock(String resourceFullPath) {
+        this.rawLock = new Path(resourceFullPath);
+    }
+
+    /**
+     * Acquires the lock, blocking until the marker file can be created.
+     *
+     * @param fs file system holding the marker file
+     * @return {@code true} once the lock is held; {@code false} if the path is a
+     *         directory, which cannot be locked
+     * @throws IOException          if the file system access fails
+     * @throws InterruptedException if the thread is interrupted while waiting
+     */
+    public boolean init(FileSystem fs) throws IOException, InterruptedException {
+        // Only directories are rejected. A path that does not exist yet is the
+        // normal "lock is free" state and must not be refused.
+        if (fs.isDirectory(rawLock)) {
+            logger.info("Not support directory lock yet");
+            return false;
+        }
+        // createNewFile returns false while another holder's marker exists.
+        while (!fs.createNewFile(rawLock)) {
+            Thread.sleep(RETRY_INTERVAL_MS);
+        }
+        return true;
+    }
+
+    /**
+     * Releases the lock by deleting the marker file, retrying until the delete
+     * succeeds. Returns immediately when the marker is already gone.
+     *
+     * @param fs file system holding the marker file
+     * @return always {@code true}
+     * @throws IOException          if the file system access fails
+     * @throws InterruptedException if the thread is interrupted while retrying
+     */
+    public boolean release(FileSystem fs) throws IOException, InterruptedException {
+        // delete() reports false for a missing path; without the exists()
+        // check a release of an already-released lock would spin forever.
+        while (fs.exists(rawLock) && !fs.delete(rawLock, false)) {
+            Thread.sleep(RETRY_INTERVAL_MS);
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
new file mode 100644
index 0000000..1cd0800
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
@@ -0,0 +1,45 @@
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.engine.mr.HadoopUtil;
+
+import java.io.IOException;
+
+/**
+ * Creates and releases {@link HDFSLock}s on the file system of the Kylin HDFS
+ * working directory, and makes sure a {@code LOCK_HOME} directory exists there.
+ *
+ * Created by xiefan on 17-1-17.
+ */
+public class HDFSLockManager {
+
+    private static final String LOCK_HOME = "LOCK_HOME";
+
+    private final Path lockPath;
+
+    private final FileSystem fs;
+
+    /**
+     * @param hdfsWorkingDir working directory under which {@code LOCK_HOME} is kept
+     * @throws IOException if the file system cannot be reached or the lock home
+     *                     directory cannot be created
+     */
+    public HDFSLockManager(String hdfsWorkingDir) throws IOException {
+        this.lockPath = new Path(hdfsWorkingDir, LOCK_HOME);
+        this.fs = HadoopUtil.getFileSystem(lockPath);
+        // mkdirs creates a directory; create() would have produced a regular
+        // file and handed back an output stream that was never closed.
+        if (!fs.exists(lockPath) && !fs.mkdirs(lockPath)) {
+            throw new IOException("Failed to create lock home directory: " + lockPath);
+        }
+    }
+
+    /**
+     * Acquires a lock on the given path, blocking until it is available.
+     *
+     * @param resourceFullPath full path of the file to lock
+     * @return the acquired lock
+     * @throws IOException           if the file system access fails
+     * @throws InterruptedException  if the thread is interrupted while waiting
+     * @throws IllegalStateException if the lock cannot be taken
+     */
+    public HDFSLock getLock(String resourceFullPath) throws IOException, InterruptedException, IllegalStateException {
+        HDFSLock lock = new HDFSLock(resourceFullPath);
+        if (!lock.init(fs)) {
+            throw new IllegalStateException("Try get lock fail. Resourse path : " + resourceFullPath);
+        }
+        return lock;
+    }
+
+    /**
+     * Releases a lock previously returned by {@link #getLock(String)}.
+     *
+     * @param lock the lock to release
+     * @throws IOException           if the file system access fails
+     * @throws InterruptedException  if the thread is interrupted while retrying
+     * @throws IllegalStateException if the lock reports that it was not released
+     */
+    public void releaseLock(HDFSLock lock) throws IOException, InterruptedException, IllegalStateException {
+        if (!lock.release(fs)) {
+            throw new IllegalStateException("Release lock fail");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
new file mode 100644
index 0000000..c7f0f25
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hdfs;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * A {@link ResourceStore} that keeps each resource as a file below a metadata
+ * directory inside the Kylin HDFS working directory. The modification time of
+ * the file is used as the resource timestamp.
+ *
+ * Created by xiefan on 17-1-10.
+ */
+public class HDFSResourceStore extends ResourceStore {
+
+    private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
+
+    private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs";
+
+    private Path hdfsMetaPath;
+
+    private FileSystem fs;
+
+    private HDFSLockManager lockManager;
+
+    /**
+     * Builds a store rooted at the directory named by the part of the HDFS
+     * metadata URL that precedes '@' (or a default name if there is no '@').
+     * Public for tests; normally this would be protected.
+     *
+     * @throws IOException if the working directory is missing or the metadata
+     *                     directory cannot be created
+     */
+    public HDFSResourceStore(KylinConfig kylinConfig) throws IOException {
+        super(kylinConfig);
+        String metadataUrl = kylinConfig.getHDFSMetadataUrl();
+        // The URL has the form DIR_NAME@hdfs; only the directory name is used.
+        int cut = metadataUrl.indexOf('@');
+        String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+        createMetaFolder(metaDirName, kylinConfig);
+    }
+
+    /** Resolves the file system, ensures the metadata directory exists and sets up the lock manager. */
+    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws IOException {
+        String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
+        fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
+        Path hdfsWorkingPath = new Path(hdfsWorkingDir);
+        if (!fs.exists(hdfsWorkingPath)) {
+            throw new IOException("HDFS working dir not exist");
+        }
+        hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
+        // The metadata root must be a directory; create() would have made a
+        // regular file and leaked its output stream.
+        if (!fs.exists(hdfsMetaPath) && !fs.mkdirs(hdfsMetaPath)) {
+            throw new IOException("Failed to create HDFS metadata dir: " + hdfsMetaPath);
+        }
+        lockManager = new HDFSLockManager(hdfsWorkingDir);
+    }
+
+    /**
+     * Lists the direct children of a folder.
+     *
+     * @return the children as resource paths prefixed with {@code folderPath},
+     *         or {@code null} if the folder does not exist or is not a directory
+     */
+    @Override
+    protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException {
+        Path p = getRealHDFSPath(folderPath);
+        if (!fs.exists(p) || !fs.isDirectory(p)) {
+            return null;
+        }
+        TreeSet<String> r = new TreeSet<>();
+        String prefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";
+        for (FileStatus status : fs.listStatus(p)) {
+            r.add(prefix + status.getPath().getName());
+        }
+        return r;
+    }
+
+    /** Returns whether the resource path exists and is a regular file. */
+    @Override
+    protected boolean existsImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        return fs.exists(p) && fs.isFile(p);
+    }
+
+    /**
+     * Loads every resource directly under {@code folderPath} whose timestamp
+     * lies in {@code [timeStart, timeEndExclusive)}. If loading fails part-way,
+     * the streams opened so far are closed before the exception is rethrown.
+     */
+    @Override
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException {
+        NavigableSet<String> resources = listResources(folderPath);
+        if (resources == null)
+            return Collections.emptyList();
+        List<RawResource> result = Lists.newArrayListWithCapacity(resources.size());
+        try {
+            for (String res : resources) {
+                long ts = getResourceTimestampImpl(res);
+                if (timeStart <= ts && ts < timeEndExclusive) {
+                    RawResource resource = getResourceImpl(res);
+                    if (resource != null) // can be null if is a sub-folder
+                        result.add(resource);
+                }
+            }
+        } catch (IOException ex) {
+            for (RawResource rawResource : result) {
+                IOUtils.closeQuietly(rawResource.inputStream);
+            }
+            throw ex;
+        }
+        return result;
+    }
+
+    /**
+     * Opens a resource for reading.
+     *
+     * @return the open stream together with the resource timestamp, or
+     *         {@code null} if the path is not an existing regular file
+     */
+    @Override
+    protected RawResource getResourceImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        if (!fs.exists(p) || !fs.isFile(p)) {
+            return null;
+        }
+        if (fs.getFileStatus(p).getLen() == 0) {
+            logger.warn("Zero length file: " + p.toString());
+        }
+        // Read the timestamp first so a failure there cannot strand an open
+        // stream, and open the file exactly once.
+        long ts = getResourceTimestamp(resPath);
+        FSDataInputStream in = fs.open(p);
+        return new RawResource(in, ts);
+    }
+
+    /** Returns the file's modification time, or 0 if it is not an existing regular file. */
+    @Override
+    protected long getResourceTimestampImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        if (!fs.exists(p) || !fs.isFile(p)) {
+            return 0;
+        }
+        return fs.getFileStatus(p).getModificationTime();
+    }
+
+    /**
+     * Writes (or overwrites) a resource and stamps the file with {@code ts} so
+     * that later timestamp reads return the value the caller supplied.
+     */
+    @Override
+    protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        // try-with-resources lets a failing close() surface, because on HDFS
+        // the data is only guaranteed to be written once close() succeeds.
+        try (FSDataOutputStream out = fs.create(p, true)) {
+            IOUtils.copy(content, out);
+        }
+        // -1 leaves the access time untouched.
+        fs.setTimes(p, ts, -1);
+    }
+
+    /**
+     * Writes a resource only if its current timestamp equals {@code oldTS}
+     * (0 for a resource that does not exist yet).
+     *
+     * @return {@code newTS}
+     * @throws IllegalStateException if the stored timestamp differs from {@code oldTS}
+     */
+    @Override
+    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
+        Path p = getRealHDFSPath(resPath);
+        if (!fs.exists(p)) {
+            if (oldTS != 0) {
+                throw new IllegalStateException("For not exist file. OldTS have to be 0. but Actual oldTS is : " + oldTS);
+            }
+        } else {
+            long realLastModify = getResourceTimestamp(resPath);
+            if (realLastModify != oldTS) {
+                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but found " + realLastModify);
+            }
+        }
+        putResourceImpl(resPath, new ByteArrayInputStream(content), newTS);
+        return newTS;
+    }
+
+    /** Deletes the resource (recursively if it is a folder); does nothing if it is absent. */
+    @Override
+    protected void deleteResourceImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        if (fs.exists(p)) {
+            fs.delete(p, true);
+        }
+    }
+
+    /** Returns the full HDFS path of the resource as a string. */
+    @Override
+    protected String getReadableResourcePathImpl(String resPath) {
+        return getRealHDFSPath(resPath).toString();
+    }
+
+    /**
+     * Maps a resource path onto a path below the metadata directory.
+     *
+     * Leading slashes are stripped first: {@code new Path(parent, child)}
+     * resolves the child like a URI, so an absolute child such as "/cube/a.json"
+     * would replace the parent path and end up outside the metadata directory.
+     * A path consisting only of slashes (or empty) maps to the metadata root.
+     */
+    private Path getRealHDFSPath(String resourcePath) {
+        int start = 0;
+        while (start < resourcePath.length() && resourcePath.charAt(start) == '/') {
+            start++;
+        }
+        if (start == resourcePath.length()) {
+            return this.hdfsMetaPath;
+        }
+        return new Path(this.hdfsMetaPath, resourcePath.substring(start));
+    }
+
+}