You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/01/20 09:44:05 UTC

[1/2] kylin git commit: KYLIN-2374 Allow kylin to store metadata in HDFS instead of HBase [Forced Update!]

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2374 870cbf269 -> 24cae5c73 (forced update)


KYLIN-2374 Allow kylin to store metadata in HDFS instead of HBase

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0f88801a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0f88801a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0f88801a

Branch: refs/heads/KYLIN-2374
Commit: 0f88801a1e696eb58c58ec05e57f296a2ca6542e
Parents: 1e4ae54
Author: xiefan46 <95...@qq.com>
Authored: Wed Jan 11 10:00:19 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Jan 20 17:43:38 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   9 +-
 .../common/persistence/ResourceStoreTest.java   |   3 +-
 .../test_case_data/sandbox/kylin.properties     |   1 +
 .../storage/hdfs/ITHDFSResourceStoreTest.java   | 117 +++++++++++
 .../org/apache/kylin/storage/hdfs/HDFSLock.java |  41 ++++
 .../kylin/storage/hdfs/HDFSLockManager.java     |  45 +++++
 .../kylin/storage/hdfs/HDFSResourceStore.java   | 198 +++++++++++++++++++
 7 files changed, 411 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 74903d5..29ad5eb 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -181,7 +181,7 @@ abstract public class KylinConfigBase implements Serializable {
         if (!root.endsWith("/")) {
             root += "/";
         }
-        
+
         // make sure path qualified
         if (!root.contains("://")) {
             if (!root.startsWith("/"))
@@ -189,7 +189,7 @@ abstract public class KylinConfigBase implements Serializable {
             else
                 root = "hdfs://" + root;
         }
-        
+
         return new StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).append("/").toString();
     }
 
@@ -201,6 +201,11 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.metadata.url");
     }
 
