You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2017/02/04 04:44:02 UTC

[33/47] kylin git commit: Add Zookeeper Lock

Add Zookeeper Lock

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d3276e2e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d3276e2e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d3276e2e

Branch: refs/heads/KYLIN-2361
Commit: d3276e2e909d3001724ee8fda1304ae8b7f08c63
Parents: d23bf93
Author: xiefan46 <95...@qq.com>
Authored: Fri Jan 20 09:48:17 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jan 23 16:23:56 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  22 +-
 .../test_case_data/sandbox/kylin.properties     |   4 +
 .../storage/hdfs/ITHDFSResourceStoreTest.java   |  66 +-----
 .../kylin/storage/hdfs/ITLockManagerTest.java   | 205 +++++++++++++++++++
 .../kylin/storage/hbase/HBaseResourceStore.java |   1 +
 .../org/apache/kylin/storage/hdfs/HDFSLock.java |  41 ----
 .../kylin/storage/hdfs/HDFSLockManager.java     |  45 ----
 .../kylin/storage/hdfs/HDFSResourceStore.java   |  95 +++++++--
 .../apache/kylin/storage/hdfs/LockManager.java  | 116 +++++++++++
 .../apache/kylin/storage/hdfs/ResourceLock.java |  51 +++++
 10 files changed, 471 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 44d636d..75b38ff 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -193,6 +193,14 @@ abstract public class KylinConfigBase implements Serializable {
         return new StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).append("/").toString();
     }
 
+    public String getRawHdfsWorkingDirectory() {
+        String root = getRequired("kylin.env.hdfs-working-dir");
+        if (!root.endsWith("/")) {
+            root += "/";
+        }
+        return root;
+    }
+
     // ============================================================================
     // METADATA
     // ============================================================================
@@ -201,11 +209,6 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.metadata.url");
     }
 
-    //for hdfs resource store
-    public String getHDFSMetadataUrl() {
-        return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs@hdfs");
-    }
-
     // for test only
     public void setMetadataUrl(String metadataUrl) {
         setProperty("kylin.metadata.url", metadataUrl);
@@ -925,4 +928,13 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.web.cross-domain-enabled", "true"));
     }
 
