You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/04/16 11:43:28 UTC
[7/7] kylin git commit: KYLIN-2506 code review,
split the role of DistributedLock and JobLock
KYLIN-2506 code review, split the role of DistributedLock and JobLock
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/06bf2a16
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/06bf2a16
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/06bf2a16
Branch: refs/heads/master-KYLIN-2506
Commit: 06bf2a16234b323ddf66c65b4056a5412db46143
Parents: 796d80f
Author: Yang Li <li...@apache.org>
Authored: Sun Apr 16 19:42:54 2017 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Apr 16 19:42:54 2017 +0800
----------------------------------------------------------------------
core-common/pom.xml | 5 ---
.../apache/kylin/common/KylinConfigBase.java | 7 ++++
.../kylin/common/lock/DistributedJobLock.java | 38 --------------------
.../kylin/common/lock/DistributedLock.java | 37 +++++++++++++++++++
.../org/apache/kylin/common/lock/JobLock.java | 26 --------------
.../apache/kylin/common/lock/MockJobLock.java | 33 -----------------
.../kylin/dict/GlobalDictionaryBuilder.java | 28 +++++++--------
.../java/org/apache/kylin/job/Scheduler.java | 2 +-
.../job/impl/threadpool/DefaultScheduler.java | 10 +++---
.../impl/threadpool/DistributedScheduler.java | 27 +++++++-------
.../kylin/job/lock/DistributedJobLock.java | 24 +++++++++++++
.../java/org/apache/kylin/job/lock/JobLock.java | 30 ++++++++++++++++
.../org/apache/kylin/job/lock/MockJobLock.java | 33 +++++++++++++++++
.../job/impl/threadpool/BaseSchedulerTest.java | 2 +-
.../test_case_data/localmeta/kylin.properties | 2 +-
.../test_case_data/sandbox/kylin.properties | 2 +-
.../kylin/job/BaseTestDistributedScheduler.java | 2 +-
.../apache/kylin/rest/service/JobService.java | 2 +-
.../hbase/util/ZookeeperDistributedJobLock.java | 17 +++++----
.../storage/hbase/util/ZookeeperJobLock.java | 6 ++--
20 files changed, 183 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 5b5f78b..95d3c29 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -69,11 +69,6 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4361242..bf6cdb8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -29,6 +29,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.lock.DistributedLock;
+import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -235,6 +237,11 @@ abstract public class KylinConfigBase implements Serializable {
public Map<String, String> getCubeCustomMeasureTypes() {
return getPropertiesByPrefix("kylin.metadata.custom-measure-types.");
}
+
+ public DistributedLock getDistributedLock() {
+ String clsName = getOptional("kylin.metadata.distributed-lock-impl", "org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
+ return (DistributedLock) ClassUtil.newInstance(clsName);
+ }
// ============================================================================
// DICTIONARY & SNAPSHOT
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
deleted file mode 100644
index 00d1ca4..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.lock;
-
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-
-import java.util.concurrent.Executor;
-
-public interface DistributedJobLock extends JobLock {
-
- boolean lockWithClient(String lockPath, String lockClient);
-
- boolean isHasLocked(String lockPath);
-
- void unlock(String lockPath);
-
- PathChildrenCache watch(String watchPath, Executor watchExecutor, WatcherProcess process);
-
- public interface WatcherProcess {
- void process(String path, String data);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
new file mode 100644
index 0000000..ead7714
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.lock;
+
+import java.io.Closeable;
+import java.util.concurrent.Executor;
+
+public interface DistributedLock extends Closeable {
+
+ boolean lockPath(String lockPath, String lockClient);
+
+ boolean isPathLocked(String lockPath);
+
+ void unlockPath(String lockPath);
+
+ Closeable watchPath(String watchPath, Executor watchExecutor, Watcher process);
+
+ public interface Watcher {
+ void process(String path, String data);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
deleted file mode 100644
index 5802d71..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.lock;
-
-
-public interface JobLock {
- boolean lock();
-
- void unlock();
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
deleted file mode 100644
index f8233be..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.lock;
-
-/**
- */
-public class MockJobLock implements JobLock {
- @Override
- public boolean lock() {
- return true;
- }
-
- @Override
- public void unlock() {
- return;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index 9d66b12..f2ed375 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -18,19 +18,19 @@
package org.apache.kylin.dict;
+import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.DistributedJobLock;
-import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.util.Dictionary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.MoreExecutors;
+
/**
* GlobalDictinary based on whole cube, to ensure one value has same dict id in different segments.
* GlobalDictinary mainly used for count distinct measure to support rollup among segments.
@@ -40,7 +40,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
private AppendTrieDictionaryBuilder builder;
private int baseId;
- private DistributedJobLock lock;
+ private DistributedLock lock;
private String sourceColumn;
//the job thread name is UUID+threadID
private final String jobUUID = Thread.currentThread().getName();
@@ -65,7 +65,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
@Override
public boolean addValue(String value) {
if (++counter % 1_000_000 == 0) {
- if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ if (lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
logger.info("processed {} values", counter);
} else {
throw new RuntimeException("Failed to create global dictionary on " + sourceColumn + " This client doesn't keep the lock");
@@ -89,7 +89,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
@Override
public Dictionary<String> build() throws IOException {
try {
- if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ if (lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
return builder.build(baseId);
}
} finally {
@@ -99,17 +99,17 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
}
private void lock(final String sourceColumn) throws IOException {
- lock = (DistributedJobLock) ClassUtil.newInstance("org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
+ lock = KylinConfig.getInstanceFromEnv().getDistributedLock();
- if (!lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ if (!lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
logger.info("{} will wait the lock for {} ", jobUUID, sourceColumn);
final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(1);
- PathChildrenCache cache = lock.watch(getWatchPath(sourceColumn), MoreExecutors.sameThreadExecutor(), new DistributedJobLock.WatcherProcess() {
+ Closeable watch = lock.watchPath(getWatchPath(sourceColumn), MoreExecutors.sameThreadExecutor(), new DistributedLock.Watcher() {
@Override
public void process(String path, String data) {
- if (!data.equalsIgnoreCase(jobUUID) && lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ if (!data.equalsIgnoreCase(jobUUID) && lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
try {
bq.put("getLock");
} catch (InterruptedException e) {
@@ -126,7 +126,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
- cache.close();
+ watch.close();
}
logger.info("{} has waited the lock {} ms for {} ", jobUUID, (System.currentTimeMillis() - start), sourceColumn);
@@ -134,8 +134,8 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
}
private void checkAndUnlock() {
- if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
- lock.unlock(getLockPath(sourceColumn));
+ if (lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
+ lock.unlockPath(getLockPath(sourceColumn));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
index e2cfd44..93d2510 100644
--- a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -21,7 +21,7 @@ package org.apache.kylin.job;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.JobLock;
/**
*/
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 688708e..8b6b5aa 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -40,7 +40,7 @@ import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.JobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -187,8 +187,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
}
@Override
- public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
- this.jobLock = jobLock;
+ public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) throws SchedulerException {
+ jobLock = lock;
String serverMode = jobEngineConfig.getConfig().getServerMode();
if (!("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) {
@@ -205,7 +205,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
this.jobEngineConfig = jobEngineConfig;
- if (jobLock.lock() == false) {
+ if (jobLock.lockJobEngine() == false) {
throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
}
@@ -226,7 +226,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
@Override
public void shutdown() throws SchedulerException {
logger.info("Shutingdown Job Engine ....");
- jobLock.unlock();
+ jobLock.unlockJobEngine();
fetcherPool.shutdown();
jobPool.shutdown();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index c5b03dc..d544320 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -18,6 +18,7 @@
package org.apache.kylin.job.impl.threadpool;
+import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -34,10 +35,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -50,8 +51,8 @@ import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.common.lock.DistributedJobLock;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.DistributedJobLock;
+import org.apache.kylin.job.lock.JobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,8 +73,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
private ExecutorService watchPool;
private ExecutorService jobPool;
private DefaultContext context;
- private DistributedJobLock jobLock;
- private PathChildrenCache lockWatch;
+ private DistributedLock jobLock;
+ private Closeable lockWatch;
private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
private static final ConcurrentMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
@@ -181,7 +182,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
public void run() {
try (SetThreadName ignored = new SetThreadName("Job %s", executable.getId())) {
String segmentId = executable.getParam(SEGMENT_ID);
- if (jobLock.lockWithClient(getLockPath(segmentId), serverName)) {
+ if (jobLock.lockPath(getLockPath(segmentId), serverName)) {
logger.info(executable.toString() + " scheduled in server: " + serverName);
context.addRunningJob(executable);
@@ -209,7 +210,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
if (segmentWithLocks.contains(segmentId)) {
logger.info(executable.toString() + " will release the lock for the segment: " + segmentId);
- jobLock.unlock(getLockPath(segmentId));
+ jobLock.unlockPath(getLockPath(segmentId));
segmentWithLocks.remove(segmentId);
}
}
@@ -218,7 +219,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
}
//when the segment lock released but the segment related job still running, resume the job.
- private class WatcherProcessImpl implements org.apache.kylin.common.lock.DistributedJobLock.WatcherProcess {
+ private class WatcherProcessImpl implements org.apache.kylin.common.lock.DistributedLock.Watcher {
private String serverName;
public WatcherProcessImpl(String serverName) {
@@ -237,7 +238,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) {
try {
logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job");
- if (!jobLock.isHasLocked(getLockPath(segmentId))) {
+ if (!jobLock.isPathLocked(getLockPath(segmentId))) {
executableManager.resumeRunningJobForce(executable.getId());
fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
break;
@@ -264,7 +265,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
}
@Override
- public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
+ public synchronized void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException {
String serverMode = jobEngineConfig.getConfig().getServerMode();
if (!("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) {
logger.info("server mode: " + serverMode + ", no need to run job scheduler");
@@ -288,7 +289,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
//watch the zookeeper node change, so that when one job server is down, other job servers can take over.
watchPool = Executors.newFixedThreadPool(1);
WatcherProcessImpl watcherProcess = new WatcherProcessImpl(this.serverName);
- lockWatch = this.jobLock.watch(getWatchPath(), watchPool, watcherProcess);
+ lockWatch = this.jobLock.watchPath(getWatchPath(), watchPool, watcherProcess);
int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
@@ -307,7 +308,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
AbstractExecutable executable = executableManager.getJob(id);
if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) {
try {
- if (!jobLock.isHasLocked(executable.getParam(SEGMENT_ID))) {
+ if (!jobLock.isPathLocked(executable.getParam(SEGMENT_ID))) {
executableManager.resumeRunningJobForce(executable.getId());
fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
}
@@ -348,7 +349,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
private void releaseAllLocks() {
for (String segmentId : segmentWithLocks) {
- jobLock.unlock(getLockPath(segmentId));
+ jobLock.unlockPath(getLockPath(segmentId));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
new file mode 100644
index 0000000..e5e2a1e
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.lock;
+
+import org.apache.kylin.common.lock.DistributedLock;
+
+public interface DistributedJobLock extends JobLock, DistributedLock {
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
new file mode 100644
index 0000000..1b6b29e
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.lock;
+
+/**
+ * Among a Kylin cluster, usually only one node runs as the job engine and does the scheduling of build jobs.
+ * This interface is for such negotiation.
+ */
+public interface JobLock {
+
+ boolean lockJobEngine();
+
+ void unlockJobEngine();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
new file mode 100644
index 0000000..73f6192
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.lock;
+
+/**
+ */
+public class MockJobLock implements JobLock {
+ @Override
+ public boolean lockJobEngine() {
+ return true;
+ }
+
+ @Override
+ public void unlockJobEngine() {
+ return;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 1bafa34..1ada9a1 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -28,7 +28,7 @@ import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.common.lock.MockJobLock;
+import org.apache.kylin.job.lock.MockJobLock;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 3866575..969e466 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -133,7 +133,7 @@ kylin.security.saml.context-path=/kylin
kylin.test.bcc.new.key=some-value
kylin.engine.mr.config-override.test1=test1
kylin.engine.mr.config-override.test2=test2
-kylin.job.lock=org.apache.kylin.common.lock.MockJobLock
+kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
kylin.engine.provider.0=org.apache.kylin.engine.mr.MRBatchCubingEngine
kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index c0a4968..684b4dd 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -112,7 +112,7 @@ kylin.query.udf.concat=org.apache.kylin.query.udf.ConcatUDF
kylin.query.udf.version=org.apache.kylin.query.udf.VersionUDF
# for test
-kylin.job.lock=org.apache.kylin.common.lock.MockJobLock
+kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
kylin.engine.mr.uhc-reducer-count=3
### CUBE ###
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index 4877ca1..b4ac42f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -167,7 +167,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
}
boolean lock(ZookeeperDistributedJobLock jobLock, String cubeName, String serverName) {
- return jobLock.lockWithClient(getLockPath(cubeName), serverName);
+ return jobLock.lockPath(getLockPath(cubeName), serverName);
}
private static void initZk() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 31d1ded..4ba426e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -51,7 +51,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.rest.constant.Constant;
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index 5f5a721..eb01e4b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -32,7 +32,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.DistributedJobLock;
+import org.apache.kylin.job.lock.DistributedJobLock;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +40,9 @@ import org.slf4j.LoggerFactory;
public class ZookeeperDistributedJobLock implements DistributedJobLock {
private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
+ @SuppressWarnings("unused")
private final KylinConfig config;
+
private static final ConcurrentMap<KylinConfig, CuratorFramework> CACHE = new ConcurrentHashMap<KylinConfig, CuratorFramework>();
private final CuratorFramework zkClient;
@@ -100,7 +102,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
*/
@Override
- public boolean lockWithClient(String lockPath, String lockClient) {
+ public boolean lockPath(String lockPath, String lockClient) {
logger.info(lockClient + " start lock the path: " + lockPath);
boolean hasLock = false;
@@ -163,7 +165,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
*/
@Override
- public boolean isHasLocked(String lockPath) {
+ public boolean isPathLocked(String lockPath) {
try {
return zkClient.checkExists().forPath(lockPath) != null;
} catch (Exception e) {
@@ -183,7 +185,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
*/
@Override
- public void unlock(String lockPath) {
+ public void unlockPath(String lockPath) {
try {
if (zkClient.checkExists().forPath(lockPath) != null) {
zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
@@ -213,7 +215,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
*/
@Override
- public PathChildrenCache watch(String watchPath, Executor watchExecutor, final WatcherProcess watcherProcess) {
+ public PathChildrenCache watchPath(String watchPath, Executor watchExecutor, final Watcher watcherProcess) {
PathChildrenCache cache = new PathChildrenCache(zkClient, watchPath, true);
try {
cache.start();
@@ -236,14 +238,15 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
}
@Override
- public boolean lock() {
+ public boolean lockJobEngine() {
return true;
}
@Override
- public void unlock() {
+ public void unlockJobEngine() {
}
+ @Override
public void close() {
try {
zkClient.close();
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
index 7315d1d..6a3cf7e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
@@ -35,7 +35,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -56,7 +56,7 @@ public class ZookeeperJobLock implements JobLock {
private CuratorFramework zkClient;
@Override
- public boolean lock() {
+ public boolean lockJobEngine() {
this.scheduleID = schedulerId();
String zkConnectString = getZKConnectString();
logger.info("zk connection string:" + zkConnectString);
@@ -100,7 +100,7 @@ public class ZookeeperJobLock implements JobLock {
}
@Override
- public void unlock() {
+ public void unlockJobEngine() {
releaseLock();
}