You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/05/30 10:25:55 UTC
[kylin] 08/11: KYLIN-4348 Fix distributed concurrency lock bug
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 17ae024611313ed254dc6dc3cc9a06540db013a4
Author: wangxiaojing <wa...@didichuxing.com>
AuthorDate: Wed May 6 15:14:10 2020 +0800
KYLIN-4348 Fix distributed concurrency lock bug
---
.../lock/zookeeper/ZookeeperDistributedLock.java | 2 +-
.../org/apache/kylin/rest/service/JobService.java | 26 ++-
.../kylin/source/hive/CreateMrHiveDictStep.java | 195 ++++++++++++++++-----
.../apache/kylin/source/hive/HiveInputBase.java | 3 +-
.../apache/kylin/source/hive/MRHiveDictUtil.java | 15 +-
5 files changed, 191 insertions(+), 50 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java
index 298be56..36c0ad1 100644
--- a/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java
@@ -144,7 +144,7 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
try {
curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(lockPath, clientBytes);
} catch (KeeperException.NodeExistsException ex) {
- logger.debug("{} see {} is already locked", client, lockPath);
+ logger.debug("{} check {} is already locked", client, lockPath);
} catch (Exception ex) {
throw new IllegalStateException("Error while " + client + " trying to lock " + lockPath, ex);
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index a6c4eea..d1ec4cf 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -79,6 +79,7 @@ import org.apache.kylin.source.ISource;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
+import org.apache.kylin.source.hive.MRHiveDictUtil;
import org.apache.kylin.stream.coordinator.Coordinator;
import org.apache.kylin.stream.core.model.SegmentBuildState;
import org.slf4j.Logger;
@@ -643,15 +644,26 @@ public class JobService extends BasicService implements InitializingBean {
if (executable.getStatus().isFinalState()) {
try {
DistributedLock lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
+ if (lock.isLocked(MRHiveDictUtil.getLockPath(executable.getCubeName(), job.getId()))) {//release mr/hive global dict lock if exists
+ lock.purgeLocks(MRHiveDictUtil.getLockPath(executable.getCubeName(), null));
+ logger.info("{} unlock global MR/Hive dict lock path({}) success", job.getId(),
+ MRHiveDictUtil.getLockPath(executable.getCubeName(), null));
+ if (lock.isLocked(MRHiveDictUtil.getEphemeralLockPath(executable.getCubeName()))) {//release mr/hive global dict Ephemeral lock if exists
+ lock.purgeLocks(MRHiveDictUtil.getEphemeralLockPath(executable.getCubeName()));
+ logger.info("{} unlock global MR/Hive dict ephemeral lock path({}) success", job.getId(),
+ MRHiveDictUtil.getEphemeralLockPath(executable.getCubeName()));
+ }
+ }
+
if(lock.isLocked(CubeJobLockUtil.getLockPath(executable.getCubeName(), job.getId()))){//release cube job dict lock if exists
- lock.purgeLocks(CubeJobLockUtil.getLockPath(executable.getCubeName(), null));
- logger.info("{} unlock cube job dict lock path({}) success", job.getId(), CubeJobLockUtil.getLockPath(executable.getCubeName(), null));
+ lock.purgeLocks(CubeJobLockUtil.getLockPath(executable.getCubeName(), null));
+ logger.info("{} unlock cube job dict lock path({}) success", job.getId(), CubeJobLockUtil.getLockPath(executable.getCubeName(), null));
- if (lock.isLocked(CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()))) {//release cube job Ephemeral lock if exists
- lock.purgeLocks(CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()));
- logger.info("{} unlock cube job ephemeral lock path({}) success", job.getId(), CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()));
+ if (lock.isLocked(CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()))) {//release cube job Ephemeral lock if exists
+ lock.purgeLocks(CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()));
+ logger.info("{} unlock cube job ephemeral lock path({}) success", job.getId(), CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()));
+ }
}
- }
}catch (Exception e){
logger.error("get some error when release cube {} job {} job id {} " , executable.getCubeName(), job.getName(), job.getId());
}
@@ -725,7 +737,7 @@ public class JobService extends BasicService implements InitializingBean {
+ SecurityContextHolder.getContext().getAuthentication().getName());
if (job.getStatus().isComplete()) {
throw new IllegalStateException(
- "The job " + job.getId() + " has already been finished and cannot be stopped.");
+ "The job " + job.getId() + " has already been finished and cannot be stopped.");
}
getExecutableManager().pauseJob(job.getId());
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
index f3bc4c7..305cdae 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
@@ -44,6 +44,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
*
@@ -52,31 +54,16 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
private static final Logger logger = LoggerFactory.getLogger(CreateMrHiveDictStep.class);
private final PatternedLogger stepLogger = new PatternedLogger(logger);
- private DistributedLock lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
+ private final Lock threadLock = new ReentrantLock();
private static final String GET_SQL = "\" Get Max Dict Value Sql : \"";
- protected void createMrHiveDict(KylinConfig config) throws Exception {
+ protected void createMrHiveDict(KylinConfig config, DistributedLock lock) throws Exception {
+ logger.info("start to run createMrHiveDict {}", getId());
try {
if (getIsLock()) {
- String pathName = getLockPathName();
- if (Strings.isNullOrEmpty(pathName)) {
- throw new IllegalArgumentException("create Mr-Hive dict lock path name is null");
- }
- String lockPath = getLockPath(pathName);
- boolean isLocked = true;
- long lockStartTime = System.currentTimeMillis();
- while (isLocked) {
- isLocked = lock.isLocked(lockPath);
- stepLogger.log("zookeeper lock path :" + lockPath + ", result is " + isLocked);
- if (!isLocked) {
- break;
- }
- // wait 1 min and try again
- Thread.sleep(60000);
- }
- stepLogger.log("zookeeper get lock costTime : " + ((System.currentTimeMillis() - lockStartTime) / 1000) + " s");
- lock.lock(lockPath);
+ getLock(lock);
}
+
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(getName());
hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride());
hiveCmdBuilder.addStatement(getInitStatement());
@@ -124,7 +111,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
final String cmd = hiveCmdBuilder.toString();
- stepLogger.log("MR-Hive dict, cmd: " + cmd);
+ stepLogger.log("MR/Hive dict, cmd: " + cmd);
CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = manager.getCube(getCubeName());
@@ -134,23 +121,16 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
} else {
Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
if (response.getFirst() != 0) {
- throw new RuntimeException("Failed to create mr hive dict, error code " + response.getFirst());
+ throw new RuntimeException("Failed to create MR/Hive dict, error code " + response.getFirst());
}
getManager().addJobInfo(getId(), stepLogger.getInfo());
}
- if (getIsLock()) {
- String pathName = getLockPathName();
- if (Strings.isNullOrEmpty(pathName)) {
- throw new IllegalArgumentException(" create mr hive dict unlock path name is null");
- }
- lock.unlock(getLockPath(pathName));
- stepLogger.log("zookeeper unlock path :" + getLockPathName());
+
+ if (getIsUnlock()) {
+ unLock(lock);
}
+ getManager().addJobInfo(getId(), stepLogger.getInfo());
} catch (Exception e) {
- if (getIsLock()) {
- lock.unlock(getLockPath(getLockPathName()));
- stepLogger.log("zookeeper unlock path :" + getLockPathName());
- }
logger.error("", e);
throw e;
}
@@ -167,25 +147,42 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
KylinConfig config = getCubeSpecificConfig();
+ DistributedLock lock = null;
try {
+ if (getIsLock() || getIsUnlock()) {
+ lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
+ }
String preHdfsShell = getPreHdfsShell();
if (Objects.nonNull(preHdfsShell) && !"".equalsIgnoreCase(preHdfsShell)) {
doRetry(preHdfsShell, config);
}
- createMrHiveDict(config);
+ createMrHiveDict(config, lock);
String postfixHdfsCmd = getPostfixHdfsShell();
if (Objects.nonNull(postfixHdfsCmd) && !"".equalsIgnoreCase(postfixHdfsCmd)) {
doRetry(postfixHdfsCmd, config);
}
- return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
-
+ if (isDiscarded()) {
+ if (getIsLock()) {
+ unLock(lock);
+ }
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, stepLogger.getBufferedLog());
+ } else {
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+ }
} catch (Exception e) {
logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+ if (isDiscarded()) {
+ if (getIsLock()) {
+ unLock(lock);
+ }
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, stepLogger.getBufferedLog());
+ } else {
+ return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+ }
}
}
@@ -267,6 +264,14 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
return Strings.isNullOrEmpty(isLock) ? false : Boolean.parseBoolean(isLock);
}
+ public void setJobFlowJobId(String jobId) {
+ setParam("jobFlowJobId", jobId);
+ }
+
+ public String getJobFlowJobId() {
+ return getParam("jobFlowJobId");
+ }
+
public void setIsUnLock(Boolean isUnLock) {
setParam("isUnLock", String.valueOf(isUnLock));
}
@@ -284,6 +289,119 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
return getParam("lockPathName");
}
+ private String getMRDictLockPathName() {
+ String pathName = getLockPathName();
+ if (Strings.isNullOrEmpty(pathName)) {
+ throw new IllegalArgumentException(" create MR/Hive dict lock path name is null");
+ }
+
+ String flowJobId = getJobFlowJobId();
+ if (Strings.isNullOrEmpty(flowJobId)) {
+ throw new IllegalArgumentException(" create MR/Hive dict lock path flowJobId is null");
+ }
+ return MRHiveDictUtil.getLockPath(pathName, flowJobId);
+ }
+
+ private String getMRDictLockParentPathName() {
+ String pathName = getLockPathName();
+ if (Strings.isNullOrEmpty(pathName)) {
+ throw new IllegalArgumentException(" create MR/Hive dict lock path name is null");
+ }
+ return MRHiveDictUtil.getLockPath(pathName, null);
+ }
+
+ private String getEphemeralLockPathName() {
+ String pathName = getLockPathName();
+ if (Strings.isNullOrEmpty(pathName)) {
+ throw new IllegalArgumentException(" create MR/Hive dict lock path name is null");
+ }
+
+ return MRHiveDictUtil.getEphemeralLockPath(pathName);
+ }
+
+ private void getLock(DistributedLock lock) throws InterruptedException {
+ logger.info("{} try to get global MR/Hive ZK lock", getId());
+ String ephemeralLockPath = getEphemeralLockPathName();
+ String fullLockPath = getMRDictLockPathName();
+ boolean isLocked = true;
+ boolean getLocked = false;
+ long lockStartTime = System.currentTimeMillis();
+
+ boolean isLockedByTheJob = lock.isLocked(fullLockPath);
+ logger.info("{} global MR/Hive ZK lock is isLockedByTheJob:{}", getId(), isLockedByTheJob);
+ if (!isLockedByTheJob) {
+ while (isLocked) {
+ isLocked = lock.isLocked(getMRDictLockParentPathName());//is the global lock held by another job?
+
+ if (!isLocked) {
+ isLocked = lock.isLocked(ephemeralLockPath);//check whether the ephemeral lock is currently held
+ stepLogger.log("zookeeper lock path :" + ephemeralLockPath + ", result is " + isLocked);
+ logger.info("zookeeper lock path :{}, is locked by other job result is {}", ephemeralLockPath,
+ isLocked);
+
+ if (!isLocked) {
+ //try to get ephemeral lock
+ try {
+ logger.debug("{} before start to get lock ephemeralLockPath {}", getId(), ephemeralLockPath);
+ threadLock.lock();
+ logger.debug("{} start to get lock ephemeralLockPath {}", getId(), ephemeralLockPath);
+ getLocked = lock.lock(ephemeralLockPath);
+ logger.debug("{} finish get lock ephemeralLockPath {},getLocked {}", getId(), ephemeralLockPath, getLocked);
+ } finally {
+ threadLock.unlock();
+ logger.debug("{} finish unlock the thread lock ,ephemeralLockPath {} ", getId(), ephemeralLockPath);
+ }
+
+ if (getLocked) {//get ephemeral lock success
+ try {
+ getLocked = lock.globalPermanentLock(fullLockPath);//also take the permanent fullLockPath lock so that, if the server crashes, the same job can get the lock again
+ if (getLocked) {
+ break;
+ } else {
+ if (lock.isLocked(ephemeralLockPath)) {
+ lock.unlock(ephemeralLockPath);
+ }
+ }
+ } catch (Exception e) {
+ if (lock.isLocked(ephemeralLockPath)) {
+ lock.unlock(ephemeralLockPath);
+ }
+ }
+ }
+ isLocked = true;//failed to get the lock, will try again
+ }
+ }
+ // wait 1 min and try again
+ logger.info(
+ "{},global parent lock path({}) is locked by other job result is {} ,ephemeral lock path :{} is locked by other job result is {},will try after one minute",
+ getId(), getMRDictLockParentPathName(), isLocked, ephemeralLockPath, isLocked);
+ Thread.sleep(60000);
+ }
+ } else {
+ lock.lock(ephemeralLockPath);
+ }
+ stepLogger.log("zookeeper get lock costTime : " + ((System.currentTimeMillis() - lockStartTime) / 1000) + " s");
+ long useSec = ((System.currentTimeMillis() - lockStartTime) / 1000);
+ logger.info("job {} get zookeeper lock path:{} success,zookeeper get lock costTime : {} s", getId(),
+ fullLockPath, useSec);
+ }
+
+ private void unLock(DistributedLock lock) {
+ String parentLockPath = getMRDictLockParentPathName();
+ String ephemeralLockPath = getEphemeralLockPathName();
+ if (lock.isLocked(getMRDictLockPathName())) {
+ lock.purgeLocks(parentLockPath);
+ stepLogger.log("zookeeper unlock path :" + parentLockPath);
+ logger.info("{} unlock full lock path :{} success", getId(), parentLockPath);
+ }
+
+ if (lock.isLocked(ephemeralLockPath)) {
+ lock.purgeLocks(ephemeralLockPath);
+ stepLogger.log("zookeeper unlock path :" + ephemeralLockPath);
+ logger.info("{} unlock full lock path :{} success", getId(), ephemeralLockPath);
+ }
+ }
+
private static String serilizeToMap(Map<String, String> map) {
JSONArray result = new JSONArray();
if (map != null && map.size() > 0) {
@@ -322,7 +440,4 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
return result;
}
- private String getLockPath(String pathName) {
- return MRHiveDictUtil.DictHiveType.MrDictLockPath.getName() + pathName;
- }
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 624c8f9..49e3f8d 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -208,6 +208,7 @@ public class HiveInputBase {
step.setIsLock(true);
step.setIsUnLock(false);
step.setLockPathName(cubeName);
+ step.setJobFlowJobId(jobId);
return step;
}
@@ -305,7 +306,7 @@ public class HiveInputBase {
step.setCreateTableStatementMap(dictHqlMap);
step.setIsUnLock(true);
step.setLockPathName(cubeName);
- //toDo Fix distributed concurrency lock bug
+ step.setJobFlowJobId(jobId);
CubingExecutableUtil.setCubeName(cubeName, step.getParams());
step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL);
return step;
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
index fd2d103..85cd855 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -46,7 +46,8 @@ public class MRHiveDictUtil {
protected static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
public enum DictHiveType {
- GroupBy("group_by"), MrDictLockPath("/mr_dict_lock/");
+ GroupBy("group_by"), MrDictLockPath("/mr_dict_lock/"), MrEphemeralDictLockPath(
+ "/mr_dict_ephemeral_lock/");
private String name;
DictHiveType(String name) {
@@ -181,6 +182,18 @@ public class MRHiveDictUtil {
executableManager.addJobInfo(jobId, info);
}
+ public static String getLockPath(String cubeName, String jobId) {
+ if (jobId == null) {
+ return DictHiveType.MrDictLockPath.getName() + cubeName;
+ } else {
+ return DictHiveType.MrDictLockPath.getName() + cubeName + "/" + jobId;
+ }
+ }
+
+ public static String getEphemeralLockPath(String cubeName) {
+ return DictHiveType.MrEphemeralDictLockPath.getName() + cubeName;
+ }
+
public static String getMRHiveFlatTableGroupBytableName(IJoinedFlatTableDesc flatDesc) {
return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
}