+    // ZooKeeper connection settings
+    public String getZooKeeperHost() {
+        return getOptional("kylin.storage-zookeeper.host", "localhost");
+    }
+
+    public String getZooKeeperPort() {
+        return getOptional("kylin.storage-zookeeper.port", "2181");
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index b01c377..2c2da91 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -177,3 +177,7 @@ kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 #kylin.engine.spark-conf.spark.yarn.queue=default
 #kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
 #kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
+
+
+# ZooKeeper
+kylin.storage-zookeeper.host=sandbox
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
index ef04957..27d8a3c 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -17,23 +17,15 @@
 */
 
 package org.apache.kylin.storage.hdfs;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.ResourceStoreTest;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertEquals;
 
 /**
  * Created by xiefan on 17-1-10.
@@ -53,65 +45,13 @@ public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
         this.cleanupTestMetadata();
     }
 
-    @Ignore
-    @Test
-    public void testHDFSUrl() throws Exception {
-        assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl());
-        System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory());
-    }
-
 
-    @Ignore
     @Test
-    public void testMultiThreadWriteHDFS() throws Exception{
-        //System.out.println(kylinConfig.getHdfsWorkingDirectory());
-        final Path testDir = new Path("hdfs:///test123");
-        final FileSystem fs = HadoopUtil.getFileSystem(testDir);
-        final String fileName = "test.json";
-        int threadNum = 3;
-        ExecutorService service = Executors.newFixedThreadPool(threadNum);
-        final CountDownLatch latch = new CountDownLatch(threadNum);
-        Path p = new Path(testDir,fileName);
-        fs.deleteOnExit(p);
-        fs.createNewFile(p);
-        for(int i=0;i<threadNum;i++) {
-            service.execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        long id = Thread.currentThread().getId();
-                        Path p = new Path(testDir, fileName);
-                        /*while(fs.exists(p)){
-                            System.out.println("Thread id : " + id + " can not get lock,sleep a while");
-                            Thread.currentThread().sleep(1000);
-                        }*/
-                        while(!fs.createNewFile(p)){
-                            System.out.println("Thread id : " + id + " can not get lock,sleep a while");
-                            Thread.currentThread().sleep(1000);
-                        }
-                        System.out.println("Thread id : " + id + " get lock, sleep a while");
-                        Thread.currentThread().sleep(1000);
-                        fs.delete(p,true);
-                        System.out.println("Thread id : " + id + " release lock");
-                        latch.countDown();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-        }
-        Thread.currentThread().sleep(1000);
-        fs.delete(p,true);
-        System.out.println("main thread release lock.Waiting threads down");
-        System.out.println("file still exist : " + fs.exists(p));
-        latch.await();
-    }
-
-    @Test
-    public void testHDFSStore() throws Exception {
+    public void testResourceStoreBasic() throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         ResourceStore store = new HDFSResourceStore(config);
         ResourceStoreTest.testAStore(store);
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
new file mode 100644
index 0000000..2b58d30
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+
+public class ITLockManagerTest extends HBaseMetadataTestCase {
+
+
+    private String zkConnection = "sandbox:2181";
+
+    private KylinConfig kylinConfig;
+
+    private CuratorFramework zkClient;
+
+    private static final String lockRootPath = "/test_lock";
+
+    private LockManager manager;
+
+    private static final int QTY = 5;
+
+    private static final int REPETITIONS = QTY * 10;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zkClient = CuratorFrameworkFactory.newClient(zkConnection, retryPolicy);
+        zkClient.start();
+        manager = new LockManager(kylinConfig, lockRootPath);
+        System.out.println("nodes in lock root : " + zkClient.getChildren().forPath(lockRootPath));
+
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+        zkClient.delete().deletingChildrenIfNeeded().forPath(lockRootPath);
+        List<String> nodes = zkClient.getChildren().forPath("/");
+        System.out.println("nodes in zk after delete : " + nodes);
+        manager.close();
+    }
+
+    @Test
+    public void testCreateLock() throws Exception {
+
+        ResourceLock lock = manager.getLock("/dictionary/numberdict.json");
+        lock.acquire();
+        manager.releaseLock(lock);
+        System.out.println(zkClient.getChildren().forPath(lockRootPath + "/dictionary"));
+        List<String> nodes = zkClient.getChildren().forPath(lockRootPath + "/dictionary");
+        assertEquals(1, nodes.size());
+        assertEquals("numberdict.json", nodes.get(0));
+    }
+
+    @Test
+    public void testLockSafty() throws Exception {
+        // all of the useful sample code is in the nested ExampleClientThatLocks class below
+
+        // FakeLimitedResource simulates some external resource that can only be accessed by one process at a time
+        final FakeLimitedResource resource = new FakeLimitedResource();
+        ExecutorService service = Executors.newFixedThreadPool(QTY);
+        final TestingServer server = new TestingServer(zkConnection);
+        final List<FutureTask<Void>> tasks = new ArrayList<>();
+        try {
+            for (int i = 0; i < QTY; ++i) {
+                final int index = i;
+                FutureTask<Void> task = new FutureTask<Void>(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        LockManager threadLocalLockManager = new LockManager(kylinConfig, lockRootPath);
+                        try {
+                            ExampleClientThatLocks example = new ExampleClientThatLocks(threadLocalLockManager, lockRootPath, resource, "Client " + index);
+                            for (int j = 0; j < REPETITIONS; ++j) {
+                                example.doWork(10, TimeUnit.SECONDS);
+                            }
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                            // log or do something
+                        } finally {
+                            threadLocalLockManager.close();
+                        }
+                        return null;
+                    }
+                });
+                tasks.add(task);
+                service.submit(task);
+            }
+            for (FutureTask<Void> task : tasks) {
+                task.get();
+            }
+        } finally {
+            CloseableUtils.closeQuietly(server);
+        }
+    }
+
+    class FakeLimitedResource {
+        private final AtomicBoolean inUse = new AtomicBoolean(false);
+
+        public void use() throws InterruptedException {
+            // in a real application this would be accessing/manipulating a shared resource
+
+            if (!inUse.compareAndSet(false, true)) {
+                throw new IllegalStateException("Needs to be used by one client at a time");
+            }
+
+            try {
+                Thread.sleep((long) (3 * Math.random()));
+            } finally {
+                inUse.set(false);
+            }
+        }
+    }
+
+    class TestingServer implements Closeable {
+
+        private String connectionString;
+
+        public TestingServer(String connectionStr) {
+            this.connectionString = connectionStr;
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+
+        public String getConnectString() {
+            return connectionString;
+        }
+    }
+
+    class ExampleClientThatLocks {
+
+        private final FakeLimitedResource resource;
+
+        private final String clientName;
+
+        private LockManager lockManager;
+
+        private String lockPath;
+
+        public ExampleClientThatLocks(LockManager lockManager, String lockPath, FakeLimitedResource resource, String clientName) {
+            this.resource = resource;
+            this.clientName = clientName;
+            this.lockManager = lockManager;
+            this.lockPath = lockPath;
+        }
+
+        public void doWork(long time, TimeUnit unit) throws Exception {
+            ResourceLock lock = lockManager.getLock(lockPath);
+            if (!lock.acquire(time, unit)) {
+                throw new IllegalStateException(clientName + " could not acquire the lock");
+            }
+            try {
+                System.out.println(clientName + " has the lock");
+                resource.use();
+            } finally {
+                System.out.println(clientName + " releasing the lock");
+                lock.release(); // always release the lock in a finally block
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 1c45967..170e351 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -311,6 +311,7 @@ public class HBaseResourceStore extends ResourceStore {
         } finally {
             IOUtils.closeQuietly(table);
         }
+
     }
 
     private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
deleted file mode 100644
index 8710edf..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.kylin.storage.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Created by xiefan on 17-1-17.
- */
-public class HDFSLock {
-
-    private Path rawLock;
-
-    private static final Logger logger = LoggerFactory.getLogger(HDFSLock.class);
-
-    protected HDFSLock(String resourceFullPath) {
-        this.rawLock = new Path(resourceFullPath);
-    }
-
-    public boolean init(FileSystem fs) throws IOException, InterruptedException {
-        if (!fs.isFile(rawLock)) {
-            logger.info("Not support directory lock yet");
-            return false;
-        }
-        while (!fs.createNewFile(rawLock)) {
-            Thread.currentThread().sleep(1000);
-        }
-        return true;
-    }
-
-    public boolean release(FileSystem fs) throws IOException, InterruptedException {
-        while (!fs.delete(rawLock, false)) {
-            Thread.currentThread().sleep(1000);
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
deleted file mode 100644
index 1cd0800..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.kylin.storage.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.engine.mr.HadoopUtil;
-
-import java.io.IOException;
-
-/**
- * Created by xiefan on 17-1-17.
- */
-public class HDFSLockManager {
-
-    private static final String LOCK_HOME = "LOCK_HOME";
-
-    private Path lockPath;
-
-    private FileSystem fs;
-
-    public HDFSLockManager(String hdfsWorkingDir) throws IOException{
-        this.lockPath = new Path(hdfsWorkingDir,LOCK_HOME);
-        this.fs = HadoopUtil.getFileSystem(lockPath);
-        if(!fs.exists(lockPath)){
-            fs.create(lockPath);
-        }
-    }
-
-    public HDFSLock getLock(String resourceFullPath) throws IOException,InterruptedException,IllegalStateException{
-        HDFSLock lock = new HDFSLock(resourceFullPath);
-        boolean success = lock.init(fs);
-        if(success){
-            return lock;
-        }else{
-            throw new IllegalStateException("Try get lock fail. Resourse path : " + resourceFullPath);
-        }
-    }
-
-    public void releaseLock(HDFSLock lock) throws IOException,InterruptedException,IllegalStateException{
-        boolean success = lock.release(fs);
-        if(!success)
-            throw new IllegalStateException("Release lock fail");
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
index c7f0f25..a746a97 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,44 +39,62 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
-/**
- * Created by xiefan on 17-1-10.
- */
 public class HDFSResourceStore extends ResourceStore {
 
-    private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs";
+    private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
+
+    private static final long DEFAULT_ACQUIRE_LOCK_TIMEOUT = 10;
+
+    private static final String DEFAULT_FOLDER_NAME = "kylin_default_instance";
+
+    private static final String DEFAULT_METADATA_FOLDER_NAME = "hdfs_metadata";
 
     private Path hdfsMetaPath;
 
     private FileSystem fs;
 
-    private HDFSLockManager lockManager;
-
-    private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
+    private LockManager lockManager;
 
     //public for test. Normal should be protected
-    public HDFSResourceStore(KylinConfig kylinConfig) throws IOException {
+    public HDFSResourceStore(KylinConfig kylinConfig) throws Exception {
         super(kylinConfig);
-        String metadataUrl = kylinConfig.getHDFSMetadataUrl();
-        // split TABLE@HBASE_URL
+        String metadataUrl = kylinConfig.getMetadataUrl();
         int cut = metadataUrl.indexOf('@');
-        String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+        String metaDirName = cut < 0 ? DEFAULT_FOLDER_NAME : metadataUrl.substring(0, cut);
+        metaDirName += "/" + DEFAULT_METADATA_FOLDER_NAME;
+        logger.info("meta dir name :" + metaDirName);
         createMetaFolder(metaDirName, kylinConfig);
     }
 
-    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws IOException {
+    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws Exception {
         String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
         fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
+        logger.info("hdfs working dir : " + hdfsWorkingDir);
         Path hdfsWorkingPath = new Path(hdfsWorkingDir);
         if (!fs.exists(hdfsWorkingPath)) {
             throw new IOException("HDFS working dir not exist");
         }
+        // create lock manager
+        this.lockManager = new LockManager(kylinConfig, kylinConfig.getRawHdfsWorkingDirectory() + metaDirName);
+        //create hdfs meta path
         hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
         if (!fs.exists(hdfsMetaPath)) {
-            fs.create(hdfsMetaPath, true);
+            ResourceLock lock = lockManager.getLock(lockManager.getLockPath("/"));
+            try {
+                if (lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES)) {
+                    logger.info("get root lock successfully");
+                    if (!fs.exists(hdfsMetaPath)) {
+                        fs.mkdirs(hdfsMetaPath);
+                        logger.info("create hdfs meta path");
+                    }
+                }
+            } finally {
+                lockManager.releaseLock(lock);
+            }
         }
-        lockManager = new HDFSLockManager(hdfsWorkingDir);
+        logger.info("hdfs meta path : " + hdfsMetaPath.toString());
     }
 
     @Override
@@ -132,7 +150,8 @@ public class HDFSResourceStore extends ResourceStore {
                 logger.warn("Zero length file: " + p.toString());
             }
             FSDataInputStream in = fs.open(p);
-            return new RawResource(fs.open(p), getResourceTimestamp(resPath));
+            long t = in.readLong();
+            return new RawResource(in, t);
         } else {
             return null;
         }
@@ -144,19 +163,42 @@ public class HDFSResourceStore extends ResourceStore {
         if (!fs.exists(p) || !fs.isFile(p)) {
             return 0;
         }
-        FileStatus status = fs.getFileStatus(p);
-        return status.getModificationTime();
+        FSDataInputStream in = null;
+        ResourceLock lock = null;
+        try {
+            lock = lockManager.getLock(resPath);
+            lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES);
+            in = fs.open(p);
+            long t = in.readLong();
+            return t;
+        } catch (Exception e) {
+            throw new IOException("Put resource fail", e);
+        } finally {
+            IOUtils.closeQuietly(in);
+            lockManager.releaseLock(lock);
+        }
+
     }
 
     @Override
     protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+        logger.info("res path : " + resPath);
         Path p = getRealHDFSPath(resPath);
+        logger.info("put resource : " + p.toUri());
         FSDataOutputStream out = null;
+        ResourceLock lock = null;
         try {
+            lock = lockManager.getLock(resPath);
+            lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES);
             out = fs.create(p, true);
+            out.writeLong(ts);
             IOUtils.copy(content, out);
+
+        } catch (Exception e) {
+            throw new IOException("Put resource fail", e);
         } finally {
             IOUtils.closeQuietly(out);
+            lockManager.releaseLock(lock);
         }
     }
 
@@ -180,9 +222,18 @@ public class HDFSResourceStore extends ResourceStore {
 
     @Override
     protected void deleteResourceImpl(String resPath) throws IOException {
-        Path p = getRealHDFSPath(resPath);
-        if (fs.exists(p)) {
-            fs.delete(p, true);
+        ResourceLock lock = null;
+        try {
+            lock = lockManager.getLock(resPath);
+            lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES);
+            Path p = getRealHDFSPath(resPath);
+            if (fs.exists(p)) {
+                fs.delete(p, true);
+            }
+        } catch (Exception e) {
+            throw new IOException("Delete resource fail", e);
+        } finally {
+            lockManager.releaseLock(lock);
         }
     }
 
@@ -192,6 +243,8 @@ public class HDFSResourceStore extends ResourceStore {
     }
 
     private Path getRealHDFSPath(String resourcePath) {
+        if (resourcePath.startsWith("/") && resourcePath.length() > 1)
+            resourcePath = resourcePath.substring(1, resourcePath.length());
         return new Path(this.hdfsMetaPath, resourcePath);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
new file mode 100644
index 0000000..4959718
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+public class LockManager {
+
+    private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
+
+    final private KylinConfig config;
+
+    final CuratorFramework zkClient;
+
+    private String lockRootPath;
+
+    public LockManager(String lockRootPath) throws Exception {
+
+        this(KylinConfig.getInstanceFromEnv(), lockRootPath);
+    }
+
+    public LockManager(KylinConfig config, String lockRootPath) throws Exception {
+        this.config = config;
+        this.lockRootPath = lockRootPath;
+        String zkConnectString = getZKConnectString(config);
+        logger.info("zk connection string:" + zkConnectString);
+        if (StringUtils.isEmpty(zkConnectString)) {
+            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+        }
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
+        zkClient.start();
+        if (zkClient.checkExists().forPath(lockRootPath) == null)
+            zkClient.create().creatingParentsIfNeeded().forPath(lockRootPath);
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                close();
+            }
+        }));
+    }
+
+    public ResourceLock getLock(String name) throws Exception {
+        String lockPath = getLockPath(name);
+        InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
+        return new ResourceLock(lockPath, lock);
+    }
+
+    public void releaseLock(ResourceLock lock) {
+        try {
+            if (lock != null)
+                lock.release();
+        } catch (Exception e) {
+            logger.error("Fail to release lock");
+            e.printStackTrace();
+        }
+    }
+
+    private static String getZKConnectString(KylinConfig kylinConfig) {
+        final String host = kylinConfig.getZooKeeperHost();
+        final String port = kylinConfig.getZooKeeperPort();
+        return StringUtils.join(Iterables.transform(Arrays.asList(host.split(",")), new Function<String, String>() {
+            @Nullable
+            @Override
+            public String apply(String input) {
+                return input + ":" + port;
+            }
+        }), ",");
+    }
+
+    public String getLockPath(String resourceName) {
+        if (!resourceName.startsWith("/"))
+            resourceName = "/" + resourceName;
+        if (resourceName.endsWith("/"))
+            resourceName = resourceName.substring(0, resourceName.length() - 1);
+        return lockRootPath + resourceName;
+    }
+
+    public void close() {
+        try {
+            zkClient.close();
+        } catch (Exception e) {
+            logger.error("error occurred to close PathChildrenCache", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
new file mode 100644
index 0000000..9d51871
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.concurrent.TimeUnit;
+
+
+public class ResourceLock {
+
+    private String resourcePath;
+
+    private InterProcessMutex lock;
+
+    protected ResourceLock(String resourcePath, InterProcessMutex lock) {
+        this.resourcePath = resourcePath;
+        this.lock = lock;
+    }
+
+    public boolean acquire(long time, TimeUnit unit) throws Exception {
+        return lock.acquire(time, unit);
+    }
+
+    public void acquire() throws Exception{
+       lock.acquire();
+    }
+
+    protected void release() throws Exception {
+        lock.release();
+    }
+
+    public String getResourcePath() {
+        return resourcePath;
+    }
+}