+    //for hdfs resource store
+    public String getHDFSMetadataUrl() {
+        return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs@hdfs");
+    }
+
     // for test only
     public void setMetadataUrl(String metadataUrl) {
         setProperty("kylin.metadata.url", metadataUrl);

http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
index 4c31a15..ddaf481 100644
--- a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
@@ -110,9 +110,10 @@ public class ResourceStoreTest {
         }
 
         // list
-        NavigableSet<String> list;
+        NavigableSet<String> list = null;
 
         list = store.listResources(dir1);
+        System.out.println(list);
         assertTrue(list.contains(path1));
         assertTrue(list.contains(path2) == false);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 06f8e4b..2d3cd79 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -41,6 +41,7 @@ kylin.source.hive.client=cli
 # The metadata store in hbase
 kylin.metadata.url=kylin_default_instance@hbase
 
+
 # The storage for final cube file in hbase
 kylin.storage.url=hbase
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
new file mode 100644
index 0000000..ef04957
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hdfs;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceStoreTest;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by xiefan on 17-1-10.
+ */
+public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
+
+    KylinConfig kylinConfig;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Ignore
+    @Test
+    public void testHDFSUrl() throws Exception {
+        assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl());
+        System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory());
+    }
+
+
+    @Ignore
+    @Test
+    public void testMultiThreadWriteHDFS() throws Exception{
+        //System.out.println(kylinConfig.getHdfsWorkingDirectory());
+        final Path testDir = new Path("hdfs:///test123");
+        final FileSystem fs = HadoopUtil.getFileSystem(testDir);
+        final String fileName = "test.json";
+        int threadNum = 3;
+        ExecutorService service = Executors.newFixedThreadPool(threadNum);
+        final CountDownLatch latch = new CountDownLatch(threadNum);
+        Path p = new Path(testDir,fileName);
+        fs.deleteOnExit(p);
+        fs.createNewFile(p);
+        for(int i=0;i<threadNum;i++) {
+            service.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        long id = Thread.currentThread().getId();
+                        Path p = new Path(testDir, fileName);
+                        /*while(fs.exists(p)){
+                            System.out.println("Thread id : " + id + " can not get lock,sleep a while");
+                            Thread.currentThread().sleep(1000);
+                        }*/
+                        while(!fs.createNewFile(p)){
+                            System.out.println("Thread id : " + id + " can not get lock,sleep a while");
+                            Thread.currentThread().sleep(1000);
+                        }
+                        System.out.println("Thread id : " + id + " get lock, sleep a while");
+                        Thread.currentThread().sleep(1000);
+                        fs.delete(p,true);
+                        System.out.println("Thread id : " + id + " release lock");
+                        latch.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+        Thread.currentThread().sleep(1000);
+        fs.delete(p,true);
+        System.out.println("main thread release lock.Waiting threads down");
+        System.out.println("file still exist : " + fs.exists(p));
+        latch.await();
+    }
+
+    @Test
+    public void testHDFSStore() throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        ResourceStore store = new HDFSResourceStore(config);
+        ResourceStoreTest.testAStore(store);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
new file mode 100644
index 0000000..8710edf
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
@@ -0,0 +1,41 @@
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Created by xiefan on 17-1-17.
+ */
+public class HDFSLock {
+    private static final Logger logger = LoggerFactory.getLogger(HDFSLock.class);
+    private final Path rawLock;
+
+    protected HDFSLock(String resourceFullPath) {
+        this.rawLock = new Path(resourceFullPath);
+    }
+
+    /** Polls once a second until createNewFile succeeds on the lock path; returns false if the path is not a file. */
+    public boolean init(FileSystem fs) throws IOException, InterruptedException {
+        // NOTE(review): an absent path is rejected here, yet createNewFile only succeeds on an absent path - confirm intent.
+        if (!fs.isFile(rawLock)) {
+            logger.info("Not support directory lock yet");
+            return false;
+        }
+        while (!fs.createNewFile(rawLock)) {
+            Thread.sleep(1000); // sleep is static; it always sleeps the calling thread
+        }
+        return true;
+    }
+
+    public boolean release(FileSystem fs) throws IOException, InterruptedException {
+        // Retry the delete once a second, but stop as soon as the file is gone: delete() of an absent path returns false.
+        while (fs.exists(rawLock) && !fs.delete(rawLock, false)) {
+            Thread.sleep(1000);
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
new file mode 100644
index 0000000..1cd0800
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
@@ -0,0 +1,45 @@
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.engine.mr.HadoopUtil;
+
+import java.io.IOException;
+
+/**
+ * Created by xiefan on 17-1-17.
+ */
+public class HDFSLockManager {
+
+    private static final String LOCK_HOME = "LOCK_HOME";
+
+    private final Path lockPath;
+    private final FileSystem fs;
+
+    /** Creates the LOCK_HOME directory under the given HDFS working dir if it is missing. */
+    public HDFSLockManager(String hdfsWorkingDir) throws IOException {
+        this.lockPath = new Path(hdfsWorkingDir, LOCK_HOME);
+        this.fs = HadoopUtil.getFileSystem(lockPath);
+        // mkdirs, not create(): create() makes a plain file and returns an output stream that was never closed.
+        if (!fs.exists(lockPath) && !fs.mkdirs(lockPath)) {
+            throw new IOException("Failed to create lock dir " + lockPath);
+        }
+    }
+
+    /** Builds an HDFSLock for the resource and initializes it; throws IllegalStateException when init reports failure. */
+    public HDFSLock getLock(String resourceFullPath) throws IOException, InterruptedException, IllegalStateException {
+        HDFSLock lock = new HDFSLock(resourceFullPath);
+        if (!lock.init(fs)) {
+            throw new IllegalStateException("Try get lock fail. Resource path : " + resourceFullPath);
+        }
+        return lock;
+    }
+
+    /** Releases the given lock; throws IllegalStateException when release reports failure. */
+    public void releaseLock(HDFSLock lock) throws IOException, InterruptedException, IllegalStateException {
+        if (!lock.release(fs)) {
+            throw new IllegalStateException("Release lock fail");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
new file mode 100644
index 0000000..c7f0f25
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hdfs;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * Created by xiefan on 17-1-10.
+ */
+public class HDFSResourceStore extends ResourceStore {
+
+    private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
+    private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs";
+
+    private Path hdfsMetaPath;
+    private FileSystem fs;
+    private HDFSLockManager lockManager;
+
+    /** Public for test (normally protected). The part of the HDFS metadata url before '@' names the metadata folder. */
+    public HDFSResourceStore(KylinConfig kylinConfig) throws IOException {
+        super(kylinConfig);
+        String metadataUrl = kylinConfig.getHDFSMetadataUrl();
+        // split DIR_NAME@SUFFIX; without '@' fall back to the default folder name
+        int cut = metadataUrl.indexOf('@');
+        String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+        createMetaFolder(metaDirName, kylinConfig);
+    }
+
+    /** Resolves the metadata folder under the HDFS working dir and creates it as a directory if absent. */
+    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws IOException {
+        String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
+        fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
+        Path hdfsWorkingPath = new Path(hdfsWorkingDir);
+        if (!fs.exists(hdfsWorkingPath)) {
+            throw new IOException("HDFS working dir not exist");
+        }
+        hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
+        // mkdirs, not create(): create() makes a plain file (resources cannot live under it) and leaks its stream.
+        if (!fs.exists(hdfsMetaPath) && !fs.mkdirs(hdfsMetaPath)) {
+            throw new IOException("Failed to create metadata dir " + hdfsMetaPath);
+        }
+        lockManager = new HDFSLockManager(hdfsWorkingDir);
+    }
+
+    /** Lists the direct children of the folder as resource paths; returns null when it is not an existing directory. */
+    @Override
+    protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException {
+        Path p = getRealHDFSPath(folderPath);
+        if (!fs.exists(p) || !fs.isDirectory(p)) {
+            return null;
+        }
+        TreeSet<String> r = new TreeSet<>();
+        String prefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";
+        for (FileStatus status : fs.listStatus(p)) {
+            r.add(prefix + status.getPath().getName());
+        }
+        return r;
+    }
+
+    @Override
+    protected boolean existsImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        return fs.exists(p) && fs.isFile(p);
+    }
+
+    @Override
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException {
+        NavigableSet<String> resources = listResources(folderPath);
+        if (resources == null)
+            return Collections.emptyList();
+        List<RawResource> result = Lists.newArrayListWithCapacity(resources.size());
+        try {
+            for (String res : resources) {
+                long ts = getResourceTimestampImpl(res);
+                if (timeStart <= ts && ts < timeEndExclusive) {
+                    RawResource resource = getResourceImpl(res);
+                    if (resource != null) // can be null if is a sub-folder
+                        result.add(resource);
+                }
+            }
+        } catch (IOException ex) {
+            for (RawResource rawResource : result) {
+                IOUtils.closeQuietly(rawResource.inputStream);
+            }
+            throw ex;
+        }
+        return result;
+    }
+
+    @Override
+    protected RawResource getResourceImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        if (!fs.exists(p) || !fs.isFile(p)) {
+            return null; // missing, or a sub-folder
+        }
+        if (fs.getFileStatus(p).getLen() == 0) {
+            logger.warn("Zero length file: " + p.toString());
+        }
+        // Resolve the timestamp first, then open exactly one stream, so no opened stream is ever left unreferenced.
+        long ts = getResourceTimestamp(resPath);
+        return new RawResource(fs.open(p), ts);
+    }
+
+    @Override
+    protected long getResourceTimestampImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        if (!fs.exists(p) || !fs.isFile(p)) {
+            return 0;
+        }
+        return fs.getFileStatus(p).getModificationTime();
+    }
+
+    /** Overwrites the resource with the given content and stamps the file with ts as its modification time. */
+    @Override
+    protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        // try-with-resources: a failing close() means the write did not complete, so it must not be swallowed.
+        try (FSDataOutputStream out = fs.create(p, true)) {
+            IOUtils.copy(content, out);
+        }
+        // Store ts as the mtime so getResourceTimestampImpl agrees with the value checkAndPutResourceImpl returns.
+        fs.setTimes(p, ts, -1);
+    }
+
+    /** Writes content only if the stored timestamp still equals oldTS (which must be 0 for a not-yet-existing file). */
+    @Override
+    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
+        Path p = getRealHDFSPath(resPath);
+        if (!fs.exists(p)) {
+            if (oldTS != 0) {
+                throw new IllegalStateException("For not exist file. OldTS have to be 0. but Actual oldTS is : " + oldTS);
+            }
+        } else {
+            long realLastModify = getResourceTimestamp(resPath);
+            if (realLastModify != oldTS) {
+                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but found " + realLastModify);
+            }
+        }
+        putResourceImpl(resPath, new ByteArrayInputStream(content), newTS);
+        return newTS;
+    }
+
+    @Override
+    protected void deleteResourceImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        if (fs.exists(p)) {
+            fs.delete(p, true);
+        }
+    }
+
+    @Override
+    protected String getReadableResourcePathImpl(String resPath) {
+        return getRealHDFSPath(resPath).toString();
+    }
+
+    /** Maps a resource path to its HDFS location under the metadata folder. */
+    private Path getRealHDFSPath(String resourcePath) {
+        // new Path(parent, child) lets an absolute child replace the parent's path, so strip the leading '/'.
+        String relative = resourcePath.replaceFirst("^/+", "");
+        return relative.isEmpty() ? hdfsMetaPath : new Path(hdfsMetaPath, relative);
+    }
+}


[2/2] kylin git commit: Add Zookeeper Lock

Posted by sh...@apache.org.
Add Zookeeper Lock

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/24cae5c7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/24cae5c7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/24cae5c7

Branch: refs/heads/KYLIN-2374
Commit: 24cae5c73889dcca4e18b2de4e12b68415ef2d62
Parents: 0f88801
Author: xiefan46 <95...@qq.com>
Authored: Fri Jan 20 09:48:17 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Jan 20 17:43:47 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  13 +-
 .../storage/hdfs/ITHDFSResourceStoreTest.java   |  65 +-----
 .../kylin/storage/hdfs/ITLockManagerTest.java   | 206 +++++++++++++++++++
 .../kylin/storage/hbase/HBaseResourceStore.java |   1 +
 .../org/apache/kylin/storage/hdfs/HDFSLock.java |  41 ----
 .../kylin/storage/hdfs/HDFSLockManager.java     |  45 ----
 .../kylin/storage/hdfs/HDFSResourceStore.java   |  90 ++++++--
 .../apache/kylin/storage/hdfs/LockManager.java  | 117 +++++++++++
 .../apache/kylin/storage/hdfs/ResourceLock.java |  51 +++++
 .../kylin/storage/hdfs/ZookeeperConfig.java     |  27 +++
 10 files changed, 483 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 29ad5eb..0602e9d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -193,6 +193,14 @@ abstract public class KylinConfigBase implements Serializable {
         return new StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).append("/").toString();
     }
 
