You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ka...@apache.org on 2017/04/14 06:13:04 UTC

[1/2] kylin git commit: KYLIN-2506 Refactor ZookeeperDistributedJobLock [Forced Update!]

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2506 fc01989c2 -> 0018fafc7 (forced update)


KYLIN-2506 Refactor ZookeeperDistributedJobLock


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6416df15
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6416df15
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6416df15

Branch: refs/heads/KYLIN-2506
Commit: 6416df157c0ac2f197f69a613bbc82e8e60fc37f
Parents: ce8b24f
Author: kangkaisen <ka...@163.com>
Authored: Fri Apr 7 15:45:43 2017 +0800
Committer: kangkaisen <ka...@163.com>
Committed: Fri Apr 14 14:02:18 2017 +0800

----------------------------------------------------------------------
 core-common/pom.xml                             |   5 +
 .../kylin/common/lock/DistributedJobLock.java   |  38 +++++
 .../org/apache/kylin/common/lock/JobLock.java   |  26 +++
 .../apache/kylin/common/lock/MockJobLock.java   |  33 ++++
 .../kylin/dict/AppendTrieDictionaryBuilder.java |   1 -
 .../java/org/apache/kylin/job/Scheduler.java    |   2 +-
 .../job/impl/threadpool/DefaultScheduler.java   |   2 +-
 .../impl/threadpool/DistributedScheduler.java   |  43 +++--
 .../kylin/job/lock/DistributedJobLock.java      |  36 ----
 .../java/org/apache/kylin/job/lock/JobLock.java |  27 ---
 .../org/apache/kylin/job/lock/MockJobLock.java  |  33 ----
 .../job/impl/threadpool/BaseSchedulerTest.java  |   2 +-
 .../test_case_data/localmeta/kylin.properties   |   4 +-
 .../test_case_data/sandbox/kylin.properties     |   2 +-
 .../kylin/job/BaseTestDistributedScheduler.java |   4 +-
 .../apache/kylin/rest/service/JobService.java   |   2 +-
 .../hbase/util/ZookeeperDistributedJobLock.java | 164 +++++++++----------
 .../storage/hbase/util/ZookeeperJobLock.java    |   2 +-
 18 files changed, 219 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 95d3c29..5b5f78b 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -69,6 +69,11 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
