You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ka...@apache.org on 2017/04/13 11:22:15 UTC
[49/50] [abbrv] kylin git commit: KYLIN-2506 Refactor ZookeeperDistributedJobLock
KYLIN-2506 Refactor ZookeeperDistributedJobLock
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fd3b7655
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fd3b7655
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fd3b7655
Branch: refs/heads/KYLIN-2506
Commit: fd3b7655839af8dcd2370570ed9e426db410eb12
Parents: ce8b24f
Author: kangkaisen <ka...@163.com>
Authored: Fri Apr 7 15:45:43 2017 +0800
Committer: kangkaisen <ka...@163.com>
Committed: Thu Apr 13 16:50:49 2017 +0800
----------------------------------------------------------------------
core-common/pom.xml | 5 +
.../kylin/common/lock/DistributedJobLock.java | 38 +++++
.../org/apache/kylin/common/lock/JobLock.java | 26 +++
.../apache/kylin/common/lock/MockJobLock.java | 33 ++++
.../model/validation/rule/DictionaryRule.java | 4 +-
.../validation/rule/DictionaryRuleTest.java | 12 +-
.../kylin/dict/AppendTrieDictionaryBuilder.java | 1 -
.../java/org/apache/kylin/job/Scheduler.java | 2 +-
.../job/impl/threadpool/DefaultScheduler.java | 2 +-
.../impl/threadpool/DistributedScheduler.java | 43 +++--
.../kylin/job/lock/DistributedJobLock.java | 36 ----
.../java/org/apache/kylin/job/lock/JobLock.java | 27 ---
.../org/apache/kylin/job/lock/MockJobLock.java | 33 ----
.../job/impl/threadpool/BaseSchedulerTest.java | 2 +-
.../test_case_data/localmeta/kylin.properties | 4 +-
.../test_case_data/sandbox/kylin.properties | 2 +-
.../kylin/job/BaseTestDistributedScheduler.java | 4 +-
.../apache/kylin/rest/service/JobService.java | 2 +-
.../hbase/util/ZookeeperDistributedJobLock.java | 164 +++++++++----------
.../storage/hbase/util/ZookeeperJobLock.java | 2 +-
20 files changed, 225 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 95d3c29..5b5f78b 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -69,6 +69,11 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
new file mode 100644
index 0000000..00d1ca4
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.lock;
+
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+
+import java.util.concurrent.Executor;
+
+public interface DistributedJobLock extends JobLock {
+
+ boolean lockWithClient(String lockPath, String lockClient);
+
+ boolean isHasLocked(String lockPath);
+
+ void unlock(String lockPath);
+
+ PathChildrenCache watch(String watchPath, Executor watchExecutor, WatcherProcess process);
+
+ public interface WatcherProcess {
+ void process(String path, String data);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
new file mode 100644
index 0000000..5802d71
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.lock;
+
+
+public interface JobLock {
+ boolean lock();
+
+ void unlock();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
new file mode 100644
index 0000000..f8233be
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.lock;
+
+/**
+ */
+public class MockJobLock implements JobLock {
+ @Override
+ public boolean lock() {
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ return;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
index 8da3ca0..4392e5a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
@@ -29,7 +29,6 @@ import org.apache.kylin.cube.model.DictionaryDesc;
import org.apache.kylin.cube.model.validation.IValidatorRule;
import org.apache.kylin.cube.model.validation.ResultLevel;
import org.apache.kylin.cube.model.validation.ValidateContext;
-import org.apache.kylin.dict.GlobalDictionaryBuilder;
import org.apache.kylin.metadata.model.TblColRef;
/**
@@ -47,6 +46,7 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> {
static final String ERROR_REUSE_BUILDER_BOTH_EMPTY = "REUSE and BUILDER both empty on dictionary for column: ";
static final String ERROR_TRANSITIVE_REUSE = "Transitive REUSE is not allowed for dictionary: ";
static final String ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE = "Global dictionary couldn't be used for dimension column: ";
+ static final String GLOBAL_DICT_BUILDER_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
@Override
public void validate(CubeDesc cubeDesc, ValidateContext context) {
@@ -82,7 +82,7 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> {
return;
}
- if (StringUtils.isNotEmpty(builderClass) && builderClass.equalsIgnoreCase(GlobalDictionaryBuilder.class.getName()) && dimensionColumns.contains(dictCol)) {
+ if (StringUtils.isNotEmpty(builderClass) && builderClass.equalsIgnoreCase(GLOBAL_DICT_BUILDER_CLASS) && dimensionColumns.contains(dictCol)) {
context.addResult(ResultLevel.ERROR, ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE + dictCol);
return;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
index fcb723e..dc90a69 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
@@ -35,7 +35,6 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.DictionaryDesc;
import org.apache.kylin.cube.model.validation.ValidateContext;
-import org.apache.kylin.dict.GlobalDictionaryBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -73,7 +72,7 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
@Test
public void testBadDesc() throws IOException {
testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, "FakeBuilderClass"));
- testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, GlobalDictionaryBuilder.class.getName()));
+ testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, DictionaryRule.GLOBAL_DICT_BUILDER_CLASS));
}
@Test
@@ -88,20 +87,17 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
@Test
public void testBadDesc4() throws IOException {
- testDictionaryDesc(ERROR_TRANSITIVE_REUSE,
- DictionaryDesc.create("lstg_site_id", "SELLER_ID", null),
- DictionaryDesc.create("price", "lstg_site_id", null));
+ testDictionaryDesc(ERROR_TRANSITIVE_REUSE, DictionaryDesc.create("lstg_site_id", "SELLER_ID", null), DictionaryDesc.create("price", "lstg_site_id", null));
}
@Test
public void testBadDesc5() throws IOException {
- testDictionaryDesc(ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE,
- DictionaryDesc.create("CATEG_LVL2_NAME", null, GlobalDictionaryBuilder.class.getName()));
+ testDictionaryDesc(ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE, DictionaryDesc.create("CATEG_LVL2_NAME", null, DictionaryRule.GLOBAL_DICT_BUILDER_CLASS));
}
@Test
public void testGoodDesc2() throws IOException {
- testDictionaryDesc(null, DictionaryDesc.create("SELLER_ID", null, GlobalDictionaryBuilder.class.getName()));
+ testDictionaryDesc(null, DictionaryDesc.create("SELLER_ID", null, DictionaryRule.GLOBAL_DICT_BUILDER_CLASS));
}
private void testDictionaryDesc(String expectMessage, DictionaryDesc... descs) throws IOException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
index bfd664f..c35a815 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
@@ -18,7 +18,6 @@
package org.apache.kylin.dict;
-import com.google.common.base.Preconditions;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BytesUtil;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
index 93d2510..e2cfd44 100644
--- a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -21,7 +21,7 @@ package org.apache.kylin.job;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.JobLock;
/**
*/
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 403abc4..688708e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -40,7 +40,7 @@ import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.JobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 1f2e958..b99da7c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -18,6 +18,7 @@
package org.apache.kylin.job.impl.threadpool;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
@@ -33,6 +34,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.kylin.common.KylinConfig;
@@ -48,8 +50,8 @@ import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.DistributedJobLock;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.DistributedJobLock;
+import org.apache.kylin.common.lock.JobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +73,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
private ExecutorService jobPool;
private DefaultContext context;
private DistributedJobLock jobLock;
+ private PathChildrenCache lockWatch;
private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
private static final ConcurrentMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
@@ -81,6 +84,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
private JobEngineConfig jobEngineConfig;
private final static String SEGMENT_ID = "segmentId";
+ public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
//only for it test
public static DistributedScheduler getInstance(KylinConfig config) {
@@ -177,7 +181,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
public void run() {
try (SetThreadName ignored = new SetThreadName("Job %s", executable.getId())) {
String segmentId = executable.getParam(SEGMENT_ID);
- if (jobLock.lockWithName(segmentId, serverName)) {
+ if (jobLock.lockWithClient(getLockPath(segmentId), serverName)) {
logger.info(executable.toString() + " scheduled in server: " + serverName);
context.addRunningJob(executable);
@@ -205,7 +209,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
if (segmentWithLocks.contains(segmentId)) {
logger.info(executable.toString() + " will release the lock for the segment: " + segmentId);
- jobLock.unlockWithName(segmentId);
+ jobLock.unlock(getLockPath(segmentId));
segmentWithLocks.remove(segmentId);
}
}
@@ -214,15 +218,15 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
}
//when the segment lock released but the segment related job still running, resume the job.
- private class DoWatchImpl implements org.apache.kylin.job.lock.DistributedJobLock.DoWatchLock {
+ private class WatcherProcessImpl implements org.apache.kylin.common.lock.DistributedJobLock.WatcherProcess {
private String serverName;
- public DoWatchImpl(String serverName) {
+ public WatcherProcessImpl(String serverName) {
this.serverName = serverName;
}
@Override
- public void doWatch(String path, String nodeData) {
+ public void process(String path, String nodeData) {
String[] paths = path.split("/");
String segmentId = paths[paths.length - 1];
@@ -233,7 +237,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) {
try {
logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job");
- if (!jobLock.isHasLocked(segmentId)) {
+ if (!jobLock.isHasLocked(getLockPath(segmentId))) {
executableManager.resumeRunningJobForce(executable.getId());
fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
break;
@@ -283,8 +287,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
//watch the zookeeper node change, so that when one job server is down, other job servers can take over.
watchPool = Executors.newFixedThreadPool(1);
- DoWatchImpl doWatchImpl = new DoWatchImpl(this.serverName);
- this.jobLock.watchLock(watchPool, doWatchImpl);
+ WatcherProcessImpl watcherProcess = new WatcherProcessImpl(this.serverName);
+ lockWatch = this.jobLock.watch(getWatchPath(), watchPool, watcherProcess);
int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
@@ -314,16 +318,27 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
}
}
+ private String getLockPath(String pathName) {
+ return ZOOKEEPER_LOCK_PATH + "/" + jobEngineConfig.getConfig().getMetadataUrlPrefix() + "/" + pathName;
+ }
+
+ private String getWatchPath() {
+ return ZOOKEEPER_LOCK_PATH + "/" + jobEngineConfig.getConfig().getMetadataUrlPrefix();
+ }
+
@Override
public void shutdown() throws SchedulerException {
logger.info("Will shut down Job Engine ....");
+ try {
+ lockWatch.close();
+ } catch (IOException e) {
+ throw new SchedulerException(e);
+ }
+
releaseAllLocks();
logger.info("The all locks has released");
- watchPool.shutdown();
- logger.info("The watchPool has down");
-
fetcherPool.shutdown();
logger.info("The fetcherPool has down");
@@ -333,7 +348,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
private void releaseAllLocks() {
for (String segmentId : segmentWithLocks) {
- jobLock.unlockWithName(segmentId);
+ jobLock.unlock(getLockPath(segmentId));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
deleted file mode 100644
index 1c173ec..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.lock;
-
-import java.util.concurrent.ExecutorService;
-
-public interface DistributedJobLock extends JobLock {
-
- boolean lockWithName(String name, String serverName);
-
- boolean isHasLocked(String segmentId);
-
- void unlockWithName(String name);
-
- void watchLock(ExecutorService pool, DoWatchLock doWatch);
-
- public interface DoWatchLock {
- void doWatch(String path, String data);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
deleted file mode 100644
index bbfb801..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.lock;
-
-/**
- */
-public interface JobLock {
- boolean lock();
-
- void unlock();
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
deleted file mode 100644
index cac17b9..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.lock;
-
-/**
- */
-public class MockJobLock implements JobLock {
- @Override
- public boolean lock() {
- return true;
- }
-
- @Override
- public void unlock() {
- return;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 1ada9a1..1bafa34 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -28,7 +28,7 @@ import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.lock.MockJobLock;
+import org.apache.kylin.common.lock.MockJobLock;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 9f7b24c..3866575 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -133,7 +133,7 @@ kylin.security.saml.context-path=/kylin
kylin.test.bcc.new.key=some-value
kylin.engine.mr.config-override.test1=test1
kylin.engine.mr.config-override.test2=test2
-kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
+kylin.job.lock=org.apache.kylin.common.lock.MockJobLock
kylin.engine.provider.0=org.apache.kylin.engine.mr.MRBatchCubingEngine
-kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
+kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 684b4dd..c0a4968 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -112,7 +112,7 @@ kylin.query.udf.concat=org.apache.kylin.query.udf.ConcatUDF
kylin.query.udf.version=org.apache.kylin.query.udf.VersionUDF
# for test
-kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
+kylin.job.lock=org.apache.kylin.common.lock.MockJobLock
kylin.engine.mr.uhc-reducer-count=3
### CUBE ###
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index 2d79970..4877ca1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -167,7 +167,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
}
boolean lock(ZookeeperDistributedJobLock jobLock, String cubeName, String serverName) {
- return jobLock.lockWithName(cubeName, serverName);
+ return jobLock.lockWithClient(getLockPath(cubeName), serverName);
}
private static void initZk() {
@@ -197,6 +197,6 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
}
private String getLockPath(String pathName) {
- return ZookeeperDistributedJobLock.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
+ return DistributedScheduler.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 4ba426e..31d1ded 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -51,7 +51,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.JobLock;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.rest.constant.Constant;
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index 983bfd9..5f5a721 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -19,35 +19,30 @@
package org.apache.kylin.storage.hbase.util;
import java.nio.charset.Charset;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.lock.DistributedJobLock;
+import org.apache.kylin.common.lock.DistributedJobLock;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * the jobLock is specially used to support distributed scheduler.
- */
-
public class ZookeeperDistributedJobLock implements DistributedJobLock {
private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
- public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
-
- final private KylinConfig config;
- final CuratorFramework zkClient;
- final PathChildrenCache childrenCache;
+ private final KylinConfig config;
+ private static final ConcurrentMap<KylinConfig, CuratorFramework> CACHE = new ConcurrentHashMap<KylinConfig, CuratorFramework>();
+ private final CuratorFramework zkClient;
public ZookeeperDistributedJobLock() {
this(KylinConfig.getInstanceFromEnv());
@@ -57,16 +52,12 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
this.config = config;
String zkConnectString = ZookeeperUtil.getZKConnectString();
- logger.info("zk connection string:" + zkConnectString);
if (StringUtils.isEmpty(zkConnectString)) {
throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
}
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
- zkClient.start();
+ zkClient = getZKClient(config, zkConnectString);
- childrenCache = new PathChildrenCache(zkClient, getWatchPath(), true);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
@@ -75,97 +66,104 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
}));
}
+ // share one zkClient per KylinConfig instead of creating a new client for every lock instance
+ private static CuratorFramework getZKClient(KylinConfig config, String zkConnectString) {
+ CuratorFramework zkClient = CACHE.get(config);
+ if (zkClient == null) {
+ synchronized (ZookeeperDistributedJobLock.class) {
+ zkClient = CACHE.get(config);
+ if (zkClient == null) {
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ zkClient = CuratorFrameworkFactory.newClient(zkConnectString, 120000, 15000, retryPolicy);
+ zkClient.start();
+ CACHE.put(config, zkClient);
+ if (CACHE.size() > 1) {
+ logger.warn("More than one singleton exist");
+ }
+ }
+ }
+ }
+ return zkClient;
+ }
+
/**
- * Lock the segment with the segmentId and serverName.
- *
- * <p> if the segment related job want to be scheduled,
- * it must acquire the segment lock. segmentId is used to get the lock path,
- * serverName marked which job server keep the segment lock.
+ * Try to lock the given lockPath on behalf of the lockClient; if the lock is acquired,
+ * the lockClient is written into the data of the lockPath node.
*
- * @param segmentId the id of segment need to lock
+ * @param lockPath the path that will be created in zookeeper
*
- * @param serverName the hostname of job server
+ * @param lockClient the mark of client
*
- * @return <tt>true</tt> if the segment locked successfully
+ * @return <tt>true</tt> if the lock is acquired successfully or the lockClient already holds the lock
*
* @since 2.0
*/
@Override
- public boolean lockWithName(String segmentId, String serverName) {
- String lockPath = getLockPath(segmentId);
- logger.info(serverName + " start lock the segment: " + segmentId);
+ public boolean lockWithClient(String lockPath, String lockClient) {
+ logger.info(lockClient + " start lock the path: " + lockPath);
boolean hasLock = false;
try {
- if (!(zkClient.getState().equals(CuratorFrameworkState.STARTED))) {
- logger.error("zookeeper have not start");
- return false;
- }
if (zkClient.checkExists().forPath(lockPath) != null) {
- if (isKeepLock(serverName, lockPath)) {
+ if (isKeepLock(lockClient, lockPath)) {
hasLock = true;
- logger.info(serverName + " has kept the lock for segment: " + segmentId);
+ logger.info(lockClient + " has kept the lock for the path: " + lockPath);
}
} else {
- zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, serverName.getBytes(Charset.forName("UTF-8")));
- if (isKeepLock(serverName, lockPath)) {
+ zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockPath, lockClient.getBytes(Charset.forName("UTF-8")));
+ if (isKeepLock(lockClient, lockPath)) {
hasLock = true;
- logger.info(serverName + " lock the segment: " + segmentId + " successfully");
+ logger.info(lockClient + " lock the path: " + lockPath + " successfully");
}
}
} catch (Exception e) {
- logger.error(serverName + " error acquire lock for the segment: " + segmentId, e);
- }
- if (!hasLock) {
- logger.info(serverName + " fail to acquire lock for the segment: " + segmentId);
- return false;
+ logger.error(lockClient + " error acquire lock for the path: " + lockPath, e);
}
- return true;
+ return hasLock;
}
/**
*
- * Returns <tt>true</tt> if, the job server is keeping the lock for the lockPath
+ * Returns <tt>true</tt> if, the lockClient is keeping the lock for the lockPath
*
- * @param serverName the hostname of job server
+ * @param lockClient the mark of client
*
- * @param lockPath the zookeeper node path of segment
+ * @param lockPath the zookeeper node path for the lock
*
- * @return <tt>true</tt> if the job server is keeping the lock for the lockPath, otherwise
+ * @return <tt>true</tt> if the lockClient is keeping the lock for the lockPath, otherwise
* <tt>false</tt>
*
* @since 2.0
*/
- private boolean isKeepLock(String serverName, String lockPath) {
+ private boolean isKeepLock(String lockClient, String lockPath) {
try {
if (zkClient.checkExists().forPath(lockPath) != null) {
byte[] data = zkClient.getData().forPath(lockPath);
String lockServerName = new String(data, Charset.forName("UTF-8"));
- return lockServerName.equalsIgnoreCase(serverName);
+ return lockServerName.equalsIgnoreCase(lockClient);
}
} catch (Exception e) {
- logger.error("fail to get the serverName for the path: " + lockPath, e);
+ logger.error("fail to get the lockClient for the path: " + lockPath, e);
}
return false;
}
/**
*
- * Returns <tt>true</tt> if, and only if, the segment has been locked.
+ * Returns <tt>true</tt> if, and only if, the path has been locked.
*
- * @param segmentId the id of segment need to release the lock.
+ * @param lockPath the zookeeper node path for the lock
*
- * @return <tt>true</tt> if the segment has been locked, otherwise
+ * @return <tt>true</tt> if the path has been locked, otherwise
* <tt>false</tt>
*
* @since 2.0
*/
@Override
- public boolean isHasLocked(String segmentId) {
- String lockPath = getLockPath(segmentId);
+ public boolean isHasLocked(String lockPath) {
try {
return zkClient.checkExists().forPath(lockPath) != null;
} catch (Exception e) {
@@ -175,71 +173,66 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
}
/**
- * release the segment lock with the segmentId.
+ * release the lock with the specific path.
*
- * <p> the segment related zookeeper node will be deleted.
+ * <p> the path related zookeeper node will be deleted.
*
- * @param segmentId the id of segment need to release the lock
+ * @param lockPath the zookeeper node path for the lock.
*
* @since 2.0
*/
@Override
- public void unlockWithName(String segmentId) {
- String lockPath = getLockPath(segmentId);
+ public void unlock(String lockPath) {
try {
- if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
- if (zkClient.checkExists().forPath(lockPath) != null) {
- zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
- logger.info("the lock for " + segmentId + " release successfully");
- } else {
- logger.info("the lock for " + segmentId + " has released");
- }
+ if (zkClient.checkExists().forPath(lockPath) != null) {
+ zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
+ logger.info("the lock for " + lockPath + " release successfully");
+ } else {
+ logger.info("the lock for " + lockPath + " has released");
}
} catch (Exception e) {
- logger.error("error release lock :" + segmentId);
+ logger.error("error release lock :" + lockPath);
throw new RuntimeException(e);
}
}
/**
- * watching all the locked segments related zookeeper nodes change,
- * in order to when one job server is down, other job server can take over the running jobs.
+ * watch the path so that the client receives a notification when a zookeeper node under it changes.
+ * Note: the caller should close the returned PathChildrenCache once it is no longer needed.
+ *
+ * @param watchPath the path that needs to be watched
*
- * @param pool the threadPool watching the zookeeper node change
+ * @param watchExecutor the executor watching the zookeeper node change
*
- * @param doWatch do the concrete action with the zookeeper node path and zookeeper node data
+ * @param watcherProcess the action to perform with the node path and node data when a watched zookeeper node is removed
+ *
+ * @return the PathChildrenCache watching the path; the caller should close it once it is no longer needed
*
* @since 2.0
*/
@Override
- public void watchLock(ExecutorService pool, final DoWatchLock doWatch) {
+ public PathChildrenCache watch(String watchPath, Executor watchExecutor, final WatcherProcess watcherProcess) {
+ PathChildrenCache cache = new PathChildrenCache(zkClient, watchPath, true);
try {
- childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
- childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
+ cache.start();
+ cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_REMOVED:
- doWatch.doWatch(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8")));
+ watcherProcess.process(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8")));
break;
default:
break;
}
}
- }, pool);
+ }, watchExecutor);
} catch (Exception e) {
logger.warn("watch the zookeeper node fail: " + e);
}
- }
-
- private String getLockPath(String pathName) {
- return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName;
- }
-
- private String getWatchPath() {
- return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix();
+ return cache;
}
@Override
@@ -253,7 +246,6 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
public void close() {
try {
- childrenCache.close();
zkClient.close();
} catch (Exception e) {
logger.error("error occurred to close PathChildrenCache", e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
index 7bf7498..7315d1d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
@@ -35,7 +35,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.JobLock;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;