You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/03/08 08:54:26 UTC

kylin git commit: KYLIN-2477 Make UT and IT compatible with any kind of metadata resource store

Repository: kylin
Updated Branches:
  refs/heads/master 4f7885c13 -> 01abb00e4


KYLIN-2477 Make UT and IT compatible with any kind of metadata resource store

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/01abb00e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/01abb00e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/01abb00e

Branch: refs/heads/master
Commit: 01abb00e45047c06d72d47d1eead136d2fa87d3d
Parents: 4f7885c
Author: xiefan46 <95...@qq.com>
Authored: Thu Mar 2 17:49:45 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Mar 8 16:53:09 2017 +0800

----------------------------------------------------------------------
 build/conf/kylin.properties                     |  1 +
 .../apache/kylin/common/KylinConfigBase.java    | 19 ++++++++++-
 .../persistence/LocalFileResourceStoreTest.java |  3 +-
 .../common/persistence/ResourceStoreTest.java   | 36 ++++++++++++++++++--
 .../storage/hbase/ITHBaseResourceStoreTest.java | 12 ++++---
 .../storage/hdfs/ITHDFSResourceStoreTest.java   | 33 +++++++-----------
 .../kylin/storage/hdfs/ITLockManagerTest.java   |  4 +--
 .../kylin/storage/hdfs/HDFSResourceStore.java   | 34 ++----------------
 .../apache/kylin/storage/hdfs/LockManager.java  |  7 ++--
 .../apache/kylin/storage/hdfs/ResourceLock.java |  7 ++--
 10 files changed, 87 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/01abb00e/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index cfceef4..1a55c94 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -235,3 +235,4 @@ kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-histo
 #kylin.engine.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
 #kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
+

