You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/01/20 08:29:40 UTC

[2/2] kylin git commit: Add Zookeeper Lock

Add Zookeeper Lock

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/870cbf26
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/870cbf26
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/870cbf26

Branch: refs/heads/KYLIN-2374
Commit: 870cbf2697f91b068b56064c302a67d09e45a12e
Parents: a594da7
Author: xiefan46 <95...@qq.com>
Authored: Fri Jan 20 09:48:17 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Jan 20 16:13:05 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   2 +-
 .../storage/hdfs/ITHDFSResourceStoreTest.java   | 117 -----------------
 .../kylin/storage/hbase/HBaseResourceStore.java |   1 +
 .../org/apache/kylin/storage/hdfs/Config.java   |  29 +++++
 .../org/apache/kylin/storage/hdfs/HDFSLock.java |  41 ------
 .../kylin/storage/hdfs/HDFSLockManager.java     |  45 -------
 .../kylin/storage/hdfs/HDFSResourceStore.java   |  78 ++++++++---
 .../apache/kylin/storage/hdfs/LockManager.java  | 117 +++++++++++++++++
 .../apache/kylin/storage/hdfs/ResourceLock.java |  53 ++++++++
 .../storage/hdfs/ExampleClientThatLocks.java    |  50 +++++++
 .../kylin/storage/hdfs/FakeLimitedResource.java |  41 ++++++
 .../storage/hdfs/HDFSResourceStoreTest.java     | 118 +++++++++++++++++
 .../kylin/storage/hdfs/LockManagerTest.java     |  69 ++++++++++
 .../kylin/storage/hdfs/TestingServer.java       |  42 ++++++
 .../org/apache/kylin/storage/hdfs/ZkDemo.java   | 129 +++++++++++++++++++
 15 files changed, 713 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 29ad5eb..a4d6ca0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -203,7 +203,7 @@ abstract public class KylinConfigBase implements Serializable {
 
     //for hdfs resource store
     public String getHDFSMetadataUrl() {
-        return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs@hdfs");
+        return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs_meta@hdfs");
     }
 
     // for test only

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
deleted file mode 100644
index ef04957..0000000
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hdfs;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.ResourceStoreTest;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Created by xiefan on 17-1-10.
- */
-public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
-
-    KylinConfig kylinConfig;
-
-    @Before
-    public void setup() throws Exception {
-        this.createTestMetadata();
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    @Ignore
-    @Test
-    public void testHDFSUrl() throws Exception {
-        assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl());
-        System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory());
-    }
-
-
-    @Ignore
-    @Test
-    public void testMultiThreadWriteHDFS() throws Exception{
-        //System.out.println(kylinConfig.getHdfsWorkingDirectory());
-        final Path testDir = new Path("hdfs:///test123");
-        final FileSystem fs = HadoopUtil.getFileSystem(testDir);
-        final String fileName = "test.json";
-        int threadNum = 3;
-        ExecutorService service = Executors.newFixedThreadPool(threadNum);
-        final CountDownLatch latch = new CountDownLatch(threadNum);
-        Path p = new Path(testDir,fileName);
-        fs.deleteOnExit(p);
-        fs.createNewFile(p);
-        for(int i=0;i<threadNum;i++) {
-            service.execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        long id = Thread.currentThread().getId();
-                        Path p = new Path(testDir, fileName);
-                        /*while(fs.exists(p)){
-                            System.out.println("Thread id : " + id + " can not get lock,sleep a while");
-                            Thread.currentThread().sleep(1000);
-                        }*/
-                        while(!fs.createNewFile(p)){
-                            System.out.println("Thread id : " + id + " can not get lock,sleep a while");
-                            Thread.currentThread().sleep(1000);
-                        }
-                        System.out.println("Thread id : " + id + " get lock, sleep a while");
-                        Thread.currentThread().sleep(1000);
-                        fs.delete(p,true);
-                        System.out.println("Thread id : " + id + " release lock");
-                        latch.countDown();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-        }
-        Thread.currentThread().sleep(1000);
-        fs.delete(p,true);
-        System.out.println("main thread release lock.Waiting threads down");
-        System.out.println("file still exist : " + fs.exists(p));
-        latch.await();
-    }
-
-    @Test
-    public void testHDFSStore() throws Exception {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        ResourceStore store = new HDFSResourceStore(config);
-        ResourceStoreTest.testAStore(store);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 6217350..5980cb5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -314,6 +314,7 @@ public class HBaseResourceStore extends ResourceStore {
         } finally {
             IOUtils.closeQuietly(table);
         }
+
     }
 
     private Result internalGetFromHTable(HTableInterface table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/Config.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/Config.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/Config.java
new file mode 100644
index 0000000..c9b50ae
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/Config.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+/**
+ * Hard-coded ZooKeeper endpoint and lock-wait settings for the HDFS resource store. Created by xiefan on 17-1-18.
+ */
+public interface Config {
+    String ZK_HOST = "sandbox"; // LockManager splits this on ",", so a comma-separated host list is accepted
+
+    String ZK_PORT = "2181"; // appended as ":port" to every host from ZK_HOST
+
+    long TIME = 10; // lock wait; HDFSResourceStore passes it together with TimeUnit.MINUTES
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
deleted file mode 100644
index 8710edf..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.kylin.storage.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Created by xiefan on 17-1-17.
- */
-public class HDFSLock {
-
-    private Path rawLock;
-
-    private static final Logger logger = LoggerFactory.getLogger(HDFSLock.class);
-
-    protected HDFSLock(String resourceFullPath) {
-        this.rawLock = new Path(resourceFullPath);
-    }
-
-    public boolean init(FileSystem fs) throws IOException, InterruptedException {
-        if (!fs.isFile(rawLock)) {
-            logger.info("Not support directory lock yet");
-            return false;
-        }
-        while (!fs.createNewFile(rawLock)) {
-            Thread.currentThread().sleep(1000);
-        }
-        return true;
-    }
-
-    public boolean release(FileSystem fs) throws IOException, InterruptedException {
-        while (!fs.delete(rawLock, false)) {
-            Thread.currentThread().sleep(1000);
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
deleted file mode 100644
index 1cd0800..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.kylin.storage.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.engine.mr.HadoopUtil;
-
-import java.io.IOException;
-
-/**
- * Created by xiefan on 17-1-17.
- */
-public class HDFSLockManager {
-
-    private static final String LOCK_HOME = "LOCK_HOME";
-
-    private Path lockPath;
-
-    private FileSystem fs;
-
-    public HDFSLockManager(String hdfsWorkingDir) throws IOException{
-        this.lockPath = new Path(hdfsWorkingDir,LOCK_HOME);
-        this.fs = HadoopUtil.getFileSystem(lockPath);
-        if(!fs.exists(lockPath)){
-            fs.create(lockPath);
-        }
-    }
-
-    public HDFSLock getLock(String resourceFullPath) throws IOException,InterruptedException,IllegalStateException{
-        HDFSLock lock = new HDFSLock(resourceFullPath);
-        boolean success = lock.init(fs);
-        if(success){
-            return lock;
-        }else{
-            throw new IllegalStateException("Try get lock fail. Resourse path : " + resourceFullPath);
-        }
-    }
-
-    public void releaseLock(HDFSLock lock) throws IOException,InterruptedException,IllegalStateException{
-        boolean success = lock.release(fs);
-        if(!success)
-            throw new IllegalStateException("Release lock fail");
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
index c7f0f25..717c27f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,44 +39,57 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Created by xiefan on 17-1-10.
  */
 public class HDFSResourceStore extends ResourceStore {
 
-    private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs";
+    private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs_meta";
 
     private Path hdfsMetaPath;
 
     private FileSystem fs;
 
-    private HDFSLockManager lockManager;
-
     private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
 
+    private LockManager lockManager;
+
     //public for test. Normal should be protected
-    public HDFSResourceStore(KylinConfig kylinConfig) throws IOException {
+    public HDFSResourceStore(KylinConfig kylinConfig) throws Exception {
         super(kylinConfig);
         String metadataUrl = kylinConfig.getHDFSMetadataUrl();
-        // split TABLE@HBASE_URL
         int cut = metadataUrl.indexOf('@');
         String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+        this.lockManager = new LockManager();
         createMetaFolder(metaDirName, kylinConfig);
     }
 
-    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws IOException {
+    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws Exception {
         String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
         fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
+        logger.info("hdfs working dir : " + hdfsWorkingDir);
         Path hdfsWorkingPath = new Path(hdfsWorkingDir);
         if (!fs.exists(hdfsWorkingPath)) {
             throw new IOException("HDFS working dir not exist");
         }
         hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
         if (!fs.exists(hdfsMetaPath)) {
-            fs.create(hdfsMetaPath, true);
+            ResourceLock lock = lockManager.getLock(lockManager.getLockPath("/"));
+            try {
+                if (!lock.acquire(Config.TIME, TimeUnit.MINUTES)) // false: timed out, meta dir may still be missing
+                    throw new IOException("Fail to acquire root lock in " + Config.TIME + " minutes");
+                logger.info("get root lock successfully");
+                if (!fs.exists(hdfsMetaPath)) { // re-check under the lock: another process may have created it
+                    fs.mkdirs(hdfsMetaPath);
+                    logger.info("create hdfs meta path");
+                }
+            } finally {
+                lockManager.releaseLock(lock);
+            }
         }
-        lockManager = new HDFSLockManager(hdfsWorkingDir);
+        logger.info("hdfs meta path : " + hdfsMetaPath.toString());
     }
 
     @Override
@@ -132,7 +145,8 @@ public class HDFSResourceStore extends ResourceStore {
                 logger.warn("Zero length file: " + p.toString());
             }
             FSDataInputStream in = fs.open(p);
-            return new RawResource(fs.open(p), getResourceTimestamp(resPath));
+            long t = in.readLong();
+            return new RawResource(in, t);
         } else {
             return null;
         }
@@ -144,19 +158,42 @@ public class HDFSResourceStore extends ResourceStore {
         if (!fs.exists(p) || !fs.isFile(p)) {
             return 0;
         }
-        FileStatus status = fs.getFileStatus(p);
-        return status.getModificationTime();
+        FSDataInputStream in = null;
+        ResourceLock lock = null;
+        try {
+            lock = lockManager.getLock(resPath);
+            lock.acquire(Config.TIME, TimeUnit.MINUTES);
+            in = fs.open(p);
+            long t = in.readLong();
+            return t;
+        } catch (Exception e) {
+            throw new IOException("Put resource fail", e);
+        } finally {
+            IOUtils.closeQuietly(in);
+            lockManager.releaseLock(lock);
+        }
+
     }
 
     @Override
     protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+        logger.info("res path : " + resPath);
         Path p = getRealHDFSPath(resPath);
+        logger.info("put resource : " + p.toUri());
         FSDataOutputStream out = null;
+        ResourceLock lock = null;
         try {
+            lock = lockManager.getLock(resPath);
+            if (!lock.acquire(Config.TIME, TimeUnit.MINUTES)) // false: timed out, so never overwrite unlocked
+                throw new IllegalStateException("Fail to acquire lock for " + resPath + " in " + Config.TIME + " minutes");
             out = fs.create(p, true);
+            out.writeLong(ts); // timestamp header; readers consume it with readLong() before the payload
             IOUtils.copy(content, out);
+        } catch (Exception e) {
+            throw new IOException("Put resource fail", e);
         } finally {
             IOUtils.closeQuietly(out);
+            lockManager.releaseLock(lock);
         }
     }
 
@@ -180,9 +217,18 @@ public class HDFSResourceStore extends ResourceStore {
 
     @Override
     protected void deleteResourceImpl(String resPath) throws IOException {
-        Path p = getRealHDFSPath(resPath);
-        if (fs.exists(p)) {
-            fs.delete(p, true);
+        ResourceLock lock = null;
+        try {
+            lock = lockManager.getLock(resPath);
+            if (!lock.acquire(Config.TIME, TimeUnit.MINUTES)) // false: timed out, so never delete unlocked
+                throw new IllegalStateException("Fail to acquire lock for " + resPath + " in " + Config.TIME + " minutes");
+            Path p = getRealHDFSPath(resPath);
+            if (fs.exists(p))
+                fs.delete(p, true);
+        } catch (Exception e) {
+            throw new IOException("Delete resource fail", e);
+        } finally {
+            lockManager.releaseLock(lock);
         }
     }
 
@@ -192,6 +238,8 @@ public class HDFSResourceStore extends ResourceStore {
     }
 
     private Path getRealHDFSPath(String resourcePath) {
+        if (resourcePath.startsWith("/") && resourcePath.length() > 1)
+            resourcePath = resourcePath.substring(1, resourcePath.length());
         return new Path(this.hdfsMetaPath, resourcePath);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
new file mode 100644
index 0000000..9b18749
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+/**
+ * Creates ZooKeeper-backed {@link ResourceLock}s (Curator InterProcessMutex) under ZOOKEEPER_LOCK_PATH. Created by xiefan on 17-1-18.
+ */
+public class LockManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(LockManager.class);
+
+    public static final String ZOOKEEPER_LOCK_PATH = "/kylin/hdfs_meta/lock";
+
+    final private KylinConfig config;
+
+    final CuratorFramework zkClient;
+
+    public LockManager() {
+        this(KylinConfig.getInstanceFromEnv());
+    }
+    /** Starts a Curator client (exponential-backoff retry) and registers a JVM shutdown hook that closes it. */
+    public LockManager(KylinConfig config) {
+        this.config = config;
+
+        String zkConnectString = getZKConnectString();
+        logger.info("zk connection string:" + zkConnectString);
+        if (StringUtils.isEmpty(zkConnectString)) {
+            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+        }
+
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
+        zkClient.start();
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                close();
+            }
+        }));
+    }
+    /** Builds (does not acquire) a mutex for {@code name}; the name is mapped through {@link #getLockPath(String)}. */
+    public ResourceLock getLock(String name) throws Exception {
+        String lockPath = getLockPath(name);
+        InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
+        return new ResourceLock(lockPath, lock);
+    }
+    /** Releases {@code lock} if it is non-null; a failure is logged and never propagated to the caller. */
+    public void releaseLock(ResourceLock lock) {
+        try {
+            if (lock != null)
+                lock.release();
+        } catch (Exception e) {
+            // Log through slf4j with the cause attached instead of printing the stack trace to stderr.
+            logger.error("Fail to release lock", e);
+        }
+    }
+
+    private static String getZKConnectString() {
+        final String serverList = Config.ZK_HOST;
+        final String port = Config.ZK_PORT;
+        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+            @Nullable
+            @Override
+            public String apply(String input) {
+                return input + ":" + port;
+            }
+        }), ",");
+    }
+    /** Adds a leading "/" if missing, drops a trailing "/", then prefixes ZOOKEEPER_LOCK_PATH and the metadata url prefix. */
+    public String getLockPath(String resourceName) {
+        if (!resourceName.startsWith("/"))
+            resourceName = "/" + resourceName;
+        if (resourceName.endsWith("/"))
+            resourceName = resourceName.substring(0, resourceName.length() - 1);
+        return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + resourceName;
+    }
+    /** Closes the Curator client; a failure is logged and never propagated. */
+    public void close() {
+        try {
+            zkClient.close();
+        } catch (Exception e) {
+            logger.error("error occurred to close zookeeper client", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
new file mode 100644
index 0000000..dc99a50
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pairs a Curator InterProcessMutex with the path it was created for; built by LockManager. Created by xiefan on 17-1-18.
+ */
+public class ResourceLock {
+
+    private String resourcePath;
+
+    private InterProcessMutex lock;
+
+    protected ResourceLock(String resourcePath, InterProcessMutex lock) {
+        this.resourcePath = resourcePath;
+        this.lock = lock;
+    }
+
+    public boolean acquire(long time, TimeUnit unit) throws Exception {
+        return lock.acquire(time, unit); // result of the wrapped mutex's timed acquire, passed through unchanged
+    }
+
+    public void acquire() throws Exception{
+       lock.acquire(); // untimed variant; delegates to the wrapped mutex
+    }
+
+    protected void release() throws Exception { // protected: same-package callers (LockManager.releaseLock) release on behalf of users
+        lock.release();
+    }
+
+    public String getResourcePath() {
+        return resourcePath; // the path string handed to the constructor
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/ExampleClientThatLocks.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/ExampleClientThatLocks.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/ExampleClientThatLocks.java
new file mode 100644
index 0000000..fbb2c85
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/ExampleClientThatLocks.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+/**
+ * Test helper that guards a FakeLimitedResource with a Curator InterProcessMutex. Created by xiefan on 17-1-18.
+ */
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import java.util.concurrent.TimeUnit;
+
+public class ExampleClientThatLocks {
+    private final InterProcessMutex lock;
+    private final FakeLimitedResource resource;
+    private final String clientName; // only used to label console output and the failure message
+
+    public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
+        this.resource = resource;
+        this.clientName = clientName;
+        lock = new InterProcessMutex(client, lockPath);
+    }
+
+    public void doWork(long time, TimeUnit unit) throws Exception { // time/unit bound the wait for the lock
+        if (!lock.acquire(time, unit)) {
+            throw new IllegalStateException(clientName + " could not acquire the lock");
+        }
+        try {
+            System.out.println(clientName + " has the lock");
+            resource.use(); // the resource itself throws if two clients are inside use() at once
+        } finally {
+            System.out.println(clientName + " releasing the lock");
+            lock.release(); // always release the lock in a finally block
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/FakeLimitedResource.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/FakeLimitedResource.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/FakeLimitedResource.java
new file mode 100644
index 0000000..28c69ad
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/FakeLimitedResource.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Simulates some external resource that can only be accessed by one process at a time
+ */
+public class FakeLimitedResource {
+    private final AtomicBoolean inUse = new AtomicBoolean(false);
+
+    public void use() throws InterruptedException {
+        // in a real application this would be accessing/manipulating a shared resource
+
+        if (!inUse.compareAndSet(false, true)) { // a failed CAS means another caller is inside use() right now
+            throw new IllegalStateException("Needs to be used by one client at a time");
+        }
+
+        try {
+            Thread.sleep((long) (3 * Math.random())); // truncates to 0, 1 or 2 ms
+        } finally {
+            inUse.set(false); // always clear the flag, even if the sleep is interrupted
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/HDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/HDFSResourceStoreTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/HDFSResourceStoreTest.java
new file mode 100644
index 0000000..b844a60
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/HDFSResourceStoreTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceStoreTest;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Ignored integration tests for HDFSResourceStore and for using an HDFS file as a lock. Created by xiefan on 17-1-10.
+ */
+@Ignore
+public class HDFSResourceStoreTest extends HBaseMetadataTestCase {
+    KylinConfig kylinConfig;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Ignore
+    @Test
+    public void testHDFSUrl() throws Exception {
+        assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl());
+        System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory());
+    }
+
+    /** Uses one HDFS file as a lock: whoever succeeds in createNewFile() holds it, delete() releases it. */
+    @Ignore
+    @Test
+    public void testMultiThreadWriteHDFS() throws Exception {
+        final Path testDir = new Path("hdfs:///test123");
+        final FileSystem fs = HadoopUtil.getFileSystem(testDir);
+        final String fileName = "test.json";
+        int threadNum = 3;
+        ExecutorService service = Executors.newFixedThreadPool(threadNum);
+        final CountDownLatch latch = new CountDownLatch(threadNum);
+        final Path p = new Path(testDir, fileName);
+        fs.deleteOnExit(p);
+        fs.createNewFile(p); // the main thread takes the lock before any worker starts
+        try {
+            for (int i = 0; i < threadNum; i++) {
+                service.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            long id = Thread.currentThread().getId();
+                            while (!fs.createNewFile(p)) {
+                                System.out.println("Thread id : " + id + " can not get lock,sleep a while");
+                                Thread.sleep(1000);
+                            }
+                            System.out.println("Thread id : " + id + " get lock, sleep a while");
+                            Thread.sleep(1000);
+                            fs.delete(p, true);
+                            System.out.println("Thread id : " + id + " release lock");
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        } finally {
+                            // count down even when the worker failed, otherwise latch.await() below never returns
+                            latch.countDown();
+                        }
+                    }
+                });
+            }
+            Thread.sleep(1000);
+            fs.delete(p, true);
+            System.out.println("main thread release lock.Waiting threads down");
+            System.out.println("file still exist : " + fs.exists(p));
+            latch.await();
+        } finally {
+            // always stop the pool; its non-daemon threads would otherwise outlive the test
+            service.shutdownNow();
+        }
+    }
+
+    /** Runs the shared ResourceStoreTest.testAStore checks against an HDFSResourceStore. */
+    @Test
+    public void testHDFSStore() throws Exception {
+        ResourceStore store = new HDFSResourceStore(KylinConfig.getInstanceFromEnv());
+        ResourceStoreTest.testAStore(store);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/LockManagerTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/LockManagerTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/LockManagerTest.java
new file mode 100644
index 0000000..1f239d9
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/LockManagerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/** Ignored tests exercising LockManager and a raw ZooKeeper connection. Created by xiefan on 17-1-20. */
+@Ignore
+public class LockManagerTest extends HBaseMetadataTestCase {
+    public static String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox";
+
+    private String zkConnection = "sandbox:2181";
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testCreateLock() throws Exception {
+        LockManager manager = new LockManager();
+        ResourceLock lock = manager.getLock("/dictionary/numberdict.json");
+        lock.acquire();
+        manager.releaseLock(lock);
+    }
+
+    @Test
+    public void testConnect() throws Exception {
+        ZooKeeper zk = new ZooKeeper(zkConnection, 60000, new Watcher() {
+            @Override
+            public void process(WatchedEvent watchedEvent) {
+                System.out.println("EVENT:" + watchedEvent.getType());
+            }
+        });
+        try {
+            System.out.println("ls / => " + zk.getChildren("/kylin/hdfs_meta/lock/kylin_default_instance/dictionary", true));
+        } finally {
+            // close the session even if the listing fails, so the connection is not leaked
+            zk.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/TestingServer.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/TestingServer.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/TestingServer.java
new file mode 100644
index 0000000..f462905
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/TestingServer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Placeholder "server" that only carries a connect string and starts nothing. Created by xiefan on 17-1-18.
+ */
+public class TestingServer implements Closeable {
+
+    private String connectString;
+
+    public TestingServer(String connectionStr) {
+        connectString = connectionStr;
+    }
+
+    public String getConnectString() {
+        return connectString;
+    }
+
+    @Override
+    public void close() throws IOException {
+        // nothing to release: this class holds no resources beyond the string
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/870cbf26/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/ZkDemo.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/ZkDemo.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/ZkDemo.java
new file mode 100644
index 0000000..785ac09
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hdfs/ZkDemo.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+/** Ignored demos of raw ZooKeeper and Curator usage against "sandbox:2181". Created by xiefan on 17-1-18. */
+@Ignore
+public class ZkDemo {
+    private String zkConnection = "sandbox:2181";
+    private static final int QTY = 5;
+    private static final int REPETITIONS = QTY * 10;
+    private static final String PATH = "/examples/lock";
+
+    /** Opens a ZooKeeper session whose watcher prints the type of every event it receives; caller must close it. */
+    private ZooKeeper connect() throws java.io.IOException {
+        return new ZooKeeper(zkConnection, 60000, new Watcher() {
+            @Override
+            public void process(WatchedEvent watchedEvent) {
+                System.out.println("EVENT:" + watchedEvent.getType());
+            }
+        });
+    }
+
+    @Test
+    public void testConnect() throws Exception {
+        ZooKeeper zk = connect();
+        try {
+            System.out.println("ls / => " + zk.getChildren("/kylin", true));
+        } finally {
+            zk.close(); // release the session even if the listing fails
+        }
+    }
+
+    @Test
+    public void testCreateNode() throws Exception {
+        ZooKeeper zk = connect();
+        try {
+            zk.create(PATH, PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            System.out.println("ls / => " + zk.getChildren("/examples", true));
+        } finally {
+            zk.close(); // release the session even if create or the listing fails
+        }
+    }
+
+    @Test
+    public void testStartCurator() throws Exception {
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        CuratorFramework client = CuratorFrameworkFactory.newClient(zkConnection, retryPolicy);
+        client.start();
+        CloseableUtils.closeQuietly(client); // close the client started above instead of leaking it
+    }
+
+    /** Runs QTY clients sharing one FakeLimitedResource; the useful sample code is in ExampleClientThatLocks.java. */
+    @Test
+    public void testCuratorLock() throws Exception {
+        final FakeLimitedResource resource = new FakeLimitedResource();
+        ExecutorService service = Executors.newFixedThreadPool(QTY);
+        final TestingServer server = new TestingServer(zkConnection);
+        final List<FutureTask<Void>> tasks = new ArrayList<>();
+        try {
+            for (int i = 0; i < QTY; ++i) {
+                final int index = i;
+                FutureTask<Void> task = new FutureTask<Void>(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
+                        try {
+                            client.start();
+                            ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index);
+                            for (int j = 0; j < REPETITIONS; ++j) {
+                                example.doWork(10, TimeUnit.SECONDS);
+                            }
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        } finally {
+                            CloseableUtils.closeQuietly(client);
+                        }
+                        return null;
+                    }
+                });
+                tasks.add(task);
+                service.submit(task);
+            }
+            for (FutureTask<Void> task : tasks) {
+                task.get();
+            }
+        } finally {
+            service.shutdownNow(); // stop the pool's worker threads whether or not every task finished
+            CloseableUtils.closeQuietly(server);
+        }
+    }
+}