+    public String getRawHdfsWorkingDirectory() {
+        String root = getRequired("kylin.env.hdfs-working-dir");
+        if (!root.endsWith("/")) {
+            root += "/";
+        }
+        return root;
+    }
+
     // ============================================================================
     // METADATA
     // ============================================================================
@@ -201,11 +209,6 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.metadata.url");
     }
 
-    //for hdfs resource store
-    public String getHDFSMetadataUrl() {
-        return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs@hdfs");
-    }
-
     // for test only
     public void setMetadataUrl(String metadataUrl) {
         setProperty("kylin.metadata.url", metadataUrl);

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
index ef04957..41bbcc0 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -17,10 +17,8 @@
 */
 
 package org.apache.kylin.storage.hdfs;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.ResourceStoreTest;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -29,11 +27,6 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertEquals;
 
 /**
  * Created by xiefan on 17-1-10.
@@ -53,65 +46,13 @@ public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
         this.cleanupTestMetadata();
     }
 
-    @Ignore
-    @Test
-    public void testHDFSUrl() throws Exception {
-        assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl());
-        System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory());
-    }
-
 
-    @Ignore
     @Test
-    public void testMultiThreadWriteHDFS() throws Exception{
-        //System.out.println(kylinConfig.getHdfsWorkingDirectory());
-        final Path testDir = new Path("hdfs:///test123");
-        final FileSystem fs = HadoopUtil.getFileSystem(testDir);
-        final String fileName = "test.json";
-        int threadNum = 3;
-        ExecutorService service = Executors.newFixedThreadPool(threadNum);
-        final CountDownLatch latch = new CountDownLatch(threadNum);
-        Path p = new Path(testDir,fileName);
-        fs.deleteOnExit(p);
-        fs.createNewFile(p);
-        for(int i=0;i<threadNum;i++) {
-            service.execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        long id = Thread.currentThread().getId();
-                        Path p = new Path(testDir, fileName);
-                        /*while(fs.exists(p)){
-                            System.out.println("Thread id : " + id + " can not get lock,sleep a while");
-                            Thread.currentThread().sleep(1000);
-                        }*/
-                        while(!fs.createNewFile(p)){
-                            System.out.println("Thread id : " + id + " can not get lock,sleep a while");
-                            Thread.currentThread().sleep(1000);
-                        }
-                        System.out.println("Thread id : " + id + " get lock, sleep a while");
-                        Thread.currentThread().sleep(1000);
-                        fs.delete(p,true);
-                        System.out.println("Thread id : " + id + " release lock");
-                        latch.countDown();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-        }
-        Thread.currentThread().sleep(1000);
-        fs.delete(p,true);
-        System.out.println("main thread release lock.Waiting threads down");
-        System.out.println("file still exist : " + fs.exists(p));
-        latch.await();
-    }
-
-    @Test
-    public void testHDFSStore() throws Exception {
+    public void testResourceStoreBasic() throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         ResourceStore store = new HDFSResourceStore(config);
         ResourceStoreTest.testAStore(store);
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
new file mode 100644
index 0000000..56347e4
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests for {@link LockManager}, run against the ZooKeeper server at
+ * {@code sandbox:2181}.
+ */
+public class ITLockManagerTest extends HBaseMetadataTestCase {
+
+    private String zkConnection = "sandbox:2181";
+
+    private KylinConfig kylinConfig;
+
+    private CuratorFramework zkClient;
+
+    private static final String lockRootPath = "/test_lock";
+
+    private LockManager manager;
+
+    private static final int QTY = 5;
+
+    private static final int REPETITIONS = QTY * 10;
+
+    /** Creates the test metadata, a raw Curator client for assertions, and the manager under test. */
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zkClient = CuratorFrameworkFactory.newClient(zkConnection, retryPolicy);
+        zkClient.start();
+        manager = new LockManager(kylinConfig, lockRootPath);
+        System.out.println("nodes in lock root : " + zkClient.getChildren().forPath(lockRootPath));
+    }
+
+    /** Deletes the lock tree, then closes both ZooKeeper clients even when the cleanup itself fails. */
+    @After
+    public void after() throws Exception {
+        try {
+            this.cleanupTestMetadata();
+            zkClient.delete().deletingChildrenIfNeeded().forPath(lockRootPath);
+            List<String> nodes = zkClient.getChildren().forPath("/");
+            System.out.println("nodes in zk after delete : " + nodes);
+        } finally {
+            if (manager != null) {
+                manager.close();
+            }
+            // Also close the raw client so each test method does not leak a connection.
+            CloseableUtils.closeQuietly(zkClient);
+        }
+    }
+
+    /** Acquiring and releasing a lock must leave exactly one node named after the resource. */
+    @Test
+    public void testCreateLock() throws Exception {
+        ResourceLock lock = manager.getLock("/dictionary/numberdict.json");
+        lock.acquire();
+        manager.releaseLock(lock);
+        List<String> nodes = zkClient.getChildren().forPath(lockRootPath + "/dictionary");
+        System.out.println(nodes);
+        assertEquals(1, nodes.size());
+        assertEquals("numberdict.json", nodes.get(0));
+    }
+
+    /**
+     * Runs QTY clients that each take the same lock REPETITIONS times around a resource that
+     * throws when used concurrently. Worker failures propagate through {@code task.get()}.
+     */
+    @Test
+    public void testLockSafty() throws Exception {
+        // FakeLimitedResource simulates some external resource that can only be access by one process at a time
+        final FakeLimitedResource resource = new FakeLimitedResource();
+        ExecutorService service = Executors.newFixedThreadPool(QTY);
+        final TestingServer server = new TestingServer(zkConnection);
+        final List<FutureTask<Void>> tasks = new ArrayList<>();
+        try {
+            for (int i = 0; i < QTY; ++i) {
+                final int index = i;
+                FutureTask<Void> task = new FutureTask<Void>(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        LockManager threadLocalLockManager = new LockManager(kylinConfig, lockRootPath);
+                        try {
+                            ExampleClientThatLocks example = new ExampleClientThatLocks(threadLocalLockManager, lockRootPath, resource, "Client " + index);
+                            for (int j = 0; j < REPETITIONS; ++j) {
+                                example.doWork(10, TimeUnit.SECONDS);
+                            }
+                        } finally {
+                            // No catch here: swallowing the exception would let a broken lock pass the test.
+                            threadLocalLockManager.close();
+                        }
+                        return null;
+                    }
+                });
+                tasks.add(task);
+                service.submit(task);
+            }
+            for (FutureTask<Void> task : tasks) {
+                // Rethrows whatever the worker threw, wrapped in an ExecutionException.
+                task.get();
+            }
+        } finally {
+            service.shutdownNow();
+            CloseableUtils.closeQuietly(server);
+        }
+    }
+
+    /** Stand-in for a shared resource that tolerates only one user at a time. */
+    static class FakeLimitedResource {
+        private final AtomicBoolean inUse = new AtomicBoolean(false);
+
+        public void use() throws InterruptedException {
+            // in a real application this would be accessing/manipulating a shared resource
+
+            if (!inUse.compareAndSet(false, true)) {
+                throw new IllegalStateException("Needs to be used by one client at a time");
+            }
+
+            try {
+                Thread.sleep((long) (3 * Math.random()));
+            } finally {
+                inUse.set(false);
+            }
+        }
+    }
+
+    /** Placeholder that only carries the connection string; closing it does nothing. */
+    static class TestingServer implements Closeable {
+
+        private final String connectionString;
+
+        public TestingServer(String connectionStr) {
+            this.connectionString = connectionStr;
+        }
+
+        @Override
+        public void close() throws IOException {
+            // nothing to release
+        }
+
+        public String getConnectString() {
+            return connectionString;
+        }
+    }
+
+    /** Client that takes the lock for {@code lockPath}, uses the shared resource, and releases it. */
+    static class ExampleClientThatLocks {
+
+        private final FakeLimitedResource resource;
+
+        private final String clientName;
+
+        private final LockManager lockManager;
+
+        private final String lockPath;
+
+        public ExampleClientThatLocks(LockManager lockManager, String lockPath, FakeLimitedResource resource, String clientName) {
+            this.resource = resource;
+            this.clientName = clientName;
+            this.lockManager = lockManager;
+            this.lockPath = lockPath;
+        }
+
+        public void doWork(long time, TimeUnit unit) throws Exception {
+            ResourceLock lock = lockManager.getLock(lockPath);
+            if (!lock.acquire(time, unit)) {
+                throw new IllegalStateException(clientName + " could not acquire the lock");
+            }
+            try {
+                System.out.println(clientName + " has the lock");
+                resource.use();
+            } finally {
+                System.out.println(clientName + " releasing the lock");
+                lock.release(); // always release the lock in a finally block
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 6217350..5980cb5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -314,6 +314,7 @@ public class HBaseResourceStore extends ResourceStore {
         } finally {
             IOUtils.closeQuietly(table);
         }
+
     }
 
     private Result internalGetFromHTable(HTableInterface table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
deleted file mode 100644
index 8710edf..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.kylin.storage.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Created by xiefan on 17-1-17.
- */
-public class HDFSLock {
-
-    private Path rawLock;
-
-    private static final Logger logger = LoggerFactory.getLogger(HDFSLock.class);
-
-    protected HDFSLock(String resourceFullPath) {
-        this.rawLock = new Path(resourceFullPath);
-    }
-
-    public boolean init(FileSystem fs) throws IOException, InterruptedException {
-        if (!fs.isFile(rawLock)) {
-            logger.info("Not support directory lock yet");
-            return false;
-        }
-        while (!fs.createNewFile(rawLock)) {
-            Thread.currentThread().sleep(1000);
-        }
-        return true;
-    }
-
-    public boolean release(FileSystem fs) throws IOException, InterruptedException {
-        while (!fs.delete(rawLock, false)) {
-            Thread.currentThread().sleep(1000);
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
deleted file mode 100644
index 1cd0800..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.kylin.storage.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.engine.mr.HadoopUtil;
-
-import java.io.IOException;
-
-/**
- * Created by xiefan on 17-1-17.
- */
-public class HDFSLockManager {
-
-    private static final String LOCK_HOME = "LOCK_HOME";
-
-    private Path lockPath;
-
-    private FileSystem fs;
-
-    public HDFSLockManager(String hdfsWorkingDir) throws IOException{
-        this.lockPath = new Path(hdfsWorkingDir,LOCK_HOME);
-        this.fs = HadoopUtil.getFileSystem(lockPath);
-        if(!fs.exists(lockPath)){
-            fs.create(lockPath);
-        }
-    }
-
-    public HDFSLock getLock(String resourceFullPath) throws IOException,InterruptedException,IllegalStateException{
-        HDFSLock lock = new HDFSLock(resourceFullPath);
-        boolean success = lock.init(fs);
-        if(success){
-            return lock;
-        }else{
-            throw new IllegalStateException("Try get lock fail. Resourse path : " + resourceFullPath);
-        }
-    }
-
-    public void releaseLock(HDFSLock lock) throws IOException,InterruptedException,IllegalStateException{
-        boolean success = lock.release(fs);
-        if(!success)
-            throw new IllegalStateException("Release lock fail");
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
index c7f0f25..8cb9f0d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,44 +39,72 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
-/**
- * Created by xiefan on 17-1-10.
- */
 public class HDFSResourceStore extends ResourceStore {
 
-    private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs";
+    private static final String DEFAULT_FOLDER_NAME = "kylin_default_instance";
 
     private Path hdfsMetaPath;
 
     private FileSystem fs;
 
-    private HDFSLockManager lockManager;
-
     private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
 
+    private LockManager lockManager;
+
     //public for test. Normal should be protected
-    public HDFSResourceStore(KylinConfig kylinConfig) throws IOException {
+    public HDFSResourceStore(KylinConfig kylinConfig) throws Exception {
         super(kylinConfig);
-        String metadataUrl = kylinConfig.getHDFSMetadataUrl();
-        // split TABLE@HBASE_URL
+        String metadataUrl = kylinConfig.getMetadataUrl();
         int cut = metadataUrl.indexOf('@');
-        String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+        String metaDirName = cut < 0 ? DEFAULT_FOLDER_NAME : metadataUrl.substring(0, cut);
+        metaDirName += "/hdfs_metadata";
+        logger.info("meta dir name :" + metaDirName);
         createMetaFolder(metaDirName, kylinConfig);
     }
 
+    /**
+     * Resolves the metadata folder below the HDFS working directory, sets up the lock manager
+     * and creates the folder (under the root lock) when it does not exist yet.
+     */
-    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws IOException {
+    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws Exception {
         String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
         fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
+        logger.info("hdfs working dir : " + hdfsWorkingDir);
         Path hdfsWorkingPath = new Path(hdfsWorkingDir);
         if (!fs.exists(hdfsWorkingPath)) {
             throw new IOException("HDFS working dir not exist");
         }
+        //create lock manager
+        this.lockManager = new LockManager(kylinConfig,kylinConfig.getRawHdfsWorkingDirectory() + metaDirName);
+        //create hdfs meta path
         hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
         if (!fs.exists(hdfsMetaPath)) {
-            fs.create(hdfsMetaPath, true);
+            // NOTE(review): getLock() already passes its argument through getLockPath(), so the
+            // lock root is prefixed twice here - confirm this is intended.
+            ResourceLock lock = lockManager.getLock(lockManager.getLockPath("/"));
+            boolean locked = false;
+            try {
+                locked = lock.acquire(ZookeeperConfig.DEFAULT_TIMEOUT, TimeUnit.MINUTES);
+                if (locked) {
+                    logger.info("get root lock successfully");
+                    // Re-check under the lock: another process may have created the folder meanwhile.
+                    if (!fs.exists(hdfsMetaPath)) {
+                        fs.mkdirs(hdfsMetaPath);
+                        logger.info("create hdfs meta path");
+                    }
+                } else {
+                    logger.warn("Timed out waiting for root lock, hdfs meta path not created : " + hdfsMetaPath);
+                }
+            } finally {
+                // Only release a lock this thread actually holds; releasing an unowned mutex throws.
+                if (locked) {
+                    lockManager.releaseLock(lock);
+                }
+            }
         }
-        lockManager = new HDFSLockManager(hdfsWorkingDir);
+        logger.info("hdfs meta path : " + hdfsMetaPath.toString());
     }
 
     @Override
@@ -132,7 +146,8 @@ public class HDFSResourceStore extends ResourceStore {
                 logger.warn("Zero length file: " + p.toString());
             }
             FSDataInputStream in = fs.open(p);
-            return new RawResource(fs.open(p), getResourceTimestamp(resPath));
+            long t = in.readLong();
+            return new RawResource(in, t);
         } else {
             return null;
         }
@@ -144,19 +159,59 @@ public class HDFSResourceStore extends ResourceStore {
         if (!fs.exists(p) || !fs.isFile(p)) {
             return 0;
         }
-        FileStatus status = fs.getFileStatus(p);
-        return status.getModificationTime();
+        FSDataInputStream in = null;
+        ResourceLock lock = null;
+        boolean locked = false;
+        try {
+            lock = lockManager.getLock(resPath);
+            locked = lock.acquire(ZookeeperConfig.DEFAULT_TIMEOUT, TimeUnit.MINUTES);
+            if (!locked) {
+                throw new IOException("Timed out waiting for lock on " + resPath);
+            }
+            in = fs.open(p);
+            // The timestamp is the long that putResourceImpl writes in front of the content.
+            return in.readLong();
+        } catch (Exception e) {
+            throw new IOException("Get resource timestamp fail", e);
+        } finally {
+            IOUtils.closeQuietly(in);
+            // Only release a lock that was actually acquired.
+            if (locked) {
+                lockManager.releaseLock(lock);
+            }
+        }
     }
 
+    /**
+     * Writes the resource under its ZooKeeper lock: first the timestamp as a long, then the content.
+     *
+     * @throws IOException if the lock cannot be obtained in time or the write fails
+     */
     @Override
     protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+        logger.info("res path : " + resPath);
         Path p = getRealHDFSPath(resPath);
+        logger.info("put resource : " + p.toUri());
         FSDataOutputStream out = null;
+        ResourceLock lock = null;
+        boolean locked = false;
         try {
+            lock = lockManager.getLock(resPath);
+            locked = lock.acquire(ZookeeperConfig.DEFAULT_TIMEOUT, TimeUnit.MINUTES);
+            if (!locked) {
+                throw new IOException("Timed out waiting for lock on " + resPath);
+            }
             out = fs.create(p, true);
+            // Store the timestamp as a long header ahead of the content.
+            out.writeLong(ts);
             IOUtils.copy(content, out);
+        } catch (Exception e) {
+            throw new IOException("Put resource fail", e);
         } finally {
             IOUtils.closeQuietly(out);
+            if (locked) {
+                lockManager.releaseLock(lock);
+            }
         }
     }
 
@@ -180,9 +218,30 @@ public class HDFSResourceStore extends ResourceStore {
 
+    /**
+     * Deletes the resource (recursively) while holding its ZooKeeper lock.
+     *
+     * @throws IOException if the lock cannot be obtained in time or the delete fails
+     */
     @Override
     protected void deleteResourceImpl(String resPath) throws IOException {
-        Path p = getRealHDFSPath(resPath);
-        if (fs.exists(p)) {
-            fs.delete(p, true);
+        ResourceLock lock = null;
+        boolean locked = false;
+        try {
+            lock = lockManager.getLock(resPath);
+            locked = lock.acquire(ZookeeperConfig.DEFAULT_TIMEOUT, TimeUnit.MINUTES);
+            if (!locked) {
+                throw new IOException("Timed out waiting for lock on " + resPath);
+            }
+            Path p = getRealHDFSPath(resPath);
+            if (fs.exists(p)) {
+                fs.delete(p, true);
+            }
+        } catch (Exception e) {
+            throw new IOException("Delete resource fail", e);
+        } finally {
+            // Only release a lock that was actually acquired.
+            if (locked) {
+                lockManager.releaseLock(lock);
+            }
         }
     }
 
@@ -192,7 +239,16 @@ public class HDFSResourceStore extends ResourceStore {
     }
 
+    /**
+     * Maps a resource path onto the HDFS metadata folder. A leading "/" is stripped (unless the
+     * path is just "/") before resolving it as a child of hdfsMetaPath.
+     */
     private Path getRealHDFSPath(String resourcePath) {
+        final boolean hasLeadingSlash = resourcePath.length() > 1 && resourcePath.startsWith("/");
+        if (hasLeadingSlash) {
+            resourcePath = resourcePath.substring(1);
+        }
         return new Path(this.hdfsMetaPath, resourcePath);
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
new file mode 100644
index 0000000..a4d0080
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+/**
+ * Hands out ZooKeeper-backed {@link ResourceLock}s whose nodes live under a common root path.
+ */
+public class LockManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(LockManager.class);
+
+    final private KylinConfig config;
+
+    final CuratorFramework zkClient;
+
+    private String lockRootPath;
+
+    // Kept so close() can unregister it; otherwise every instance stays reachable until JVM exit.
+    private final Thread shutdownHook;
+
+    public LockManager(String lockRootPath) throws Exception{
+
+        this(KylinConfig.getInstanceFromEnv(),lockRootPath);
+    }
+
+    /**
+     * Connects to ZooKeeper and makes sure {@code lockRootPath} exists.
+     *
+     * @param config       Kylin configuration (stored, not otherwise read by this class)
+     * @param lockRootPath absolute znode path under which all lock nodes are created
+     * @throws IllegalArgumentException if the ZooKeeper connect string is empty
+     */
+    public LockManager(KylinConfig config,String lockRootPath) throws Exception{
+        this.config = config;
+        this.lockRootPath = lockRootPath;
+        String zkConnectString = getZKConnectString();
+        logger.info("zk connection string:" + zkConnectString);
+        if (StringUtils.isEmpty(zkConnectString)) {
+            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+        }
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
+        zkClient.start();
+        if (zkClient.checkExists().forPath(lockRootPath) == null) {
+            try {
+                zkClient.create().creatingParentsIfNeeded().forPath(lockRootPath);
+            } catch (org.apache.zookeeper.KeeperException.NodeExistsException e) {
+                // Another client created the root between the check and the create; that is fine.
+                logger.debug("lock root already exists: " + lockRootPath);
+            }
+        }
+        shutdownHook = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                closeClient();
+            }
+        });
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+    }
+
+    /**
+     * Builds a lock for the given resource; the lock is not acquired yet.
+     *
+     * @param name resource name, resolved below the lock root via {@link #getLockPath(String)}
+     */
+    public ResourceLock getLock(String name) throws Exception {
+        String lockPath = getLockPath(name);
+        InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
+        return new ResourceLock(lockPath, lock);
+    }
+
+    /** Releases {@code lock} if it is non-null; a failure is logged instead of being thrown. */
+    public void releaseLock(ResourceLock lock) {
+        try {
+            if (lock != null)
+                lock.release();
+        } catch (Exception e) {
+            // Pass the exception to the logger so the cause ends up in the log, not on stderr.
+            logger.error("Fail to release lock", e);
+        }
+    }
+
+    /** Joins every host of DEFAULT_ZK_HOST with DEFAULT_ZK_PORT into "host:port,host:port". */
+    private static String getZKConnectString() {
+        final String serverList = ZookeeperConfig.DEFAULT_ZK_HOST;
+        final String port = ZookeeperConfig.DEFAULT_ZK_PORT;
+        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+            @Nullable
+            @Override
+            public String apply(String input) {
+                return input + ":" + port;
+            }
+        }), ",");
+    }
+
+    /**
+     * Maps a resource name to its znode path: a leading "/" is added when missing, a trailing
+     * "/" is dropped, and the result is appended to the lock root.
+     */
+    public String getLockPath(String resourceName) {
+        if (!resourceName.startsWith("/"))
+            resourceName = "/" + resourceName;
+        if (resourceName.endsWith("/"))
+            resourceName = resourceName.substring(0, resourceName.length() - 1);
+        return lockRootPath + resourceName;
+    }
+
+    /** Closes the ZooKeeper client and unregisters this instance's shutdown hook. */
+    public void close() {
+        try {
+            Runtime.getRuntime().removeShutdownHook(shutdownHook);
+        } catch (IllegalStateException ignored) {
+            // JVM shutdown already in progress: the hook can no longer be removed and closes the client itself.
+        }
+        closeClient();
+    }
+
+    /** Closes the Curator client, logging instead of throwing. */
+    private void closeClient() {
+        try {
+            zkClient.close();
+        } catch (Exception e) {
+            logger.error("error occurred to close zookeeper client", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
new file mode 100644
index 0000000..9d51871
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A named handle on a Curator {@link InterProcessMutex}; every call is forwarded to the mutex.
+ */
+public class ResourceLock {
+
+    private final InterProcessMutex lock;
+
+    private final String resourcePath;
+
+    protected ResourceLock(String resourcePath, InterProcessMutex lock) {
+        this.lock = lock;
+        this.resourcePath = resourcePath;
+    }
+
+    /** @return the path this lock was created for */
+    public String getResourcePath() {
+        return this.resourcePath;
+    }
+
+    /** Blocks until the mutex is acquired. */
+    public void acquire() throws Exception {
+        this.lock.acquire();
+    }
+
+    /**
+     * Tries to acquire the mutex within the given time.
+     *
+     * @return whatever {@link InterProcessMutex#acquire(long, TimeUnit)} returns
+     */
+    public boolean acquire(long time, TimeUnit unit) throws Exception {
+        final boolean acquired = this.lock.acquire(time, unit);
+        return acquired;
+    }
+
+    /** Releases the mutex; restricted to this package and subclasses. */
+    protected void release() throws Exception {
+        this.lock.release();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ZookeeperConfig.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ZookeeperConfig.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ZookeeperConfig.java
new file mode 100644
index 0000000..3f67e8d
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ZookeeperConfig.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+
+public interface ZookeeperConfig { // hard-coded ZooKeeper defaults read by LockManager and HDFSResourceStore
+    String DEFAULT_ZK_HOST = "sandbox"; // host list; LockManager splits it on "," and appends the port to each entry
+    // Client port that LockManager appends to every host as ":" + port.
+    String DEFAULT_ZK_PORT = "2181";
+    // Lock wait timeout; HDFSResourceStore passes it to acquire() together with TimeUnit.MINUTES.
+    long DEFAULT_TIMEOUT = 10;
+}