http://git-wip-us.apache.org/repos/asf/kylin/blob/01abb00e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5d665df..917879d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -954,8 +954,25 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     //ResourceStore Impl
-    public String getResourceStoreImpl() {
+    /*public String getResourceStoreImpl() {
         return getOptional("kylin.metadata.default-resource-store-impl", "org.apache.kylin.storage.hbase.HBaseResourceStore");
+    }*/
+
+    public Map<String, String> getResourceStoreImpls() {
+        Map<String, String> r = Maps.newLinkedHashMap();
+        // ref constants in ISourceAware
+        r.put("", "org.apache.kylin.common.persistence.FileResourceStore");
+        r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
+        r.put("hdfs", "org.apache.kylin.storage.hdfs.HDFSResourceStore");
+        r.putAll(getPropertiesByPrefix("kylin.resource.store.provider."));
+        return r;
+    }
+
+    public String getResourceStoreImpl() {
+        String metadataUrl = KylinConfig.getInstanceFromEnv().getMetadataUrl();
+        int cut = metadataUrl.indexOf('@');
+        String key = cut < 0 ? "" : metadataUrl.substring(0, cut);
+        return getResourceStoreImpls().get(key);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/01abb00e/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
index cc6143d..db22d96 100644
--- a/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java
@@ -38,8 +38,7 @@ public class LocalFileResourceStoreTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testFileStore() throws Exception {
-        ResourceStore store = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
-        ResourceStoreTest.testAStore(store);
+        ResourceStoreTest.testAStore("org.apache.kylin.common.persistence.FileResourceStore", "", KylinConfig.getInstanceFromEnv());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/01abb00e/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
index 79a0c30..10714a5 100644
--- a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.NavigableSet;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,14 +44,30 @@ public class ResourceStoreTest {
 
     private static final String PERFORMANCE_TEST_ROOT_PATH = "/performance";
 
-    private static final int TEST_RESOURCE_COUNT = 1000;
+    private static final int TEST_RESOURCE_COUNT = 100;
 
-    public static void testAStore(ResourceStore store) throws IOException {
+    public static void testAStore(String storeName, String url, KylinConfig kylinConfig) throws Exception {
+        String oldUrl = replaceMetadataUrl(kylinConfig, url);
+        testAStore(getStoreByName(storeName, kylinConfig));
+        replaceMetadataUrl(kylinConfig, oldUrl);
+    }
+
+    public static void testPerformance(String storeName, String url, KylinConfig kylinConfig) throws Exception {
+        String oldUrl = replaceMetadataUrl(kylinConfig, url);
+        testPerformance(getStoreByName(storeName, kylinConfig));
+        replaceMetadataUrl(kylinConfig, oldUrl);
+    }
+
+    public static String mockUrl(String tag, KylinConfig kylinConfig) {
+        return kylinConfig.getMetadataUrlPrefix() + "@" + tag;
+    }
+
+    private static void testAStore(ResourceStore store) throws IOException {
         testBasics(store);
         testGetAllResources(store);
     }
 
-    public static void testPerformance(ResourceStore store) throws IOException {
+    private static void testPerformance(ResourceStore store) throws IOException {
         logger.info("Test basic functions");
         testAStore(store);
         logger.info("Basic function ok. Start to test performance for class : " + store.getClass());
@@ -229,4 +247,16 @@ public class ResourceStoreTest {
         }
     }
 
+    public static String replaceMetadataUrl(KylinConfig kylinConfig, String newUrl) {
+        String oldUrl = kylinConfig.getMetadataUrl();
+        kylinConfig.setProperty("kylin.metadata.url", newUrl);
+        return oldUrl;
+    }
+
+    private static ResourceStore getStoreByName(String storeName, KylinConfig kylinConfig) throws Exception {
+        Class<? extends ResourceStore> cls = ClassUtil.forName(storeName, ResourceStore.class);
+        ResourceStore store = cls.getConstructor(KylinConfig.class).newInstance(kylinConfig);
+        return store;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/01abb00e/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java
index 618488f..4e35118 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.ResourceStoreTest;
 import org.apache.kylin.common.persistence.ResourceStoreTest.StringEntity;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -36,9 +35,12 @@ import org.junit.Test;
 
 public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase {
 
+    private KylinConfig kylinConfig;
+
     @Before
     public void setup() throws Exception {
         this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
     }
 
     @After
@@ -48,8 +50,8 @@ public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase {
 
     @Test
     public void testHBaseStore() throws Exception {
-        ResourceStore store = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
-        ResourceStoreTest.testAStore(store);
+        String storeName = "org.apache.kylin.storage.hbase.HBaseResourceStore";
+        ResourceStoreTest.testAStore(storeName, ResourceStoreTest.mockUrl("hbase", kylinConfig), kylinConfig);
     }
 
     @Test
@@ -57,7 +59,8 @@ public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase {
         String path = "/cube/_test_large_cell.json";
         String largeContent = "THIS_IS_A_LARGE_CELL";
         StringEntity content = new StringEntity(largeContent);
-        HBaseResourceStore store = (HBaseResourceStore) ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+        String oldUrl = ResourceStoreTest.replaceMetadataUrl(kylinConfig, ResourceStoreTest.mockUrl("hbase", kylinConfig));
+        HBaseResourceStore store = new HBaseResourceStore(KylinConfig.getInstanceFromEnv());
         Configuration hconf = store.getConnection().getConfiguration();
         int origSize = Integer.parseInt(hconf.get("hbase.client.keyvalue.maxsize", "10485760"));
 
@@ -81,6 +84,7 @@ public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase {
             in.close();
 
             store.deleteResource(path);
+            ResourceStoreTest.replaceMetadataUrl(kylinConfig, oldUrl);
         } finally {
             hconf.set("hbase.client.keyvalue.maxsize", "" + origSize);
             store.deleteResource(path);

http://git-wip-us.apache.org/repos/asf/kylin/blob/01abb00e/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
index 534839f..0a95f1c 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStoreTest;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.storage.hbase.HBaseResourceStore;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -35,9 +34,11 @@ import org.junit.Test;
 
 public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
 
-    KylinConfig kylinConfig;
-    FileSystem fs;
-    String workingDir;
+    private KylinConfig kylinConfig;
+
+    private FileSystem fs;
+
+    private String workingDir;
 
     @Before
     public void setup() throws Exception {
@@ -78,29 +79,21 @@ public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
     }
 
     private void doTestWithPath(String path) throws Exception {
-        String oldUrl = kylinConfig.getMetadataUrl();
-        kylinConfig.setProperty("kylin.metadata.url", path + "@hdfs");
-        HDFSResourceStore store = new HDFSResourceStore(kylinConfig);
-        ResourceStoreTest.testAStore(store);
-        kylinConfig.setProperty("kylin.metadata.url", oldUrl);
+        String storeName = "org.apache.kylin.storage.hdfs.HDFSResourceStore";
+        ResourceStoreTest.testAStore(storeName, ResourceStoreTest.mockUrl("hdfs", kylinConfig), kylinConfig);
         assertTrue(fs.exists(new Path(path)));
     }
 
     @Ignore
     @Test
-    public void performanceTest() throws Exception{
+    public void performanceTest() throws Exception {
+
         //test hdfs performance
-        String oldUrl = kylinConfig.getMetadataUrl();
-        kylinConfig.setProperty("kylin.metadata.url", "kylin_metadata@hdfs");
-        HDFSResourceStore store = new HDFSResourceStore(kylinConfig);
-        ResourceStoreTest.testPerformance(store);
-        kylinConfig.setProperty("kylin.metadata.url", oldUrl);
+        String hdfsStoreName = "org.apache.kylin.storage.hdfs.HDFSResourceStore";
+        ResourceStoreTest.testPerformance(hdfsStoreName, ResourceStoreTest.mockUrl("hdfs", kylinConfig), kylinConfig);
 
         //test hbase
-        oldUrl = kylinConfig.getMetadataUrl();
-        kylinConfig.setProperty("kylin.metadata.url", "kylin_metadata@hbase");
-        HBaseResourceStore store2 = new HBaseResourceStore(kylinConfig);
-        ResourceStoreTest.testPerformance(store2);
-        kylinConfig.setProperty("kylin.metadata.url", oldUrl);
+        String hbaseStoreName = "org.apache.kylin.storage.hbase.HBaseResourceStore.HBaseResourceStore";
+        ResourceStoreTest.testPerformance(hbaseStoreName, ResourceStoreTest.mockUrl("hbase", kylinConfig), kylinConfig);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/01abb00e/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
index a6cf43b..3449f87 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
@@ -193,9 +193,7 @@ public class ITLockManagerTest extends HBaseMetadataTestCase {
 
         public void doWork(long time, TimeUnit unit) throws Exception {
             ResourceLock lock = lockManager.getLock(lockPath);
-            if (!lock.acquire(time, unit)) {
-                throw new IllegalStateException(clientName + " could not acquire the lock");
-            }
+            lock.acquire(time, unit);
             try {
                 logger.info(clientName + " has the lock");
                 resource.use();

http://git-wip-us.apache.org/repos/asf/kylin/blob/01abb00e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
index d24d3b4..6744805 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -25,7 +25,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -46,14 +45,10 @@ public class HDFSResourceStore extends ResourceStore {
 
     private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
 
-    private static final long DEFAULT_ACQUIRE_LOCK_TIMEOUT = 2;
-
     private Path hdfsMetaPath;
 
     private FileSystem fs;
 
-    private LockManager lockManager;
-
     public HDFSResourceStore(KylinConfig kylinConfig) throws Exception {
         super(kylinConfig);
         String metadataUrl = kylinConfig.getMetadataUrl();
@@ -68,8 +63,6 @@ public class HDFSResourceStore extends ResourceStore {
         String path = metadataUrl.substring(0, cut);
         fs = HadoopUtil.getFileSystem(path);
         Path metadataPath = new Path(path);
-        //creat lock manager
-        this.lockManager = new LockManager(kylinConfig, getRelativePath(metadataPath));
         if (fs.exists(metadataPath) == false) {
             logger.warn("Path not exist in HDFS, create it: " + path);
             createMetaFolder(metadataPath, kylinConfig);
@@ -80,20 +73,12 @@ public class HDFSResourceStore extends ResourceStore {
 
     }
 
-
-
     private void createMetaFolder(Path metaDirName, KylinConfig kylinConfig) throws Exception {
         //create hdfs meta path
-        ResourceLock lock = lockManager.getLock(getRelativePath(metaDirName));
-        try {
-            if (lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS)) {
-                if (!fs.exists(metaDirName)) {
-                    fs.mkdirs(metaDirName);
-                }
-            }
-        } finally {
-            lockManager.releaseLock(lock);
+        if (!fs.exists(metaDirName)) {
+            fs.mkdirs(metaDirName);
         }
+
         logger.info("hdfs meta path created: " + metaDirName.toString());
     }
 
@@ -164,10 +149,7 @@ public class HDFSResourceStore extends ResourceStore {
             return 0;
         }
         FSDataInputStream in = null;
-        ResourceLock lock = null;
         try {
-            lock = lockManager.getLock(resPath);
-            lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS);
             in = fs.open(p);
             long t = in.readLong();
             return t;
@@ -175,7 +157,6 @@ public class HDFSResourceStore extends ResourceStore {
             throw new IOException("Put resource fail", e);
         } finally {
             IOUtils.closeQuietly(in);
-            lockManager.releaseLock(lock);
         }
 
     }
@@ -186,10 +167,7 @@ public class HDFSResourceStore extends ResourceStore {
         Path p = getRealHDFSPath(resPath);
         logger.info("put resource : " + p.toUri());
         FSDataOutputStream out = null;
-        ResourceLock lock = null;
         try {
-            lock = lockManager.getLock(resPath);
-            lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS);
             out = fs.create(p, true);
             out.writeLong(ts);
             IOUtils.copy(content, out);
@@ -198,7 +176,6 @@ public class HDFSResourceStore extends ResourceStore {
             throw new IOException("Put resource fail", e);
         } finally {
             IOUtils.closeQuietly(out);
-            lockManager.releaseLock(lock);
         }
     }
 
@@ -222,18 +199,13 @@ public class HDFSResourceStore extends ResourceStore {
 
     @Override
     protected void deleteResourceImpl(String resPath) throws IOException {
-        ResourceLock lock = null;
         try {
-            lock = lockManager.getLock(resPath);
-            lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS);
             Path p = getRealHDFSPath(resPath);
             if (fs.exists(p)) {
                 fs.delete(p, true);
             }
         } catch (Exception e) {
             throw new IOException("Delete resource fail", e);
-        } finally {
-            lockManager.releaseLock(lock);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/01abb00e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
index 4959718..582482f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.util.Arrays;
 
 public class LockManager {
@@ -75,14 +76,14 @@ public class LockManager {
         return new ResourceLock(lockPath, lock);
     }
 
-    public void releaseLock(ResourceLock lock) {
+    public void releaseLock(ResourceLock lock) throws IOException {
         try {
             if (lock != null)
                 lock.release();
         } catch (Exception e) {
-            logger.error("Fail to release lock");
-            e.printStackTrace();
+            throw new IOException("Fail to release lock", e);
         }
+
     }
 
     private static String getZKConnectString(KylinConfig kylinConfig) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/01abb00e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
index 9d51871..ee5a415 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
@@ -33,8 +33,11 @@ public class ResourceLock {
         this.lock = lock;
     }
 
-    public boolean acquire(long time, TimeUnit unit) throws Exception {
-        return lock.acquire(time, unit);
+    public void acquire(long time, TimeUnit unit) throws Exception {
+        boolean success = lock.acquire(time, unit);
+        if(!success){
+            throw new IllegalStateException("Fail to get Zookeeper lock");
+        }
     }
 
     public void acquire() throws Exception{