new file mode 100644
index 0000000..00d1ca4
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.lock;
+
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+
+import java.util.concurrent.Executor;
+
+public interface DistributedJobLock extends JobLock {
+
+    boolean lockWithClient(String lockPath, String lockClient);
+
+    boolean isHasLocked(String lockPath);
+
+    void unlock(String lockPath);
+
+    PathChildrenCache watch(String watchPath, Executor watchExecutor, WatcherProcess process);
+
+    public interface WatcherProcess {
+        void process(String path, String data);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
new file mode 100644
index 0000000..5802d71
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.lock;
+
+
+public interface JobLock {
+    boolean lock();
+
+    void unlock();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
new file mode 100644
index 0000000..f8233be
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.lock;
+
+/**
+ */
+public class MockJobLock implements JobLock {
+    @Override
+    public boolean lock() {
+        return true;
+    }
+
+    @Override
+    public void unlock() {
+        return;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
index bfd664f..c35a815 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.dict;
 
-import com.google.common.base.Preconditions;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.BytesUtil;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
index 93d2510..e2cfd44 100644
--- a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -21,7 +21,7 @@ package org.apache.kylin.job;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.JobLock;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 403abc4..688708e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -40,7 +40,7 @@ import org.apache.kylin.job.execution.Executable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.JobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 1f2e958..b99da7c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.job.impl.threadpool;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Map;
@@ -33,6 +34,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.kylin.common.KylinConfig;
@@ -48,8 +50,8 @@ import org.apache.kylin.job.execution.Executable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.DistributedJobLock;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.DistributedJobLock;
+import org.apache.kylin.common.lock.JobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +73,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
     private ExecutorService jobPool;
     private DefaultContext context;
     private DistributedJobLock jobLock;
+    private PathChildrenCache lockWatch;
 
     private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
     private static final ConcurrentMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
@@ -81,6 +84,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
     private JobEngineConfig jobEngineConfig;
 
     private final static String SEGMENT_ID = "segmentId";
+    public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
 
     //only for it test
     public static DistributedScheduler getInstance(KylinConfig config) {
@@ -177,7 +181,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
         public void run() {
             try (SetThreadName ignored = new SetThreadName("Job %s", executable.getId())) {
                 String segmentId = executable.getParam(SEGMENT_ID);
-                if (jobLock.lockWithName(segmentId, serverName)) {
+                if (jobLock.lockWithClient(getLockPath(segmentId), serverName)) {
                     logger.info(executable.toString() + " scheduled in server: " + serverName);
 
                     context.addRunningJob(executable);
@@ -205,7 +209,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
                 if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
                     if (segmentWithLocks.contains(segmentId)) {
                         logger.info(executable.toString() + " will release the lock for the segment: " + segmentId);
-                        jobLock.unlockWithName(segmentId);
+                        jobLock.unlock(getLockPath(segmentId));
                         segmentWithLocks.remove(segmentId);
                     }
                 }
@@ -214,15 +218,15 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
     }
 
     //when the segment lock released but the segment related job still running, resume the job.
-    private class DoWatchImpl implements org.apache.kylin.job.lock.DistributedJobLock.DoWatchLock {
+    private class WatcherProcessImpl implements org.apache.kylin.common.lock.DistributedJobLock.WatcherProcess {
         private String serverName;
 
-        public DoWatchImpl(String serverName) {
+        public WatcherProcessImpl(String serverName) {
             this.serverName = serverName;
         }
 
         @Override
-        public void doWatch(String path, String nodeData) {
+        public void process(String path, String nodeData) {
             String[] paths = path.split("/");
             String segmentId = paths[paths.length - 1];
 
@@ -233,7 +237,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
                     if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) {
                         try {
                             logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job");
-                            if (!jobLock.isHasLocked(segmentId)) {
+                            if (!jobLock.isHasLocked(getLockPath(segmentId))) {
                                 executableManager.resumeRunningJobForce(executable.getId());
                                 fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
                                 break;
@@ -283,8 +287,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
 
         //watch the zookeeper node change, so that when one job server is down, other job servers can take over.
         watchPool = Executors.newFixedThreadPool(1);
-        DoWatchImpl doWatchImpl = new DoWatchImpl(this.serverName);
-        this.jobLock.watchLock(watchPool, doWatchImpl);
+        WatcherProcessImpl watcherProcess = new WatcherProcessImpl(this.serverName);
+        lockWatch = this.jobLock.watch(getWatchPath(), watchPool, watcherProcess);
 
         int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
         jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
@@ -314,16 +318,27 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
         }
     }
 
+    private String getLockPath(String pathName) {
+        return ZOOKEEPER_LOCK_PATH + "/" + jobEngineConfig.getConfig().getMetadataUrlPrefix() + "/" + pathName;
+    }
+
+    private String getWatchPath() {
+        return ZOOKEEPER_LOCK_PATH + "/" + jobEngineConfig.getConfig().getMetadataUrlPrefix();
+    }
+
     @Override
     public void shutdown() throws SchedulerException {
         logger.info("Will shut down Job Engine ....");
 
+        try {
+            lockWatch.close();
+        } catch (IOException e) {
+            throw new SchedulerException(e);
+        }
+
         releaseAllLocks();
         logger.info("The all locks has released");
 
-        watchPool.shutdown();
-        logger.info("The watchPool has down");
-
         fetcherPool.shutdown();
         logger.info("The fetcherPool has down");
 
@@ -333,7 +348,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
 
     private void releaseAllLocks() {
         for (String segmentId : segmentWithLocks) {
-            jobLock.unlockWithName(segmentId);
+            jobLock.unlock(getLockPath(segmentId));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
deleted file mode 100644
index 1c173ec..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.lock;
-
-import java.util.concurrent.ExecutorService;
-
-public interface DistributedJobLock extends JobLock {
-    
-    boolean lockWithName(String name, String serverName);
-
-    boolean isHasLocked(String segmentId);
-
-    void unlockWithName(String name);
-
-    void watchLock(ExecutorService pool, DoWatchLock doWatch);
-    
-    public interface DoWatchLock {
-        void doWatch(String path, String data);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
deleted file mode 100644
index bbfb801..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.lock;
-
-/**
- */
-public interface JobLock {
-    boolean lock();
-
-    void unlock();
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
deleted file mode 100644
index cac17b9..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.lock;
-
-/**
- */
-public class MockJobLock implements JobLock {
-    @Override
-    public boolean lock() {
-        return true;
-    }
-
-    @Override
-    public void unlock() {
-        return;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 1ada9a1..1bafa34 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -28,7 +28,7 @@ import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.lock.MockJobLock;
+import org.apache.kylin.common.lock.MockJobLock;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 9f7b24c..3866575 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -133,7 +133,7 @@ kylin.security.saml.context-path=/kylin
 kylin.test.bcc.new.key=some-value
 kylin.engine.mr.config-override.test1=test1
 kylin.engine.mr.config-override.test2=test2
-kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
+kylin.job.lock=org.apache.kylin.common.lock.MockJobLock
 
 kylin.engine.provider.0=org.apache.kylin.engine.mr.MRBatchCubingEngine
-kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
+kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 684b4dd..c0a4968 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -112,7 +112,7 @@ kylin.query.udf.concat=org.apache.kylin.query.udf.ConcatUDF
 kylin.query.udf.version=org.apache.kylin.query.udf.VersionUDF
 
 # for test
-kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
+kylin.job.lock=org.apache.kylin.common.lock.MockJobLock
 kylin.engine.mr.uhc-reducer-count=3
 
 ### CUBE ###

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index 2d79970..4877ca1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -167,7 +167,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
     }
 
     boolean lock(ZookeeperDistributedJobLock jobLock, String cubeName, String serverName) {
-        return jobLock.lockWithName(cubeName, serverName);
+        return jobLock.lockWithClient(getLockPath(cubeName), serverName);
     }
 
     private static void initZk() {
@@ -197,6 +197,6 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
     }
 
     private String getLockPath(String pathName) {
-        return ZookeeperDistributedJobLock.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
+        return DistributedScheduler.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 4ba426e..31d1ded 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -51,7 +51,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.JobLock;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.rest.constant.Constant;

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index 983bfd9..5f5a721 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -19,35 +19,30 @@
 package org.apache.kylin.storage.hbase.util;
 
 import java.nio.charset.Charset;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.lock.DistributedJobLock;
+import org.apache.kylin.common.lock.DistributedJobLock;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * the jobLock is specially used to support distributed scheduler.
- */
-
 public class ZookeeperDistributedJobLock implements DistributedJobLock {
     private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
 
-    public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
-
-    final private KylinConfig config;
-    final CuratorFramework zkClient;
-    final PathChildrenCache childrenCache;
+    private final KylinConfig config;
+    private static final ConcurrentMap<KylinConfig, CuratorFramework> CACHE = new ConcurrentHashMap<KylinConfig, CuratorFramework>();
+    private final CuratorFramework zkClient;
 
     public ZookeeperDistributedJobLock() {
         this(KylinConfig.getInstanceFromEnv());
@@ -57,16 +52,12 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
         this.config = config;
 
         String zkConnectString = ZookeeperUtil.getZKConnectString();
-        logger.info("zk connection string:" + zkConnectString);
         if (StringUtils.isEmpty(zkConnectString)) {
             throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
         }
 
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
-        zkClient.start();
+        zkClient = getZKClient(config, zkConnectString);
 
-        childrenCache = new PathChildrenCache(zkClient, getWatchPath(), true);
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
@@ -75,97 +66,104 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
         }));
     }
 
+    // share a single zkClient per KylinConfig instead of creating one for every lock instance
+    private static CuratorFramework getZKClient(KylinConfig config, String zkConnectString) {
+        CuratorFramework zkClient = CACHE.get(config);
+        if (zkClient == null) {
+            synchronized (ZookeeperDistributedJobLock.class) {
+                zkClient = CACHE.get(config);
+                if (zkClient == null) {
+                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+                    zkClient = CuratorFrameworkFactory.newClient(zkConnectString, 120000, 15000, retryPolicy);
+                    zkClient.start();
+                    CACHE.put(config, zkClient);
+                    if (CACHE.size() > 1) {
+                        logger.warn("More than one singleton exist");
+                    }
+                }
+            }
+        }
+        return zkClient;
+    }
+
     /**
-     * Lock the segment with the segmentId and serverName.
-     *
-     * <p> if the segment related job want to be scheduled,
-     * it must acquire the segment lock. segmentId is used to get the lock path,
-     * serverName marked which job server keep the segment lock.
+     * Try to lock the given lockPath on behalf of the lockClient. If the lock is acquired,
+     * the lockClient is written as the data of the lockPath node.
      *
-     * @param segmentId the id of segment need to lock
+     * @param lockPath the path will create in zookeeper
      *
-     * @param serverName the hostname of job server
+     * @param lockClient the mark of client
      *
-     * @return <tt>true</tt> if the segment locked successfully
+     * @return <tt>true</tt> if the lock was acquired, or if the lockClient already holds the lock
      *
      * @since 2.0
      */
 
     @Override
-    public boolean lockWithName(String segmentId, String serverName) {
-        String lockPath = getLockPath(segmentId);
-        logger.info(serverName + " start lock the segment: " + segmentId);
+    public boolean lockWithClient(String lockPath, String lockClient) {
+        logger.info(lockClient + " start lock the path: " + lockPath);
 
         boolean hasLock = false;
         try {
-            if (!(zkClient.getState().equals(CuratorFrameworkState.STARTED))) {
-                logger.error("zookeeper have not start");
-                return false;
-            }
             if (zkClient.checkExists().forPath(lockPath) != null) {
-                if (isKeepLock(serverName, lockPath)) {
+                if (isKeepLock(lockClient, lockPath)) {
                     hasLock = true;
-                    logger.info(serverName + " has kept the lock for segment: " + segmentId);
+                    logger.info(lockClient + " has kept the lock for the path: " + lockPath);
                 }
             } else {
-                zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, serverName.getBytes(Charset.forName("UTF-8")));
-                if (isKeepLock(serverName, lockPath)) {
+                zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockPath, lockClient.getBytes(Charset.forName("UTF-8")));
+                if (isKeepLock(lockClient, lockPath)) {
                     hasLock = true;
-                    logger.info(serverName + " lock the segment: " + segmentId + " successfully");
+                    logger.info(lockClient + " lock the path: " + lockPath + " successfully");
                 }
             }
         } catch (Exception e) {
-            logger.error(serverName + " error acquire lock for the segment: " + segmentId, e);
-        }
-        if (!hasLock) {
-            logger.info(serverName + " fail to acquire lock for the segment: " + segmentId);
-            return false;
+            logger.error(lockClient + " error acquire lock for the path: " + lockPath, e);
         }
-        return true;
+        return hasLock;
     }
 
     /**
      *
-     * Returns <tt>true</tt> if, the job server is keeping the lock for the lockPath
+     * Returns <tt>true</tt> if, the lockClient is keeping the lock for the lockPath
      *
-     * @param serverName the hostname of job server
+     * @param lockClient the mark of client
      *
-     * @param lockPath the zookeeper node path of segment
+     * @param lockPath the zookeeper node path for the lock
      *
-     * @return <tt>true</tt> if the job server is keeping the lock for the lockPath, otherwise
+     * @return <tt>true</tt> if the lockClient is keeping the lock for the lockPath, otherwise
      * <tt>false</tt>
      *
      * @since 2.0
      */
 
-    private boolean isKeepLock(String serverName, String lockPath) {
+    private boolean isKeepLock(String lockClient, String lockPath) {
         try {
             if (zkClient.checkExists().forPath(lockPath) != null) {
                 byte[] data = zkClient.getData().forPath(lockPath);
                 String lockServerName = new String(data, Charset.forName("UTF-8"));
-                return lockServerName.equalsIgnoreCase(serverName);
+                return lockServerName.equalsIgnoreCase(lockClient);
             }
         } catch (Exception e) {
-            logger.error("fail to get the serverName for the path: " + lockPath, e);
+            logger.error("fail to get the lockClient for the path: " + lockPath, e);
         }
         return false;
     }
 
     /**
      *
-     * Returns <tt>true</tt> if, and only if, the segment has been locked.
+     * Returns <tt>true</tt> if, and only if, the path has been locked.
      *
-     * @param segmentId the id of segment need to release the lock.
+     * @param lockPath the zookeeper node path for the lock
      *
-     * @return <tt>true</tt> if the segment has been locked, otherwise
+     * @return <tt>true</tt> if the path has been locked, otherwise
      * <tt>false</tt>
      *
      * @since 2.0
      */
 
     @Override
-    public boolean isHasLocked(String segmentId) {
-        String lockPath = getLockPath(segmentId);
+    public boolean isHasLocked(String lockPath) {
         try {
             return zkClient.checkExists().forPath(lockPath) != null;
         } catch (Exception e) {
@@ -175,71 +173,66 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
     }
 
     /**
-     * release the segment lock with the segmentId.
+     * release the lock with the specific path.
      *
-     * <p> the segment related zookeeper node will be deleted.
+     * <p> the path related zookeeper node will be deleted.
      *
-     * @param segmentId the id of segment need to release the lock
+     * @param lockPath the zookeeper node path for the lock.
      *
      * @since 2.0
      */
 
     @Override
-    public void unlockWithName(String segmentId) {
-        String lockPath = getLockPath(segmentId);
+    public void unlock(String lockPath) {
         try {
-            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
-                if (zkClient.checkExists().forPath(lockPath) != null) {
-                    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
-                    logger.info("the lock for " + segmentId + " release successfully");
-                } else {
-                    logger.info("the lock for " + segmentId + " has released");
-                }
+            if (zkClient.checkExists().forPath(lockPath) != null) {
+                zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
+                logger.info("the lock for " + lockPath + " release successfully");
+            } else {
+                logger.info("the lock for " + lockPath + " has released");
             }
         } catch (Exception e) {
-            logger.error("error release lock :" + segmentId);
+            logger.error("error release lock :" + lockPath);
             throw new RuntimeException(e);
         }
     }
 
     /**
-     * watching all the locked segments related zookeeper nodes change,
-     * in order to when one job server is down, other job server can take over the running jobs.
+     * Watch the path so that the client is notified when a child zookeeper node is removed.
+     * Note: the caller is responsible for closing the returned PathChildrenCache in time.
+     *
+     * @param watchPath the path need to watch
      *
-     * @param pool the threadPool watching the zookeeper node change
+     * @param watchExecutor the executor watching the zookeeper node change
      *
-     * @param doWatch do the concrete action with the zookeeper node path and zookeeper node data
+     * @param watcherProcess the callback invoked with the node path and node data when a child zookeeper node is removed
+     *
+     * @return PathChildrenCache  the client should close the PathChildrenCache in time
      *
      * @since 2.0
      */
 
     @Override
-    public void watchLock(ExecutorService pool, final DoWatchLock doWatch) {
+    public PathChildrenCache watch(String watchPath, Executor watchExecutor, final WatcherProcess watcherProcess) {
+        PathChildrenCache cache = new PathChildrenCache(zkClient, watchPath, true);
         try {
-            childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
-            childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
+            cache.start();
+            cache.getListenable().addListener(new PathChildrenCacheListener() {
                 @Override
                 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                     switch (event.getType()) {
                     case CHILD_REMOVED:
-                        doWatch.doWatch(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8")));
+                        watcherProcess.process(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8")));
                         break;
                     default:
                         break;
                     }
                 }
-            }, pool);
+            }, watchExecutor);
         } catch (Exception e) {
             logger.warn("watch the zookeeper node fail: " + e);
         }
-    }
-
-    private String getLockPath(String pathName) {
-        return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName;
-    }
-
-    private String getWatchPath() {
-        return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix();
+        return cache;
     }
 
     @Override
@@ -253,7 +246,6 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
 
     public void close() {
         try {
-            childrenCache.close();
             zkClient.close();
         } catch (Exception e) {
             logger.error("error occurred to close PathChildrenCache", e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/6416df15/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
index 7bf7498..7315d1d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
@@ -35,7 +35,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.JobLock;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;


[2/2] kylin git commit: KYLIN-2506 Add distributed lock for GlobalDictionaryBuilder

Posted by ka...@apache.org.
KYLIN-2506 Add distributed lock for GlobalDictionaryBuilder


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0018fafc
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0018fafc
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0018fafc

Branch: refs/heads/KYLIN-2506
Commit: 0018fafc7005f9b72dfff9ba4ae53054e3ff63ad
Parents: 6416df1
Author: kangkaisen <ka...@163.com>
Authored: Mon Apr 10 16:47:46 2017 +0800
Committer: kangkaisen <ka...@163.com>
Committed: Fri Apr 14 14:02:29 2017 +0800

----------------------------------------------------------------------
 .../kylin/dict/AppendTrieDictionaryBuilder.java |  12 +-
 .../apache/kylin/dict/GlobalDictHDFSStore.java  |  18 +--
 .../org/apache/kylin/dict/GlobalDictStore.java  |   6 +-
 .../kylin/dict/GlobalDictionaryBuilder.java     | 103 ++++++++++++++-
 .../kylin/dict/AppendTrieDictionaryTest.java    |  76 +-----------
 .../dict/ITGlobalDictionaryBuilderTest.java     | 124 +++++++++++++++++++
 6 files changed, 235 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0018fafc/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
index c35a815..efa681b 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
@@ -20,8 +20,6 @@ package org.apache.kylin.dict;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.BytesUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
@@ -30,7 +28,6 @@ import java.util.TreeMap;
 import static com.google.common.base.Preconditions.checkState;
 
 public class AppendTrieDictionaryBuilder {
-    private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionaryBuilder.class);
 
     private final String baseDir;
     private final String workingDir;
@@ -42,7 +39,6 @@ public class AppendTrieDictionaryBuilder {
     private int nValues;
     private BytesConverter bytesConverter;
     private TreeMap<DictSliceKey, String> sliceFileMap = new TreeMap<>(); // slice key -> slice file name
-    private int counter;
 
     private DictSliceKey curKey;
     private DictNode curNode;
@@ -77,11 +73,7 @@ public class AppendTrieDictionaryBuilder {
     }
 
     @SuppressWarnings("unchecked")
-    public void addValue(String value) {
-        if (counter++ > 0 && counter % 1_000_000 == 0) {
-            logger.info("processed {} values", counter);
-        }
-
+    public void addValue(String value) throws IOException {
         byte[] valueBytes = bytesConverter.convertToBytes(value);
 
         if (sliceFileMap.isEmpty()) {
@@ -134,7 +126,7 @@ public class AppendTrieDictionaryBuilder {
         return dict;
     }
 
-    private void flushCurrentNode() {
+    private void flushCurrentNode() throws IOException {
         String newSliceFile = store.writeSlice(workingDir, curKey, curNode);
         String oldSliceFile = sliceFileMap.put(curKey, newSliceFile);
         if (oldSliceFile != null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018fafc/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
index d9030d3..7cf5591 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
@@ -175,18 +175,16 @@ public class GlobalDictHDFSStore extends GlobalDictStore {
     }
 
     @Override
-    DictSlice readSlice(String directory, String sliceFileName) {
+    DictSlice readSlice(String directory, String sliceFileName) throws IOException {
         Path path = new Path(directory, sliceFileName);
         logger.info("read slice from {}", path);
         try (FSDataInputStream input = fileSystem.open(path, BUFFER_SIZE)) {
             return DictSlice.deserializeFrom(input);
-        } catch (IOException e) {
-            throw new RuntimeException(String.format("read slice %s failed", path), e);
         }
     }
 
     @Override
-    String writeSlice(String workingDir, DictSliceKey key, DictNode slice) {
+    String writeSlice(String workingDir, DictSliceKey key, DictNode slice) throws IOException {
         //write new slice
         String sliceFile = IndexFormatV2.sliceFileName(key);
         Path path = new Path(workingDir, sliceFile);
@@ -195,22 +193,16 @@ public class GlobalDictHDFSStore extends GlobalDictStore {
         try (FSDataOutputStream out = fileSystem.create(path, true, BUFFER_SIZE)) {
             byte[] bytes = slice.buildTrieBytes();
             out.write(bytes);
-        } catch (IOException e) {
-            throw new RuntimeException(String.format("write slice with key %s into file %s failed", key, path), e);
         }
         return sliceFile;
     }
 
     @Override
-    void deleteSlice(String workingDir, String sliceFileName) {
+    void deleteSlice(String workingDir, String sliceFileName) throws IOException {
         Path path = new Path(workingDir, sliceFileName);
         logger.info("delete slice at {}", path);
-        try {
-            if (fileSystem.exists(path)) {
-                fileSystem.delete(path, false);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(String.format("delete slice at %s failed", path), e);
+        if (fileSystem.exists(path)) {
+            fileSystem.delete(path, false);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018fafc/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
index 5817868..6a7a20c 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
@@ -63,7 +63,7 @@ public abstract class GlobalDictStore {
      * @return a <i>DictSlice</i>
      * @throws IOException on I/O error
      */
-    abstract DictSlice readSlice(String workingDir, String sliceFileName);
+    abstract DictSlice readSlice(String workingDir, String sliceFileName) throws IOException;
 
     /**
      * Write a slice with the given key to the specified directory.
@@ -73,7 +73,7 @@ public abstract class GlobalDictStore {
      * @return file name of the new written slice
      * @throws IOException on I/O error
      */
-    abstract String writeSlice(String workingDir, DictSliceKey key, DictNode slice);
+    abstract String writeSlice(String workingDir, DictSliceKey key, DictNode slice) throws IOException;
 
     /**
      * Delete a slice with the specified file name.
@@ -81,7 +81,7 @@ public abstract class GlobalDictStore {
      * @param sliceFileName file name of the slice, should exist
      * @throws IOException on I/O error
      */
-    abstract void deleteSlice(String workingDir, String sliceFileName);
+    abstract void deleteSlice(String workingDir, String sliceFileName) throws IOException;
 
     /**
      * commit the <i>DictSlice</i> and <i>GlobalDictMetadata</i> in workingDir to new versionDir

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018fafc/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index 7921980..9d66b12 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -19,9 +19,17 @@
 package org.apache.kylin.dict;
 
 import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.lock.DistributedJobLock;
+import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.Dictionary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * GlobalDictinary based on whole cube, to ensure one value has same dict id in different segments.
@@ -29,8 +37,16 @@ import org.apache.kylin.common.util.Dictionary;
  * Created by sunyerui on 16/5/24.
  */
 public class GlobalDictionaryBuilder implements IDictionaryBuilder {
-    AppendTrieDictionaryBuilder builder;
-    int baseId;
+    private AppendTrieDictionaryBuilder builder;
+    private int baseId;
+
+    private DistributedJobLock lock;
+    private String sourceColumn;
+    //the job thread name is UUID+threadID
+    private final String jobUUID = Thread.currentThread().getName();
+    private int counter;
+
+    private static Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class);
 
     @Override
     public void init(DictionaryInfo dictInfo, int baseId) throws IOException {
@@ -38,6 +54,9 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
             throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo");
         }
 
+        sourceColumn = dictInfo.getSourceTable() + "_" + dictInfo.getSourceColumn();
+        lock(sourceColumn);
+
         int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
         this.builder = new AppendTrieDictionaryBuilder(dictInfo.getResourceDir(), maxEntriesPerSlice);
         this.baseId = baseId;
@@ -45,14 +64,88 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
 
     @Override
     public boolean addValue(String value) {
-        if (value == null)
+        if (++counter % 1_000_000 == 0) {
+            if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+                logger.info("processed {} values", counter);
+            } else {
+                throw new RuntimeException("Failed to create global dictionary on " + sourceColumn + " This client doesn't keep the lock");
+            }
+        }
+
+        if (value == null) {
             return false;
-        builder.addValue(value);
+        }
+
+        try {
+            builder.addValue(value);
+        } catch (Throwable e) {
+            checkAndUnlock();
+            throw new RuntimeException(String.format("Failed to create global dictionary on %s ", sourceColumn), e);
+        }
+
         return true;
     }
 
     @Override
     public Dictionary<String> build() throws IOException {
-        return builder.build(baseId);
+        try {
+            if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+                return builder.build(baseId);
+            }
+        } finally {
+            checkAndUnlock();
+        }
+        return new AppendTrieDictionary<>();
+    }
+
+    private void lock(final String sourceColumn) throws IOException {
+        lock = (DistributedJobLock) ClassUtil.newInstance("org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
+
+        if (!lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+            logger.info("{} will wait the lock for {} ", jobUUID, sourceColumn);
+
+            final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(1);
+
+            PathChildrenCache cache = lock.watch(getWatchPath(sourceColumn), MoreExecutors.sameThreadExecutor(), new DistributedJobLock.WatcherProcess() {
+                @Override
+                public void process(String path, String data) {
+                    if (!data.equalsIgnoreCase(jobUUID) && lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+                        try {
+                            bq.put("getLock");
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+            });
+
+            long start = System.currentTimeMillis();
+
+            try {
+                bq.take();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } finally {
+                cache.close();
+            }
+
+            logger.info("{} has waited the lock {} ms for {} ", jobUUID, (System.currentTimeMillis() - start), sourceColumn);
+        }
+    }
+
+    private void checkAndUnlock() {
+        if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+            lock.unlock(getLockPath(sourceColumn));
+        }
+    }
+
+    private static final String GLOBAL_DICT_LOCK_PATH = "/kylin/dict/lock";
+
+    private String getLockPath(String pathName) {
+        return GLOBAL_DICT_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName + "/lock";
+    }
+
+    private String getWatchPath(String pathName) {
+        return GLOBAL_DICT_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018fafc/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index 9da5071..e863901 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -22,7 +22,6 @@ import static org.apache.kylin.dict.GlobalDictHDFSStore.V2_INDEX_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
@@ -44,8 +43,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -58,18 +55,14 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
-
-    private static final UUID uuid = UUID.randomUUID();
-    private static final String RESOURCE_DIR = "/dict/append_dict_test/" + uuid;
-    private static final String HDFS_DIR = "file:///tmp/kylin_append_dict";
+    private static final String RESOURCE_DIR = "/dict/append_dict_test/" + UUID.randomUUID();
     private static String BASE_DIR;
-    private static String LOCAL_BASE_DIR =  "/tmp/kylin_append_dict/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/";
+    private static String LOCAL_BASE_DIR = "/tmp/kylin/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/";
 
     @Before
     public void beforeTest() {
         staticCreateTestMetadata();
         KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "50000");
-        KylinConfig.getInstanceFromEnv().setProperty("kylin.env.hdfs-working-dir", HDFS_DIR);
         BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/";
     }
 
@@ -80,7 +73,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
     }
 
     private void cleanup() {
-        Path basePath = new Path(HDFS_DIR);
+        Path basePath = new Path(BASE_DIR);
         try {
             HadoopUtil.getFileSystem(basePath).delete(basePath, true);
         } catch (IOException e) {
@@ -318,69 +311,6 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         dictionary.getMaxId();
     }
 
-    private class SharedBuilderThread extends Thread {
-        CountDownLatch startLatch;
-        CountDownLatch finishLatch;
-        String prefix;
-        int count;
-
-        SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String prefix, int count) {
-            this.startLatch = startLatch;
-            this.finishLatch = finishLatch;
-            this.prefix = prefix;
-            this.count = count;
-        }
-
-        @Override
-        public void run() {
-            try {
-                AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
-                startLatch.countDown();
-                for (int i = 0; i < count; i++) {
-                    builder.addValue(prefix + i);
-                }
-                builder.build(0);
-                finishLatch.countDown();
-            } catch (IOException e) {
-            }
-        }
-    }
-
-    @Ignore
-    @Test
-    public void testSharedBuilder() throws IOException, InterruptedException {
-        final CountDownLatch startLatch = new CountDownLatch(3);
-        final CountDownLatch finishLatch = new CountDownLatch(3);
-
-        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
-        Thread t1 = new SharedBuilderThread(startLatch, finishLatch, "t1_", 10000);
-        Thread t2 = new SharedBuilderThread(startLatch, finishLatch, "t2_", 10);
-        Thread t3 = new SharedBuilderThread(startLatch, finishLatch, "t3_", 100000);
-        t1.start();
-        t2.start();
-        t3.start();
-        startLatch.await();
-        AppendTrieDictionary dict = builder.build(0);
-        assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS));
-        assertEquals(110010, dict.getMaxId());
-
-        builder = createBuilder(RESOURCE_DIR);
-        builder.addValue("success");
-        builder.addValue("s");
-        dict = builder.build(0);
-        for (int i = 0; i < 10000; i++) {
-            assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
-        }
-        for (int i = 0; i < 10; i++) {
-            assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
-        }
-        for (int i = 0; i < 100000; i++) {
-            assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
-        }
-        assertEquals(110011, dict.getIdFromValue("success"));
-        assertEquals(110012, dict.getIdFromValue("s"));
-    }
-
     @Test
     public void testSplitContainSuperLongValue() throws IOException {
         String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B";

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018fafc/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
new file mode 100644
index 0000000..4afaccd
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test checking that several {@link GlobalDictionaryBuilder}s can build the same
+ * dictionary concurrently without losing any value.
+ *
+ * Created by kangkaisen on 2017/4/10.
+ */
+public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase {
+    /** Dictionary descriptor shared by the test thread and every worker thread. */
+    private DictionaryInfo dictionaryInfo;
+
+    /** Creates the test metadata and the dictionary descriptor used by the test. */
+    @Before
+    public void beforeTest() throws Exception {
+        staticCreateTestMetadata();
+        dictionaryInfo = new DictionaryInfo("testTable", "testColumn", 0, "String", null);
+    }
+
+    /** Removes the dictionary files written by the test, then tears down the test metadata. */
+    @After
+    public void afterTest() {
+        cleanup();
+        staticCleanupTestMetadata();
+    }
+
+    /**
+     * Recursively deletes the {@code /resources/GlobalDict} directory of {@link #dictionaryInfo}
+     * under the HDFS working directory.
+     */
+    private void cleanup() {
+        String baseDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + dictionaryInfo.getResourceDir() + "/";
+        Path basePath = new Path(baseDir);
+        try {
+            HadoopUtil.getFileSystem(basePath).delete(basePath, true);
+        } catch (IOException ignored) {
+            // Best-effort cleanup: a failed delete must not mask the outcome of the test itself.
+        }
+    }
+
+    /**
+     * Builds the same dictionary from three concurrent builders, then checks that every value
+     * they added can be looked up and that a value appended afterwards gets id 110011.
+     */
+    @Test
+    public void testGlobalDictLock() throws IOException, InterruptedException {
+        final CountDownLatch startLatch = new CountDownLatch(3);
+        final CountDownLatch finishLatch = new CountDownLatch(3);
+
+        SharedBuilderThread t1 = new SharedBuilderThread(startLatch, finishLatch, "t1_", 10000);
+        SharedBuilderThread t2 = new SharedBuilderThread(startLatch, finishLatch, "t2_", 10);
+        SharedBuilderThread t3 = new SharedBuilderThread(startLatch, finishLatch, "t3_", 100000);
+        t1.start();
+        t2.start();
+        t3.start();
+        startLatch.await();
+        finishLatch.await();
+
+        // Report a worker failure directly instead of letting it show up as a missing value below.
+        t1.rethrowFailure();
+        t2.rethrowFailure();
+        t3.rethrowFailure();
+
+        GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder();
+        builder.init(dictionaryInfo, 0);
+        builder.addValue("success");
+        Dictionary<String> dict = builder.build();
+
+        for (int i = 0; i < 10000; i++) {
+            assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
+        }
+        for (int i = 0; i < 10; i++) {
+            assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
+        }
+        for (int i = 0; i < 100000; i++) {
+            assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
+        }
+
+        // The workers added 10000 + 10 + 100000 = 110010 distinct values before "success".
+        assertEquals(110011, dict.getIdFromValue("success"));
+    }
+
+    /**
+     * Worker that adds {@code count} values named {@code prefix + i} to the dictionary described
+     * by {@link #dictionaryInfo} and builds it.
+     */
+    private class SharedBuilderThread extends Thread {
+        private final CountDownLatch startLatch;
+        private final CountDownLatch finishLatch;
+        private final String prefix;
+        private final int count;
+        /** Exception that aborted {@link #run()}, or {@code null} if it completed normally. */
+        private volatile Exception failure;
+
+        SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String prefix, int count) {
+            this.startLatch = startLatch;
+            this.finishLatch = finishLatch;
+            this.prefix = prefix;
+            this.count = count;
+        }
+
+        @Override
+        public void run() {
+            // Count down before doing any work so the test thread can never block on startLatch.
+            startLatch.countDown();
+            try {
+                GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder();
+                builder.init(dictionaryInfo, 0);
+                for (int i = 0; i < count; i++) {
+                    builder.addValue(prefix + i);
+                }
+                builder.build();
+            } catch (Exception e) {
+                // Thread boundary: keep the failure so the test thread can report it.
+                failure = e;
+            } finally {
+                // Always release the test thread, otherwise a failed worker hangs the test forever.
+                finishLatch.countDown();
+            }
+        }
+
+        /** Rethrows, wrapped in an {@link IOException}, whatever aborted {@link #run()}, if anything. */
+        void rethrowFailure() throws IOException {
+            if (failure != null) {
+                throw new IOException("SharedBuilderThread " + prefix + " failed", failure);
+            }
+        }
+    }
+}