You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/05/30 10:25:55 UTC

[kylin] 08/11: KYLIN-4348 Fix distributed concurrency lock bug

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 17ae024611313ed254dc6dc3cc9a06540db013a4
Author: wangxiaojing <wa...@didichuxing.com>
AuthorDate: Wed May 6 15:14:10 2020 +0800

    KYLIN-4348 Fix distributed concurrency lock bug
---
 .../lock/zookeeper/ZookeeperDistributedLock.java   |   2 +-
 .../org/apache/kylin/rest/service/JobService.java  |  26 ++-
 .../kylin/source/hive/CreateMrHiveDictStep.java    | 195 ++++++++++++++++-----
 .../apache/kylin/source/hive/HiveInputBase.java    |   3 +-
 .../apache/kylin/source/hive/MRHiveDictUtil.java   |  15 +-
 5 files changed, 191 insertions(+), 50 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java
index 298be56..36c0ad1 100644
--- a/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java
@@ -144,7 +144,7 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
         try {
             curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(lockPath, clientBytes);
         } catch (KeeperException.NodeExistsException ex) {
-            logger.debug("{} see {} is already locked", client, lockPath);
+            logger.debug("{} check {} is already locked", client, lockPath);
         } catch (Exception ex) {
             throw new IllegalStateException("Error while " + client + " trying to lock " + lockPath, ex);
         }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index a6c4eea..d1ec4cf 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -79,6 +79,7 @@ import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
+import org.apache.kylin.source.hive.MRHiveDictUtil;
 import org.apache.kylin.stream.coordinator.Coordinator;
 import org.apache.kylin.stream.core.model.SegmentBuildState;
 import org.slf4j.Logger;
@@ -643,15 +644,26 @@ public class JobService extends BasicService implements InitializingBean {
                 if (executable.getStatus().isFinalState()) {
                     try {
                         DistributedLock lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
+                        if (lock.isLocked(MRHiveDictUtil.getLockPath(executable.getCubeName(), job.getId()))) {//release mr/hive global dict lock if exists
+                            lock.purgeLocks(MRHiveDictUtil.getLockPath(executable.getCubeName(), null));
+                            logger.info("{} unlock global MR/Hive dict lock path({}) success", job.getId(),
+                                    MRHiveDictUtil.getLockPath(executable.getCubeName(), null));
+                            if (lock.isLocked(MRHiveDictUtil.getEphemeralLockPath(executable.getCubeName()))) {//release mr/hive global dict Ephemeral lock if exists
+                                lock.purgeLocks(MRHiveDictUtil.getEphemeralLockPath(executable.getCubeName()));
+                                logger.info("{} unlock global MR/Hive dict ephemeral lock path({}) success", job.getId(),
+                                        MRHiveDictUtil.getEphemeralLockPath(executable.getCubeName()));
+                            }
+                        }
+
                         if(lock.isLocked(CubeJobLockUtil.getLockPath(executable.getCubeName(), job.getId()))){//release cube job dict lock if exists
-                            lock.purgeLocks(CubeJobLockUtil.getLockPath(executable.getCubeName(), null));
-                            logger.info("{} unlock cube job dict lock path({}) success", job.getId(), CubeJobLockUtil.getLockPath(executable.getCubeName(), null));
+                                lock.purgeLocks(CubeJobLockUtil.getLockPath(executable.getCubeName(), null));
+                                logger.info("{} unlock cube job dict lock path({}) success", job.getId(), CubeJobLockUtil.getLockPath(executable.getCubeName(), null));
 
-                            if (lock.isLocked(CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()))) {//release cube job Ephemeral lock if exists
-                                lock.purgeLocks(CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()));
-                                logger.info("{} unlock cube job ephemeral lock path({}) success", job.getId(), CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()));
+                                if (lock.isLocked(CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()))) {//release cube job Ephemeral lock if exists
+                                    lock.purgeLocks(CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()));
+                                    logger.info("{} unlock cube job ephemeral lock path({}) success", job.getId(), CubeJobLockUtil.getEphemeralLockPath(executable.getCubeName()));
+                                }
                             }
-                        }
                     }catch (Exception e){
                         logger.error("get some error when release cube {} job {} job id {} " , executable.getCubeName(), job.getName(), job.getId());
                     }
@@ -725,7 +737,7 @@ public class JobService extends BasicService implements InitializingBean {
             + SecurityContextHolder.getContext().getAuthentication().getName());
         if (job.getStatus().isComplete()) {
           throw new IllegalStateException(
-              "The job " + job.getId() + " has already been finished and cannot be stopped.");
+                  "The job " + job.getId() + " has already been finished and cannot be stopped.");
         }
         getExecutableManager().pauseJob(job.getId());
     }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
index f3bc4c7..305cdae 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
@@ -44,6 +44,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  *
@@ -52,31 +54,16 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
 
     private static final Logger logger = LoggerFactory.getLogger(CreateMrHiveDictStep.class);
     private final PatternedLogger stepLogger = new PatternedLogger(logger);
-    private DistributedLock lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
+    private final Lock threadLock = new ReentrantLock();
     private static final String GET_SQL = "\" Get Max Dict Value Sql : \"";
 
-    protected void createMrHiveDict(KylinConfig config) throws Exception {
+    protected void createMrHiveDict(KylinConfig config, DistributedLock lock) throws Exception {
+        logger.info("start to run createMrHiveDict {}", getId());
         try {
             if (getIsLock()) {
-                String pathName = getLockPathName();
-                if (Strings.isNullOrEmpty(pathName)) {
-                    throw new IllegalArgumentException("create Mr-Hive dict lock path name is null");
-                }
-                String lockPath = getLockPath(pathName);
-                boolean isLocked = true;
-                long lockStartTime = System.currentTimeMillis();
-                while (isLocked) {
-                    isLocked = lock.isLocked(lockPath);
-                    stepLogger.log("zookeeper lock path :" + lockPath + ", result is " + isLocked);
-                    if (!isLocked) {
-                        break;
-                    }
-                    // wait 1 min and try again
-                    Thread.sleep(60000);
-                }
-                stepLogger.log("zookeeper get lock costTime : " + ((System.currentTimeMillis() - lockStartTime) / 1000) + " s");
-                lock.lock(lockPath);
+                getLock(lock);
             }
+
             final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(getName());
             hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride());
             hiveCmdBuilder.addStatement(getInitStatement());
@@ -124,7 +111,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
 
             final String cmd = hiveCmdBuilder.toString();
 
-            stepLogger.log("MR-Hive dict, cmd: " + cmd);
+            stepLogger.log("MR/Hive dict, cmd: " + cmd);
 
             CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = manager.getCube(getCubeName());
@@ -134,23 +121,16 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
             } else {
                 Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
                 if (response.getFirst() != 0) {
-                    throw new RuntimeException("Failed to create mr hive dict, error code " + response.getFirst());
+                    throw new RuntimeException("Failed to create MR/Hive dict, error code " + response.getFirst());
                 }
                 getManager().addJobInfo(getId(), stepLogger.getInfo());
             }
-            if (getIsLock()) {
-                String pathName = getLockPathName();
-                if (Strings.isNullOrEmpty(pathName)) {
-                    throw new IllegalArgumentException(" create mr hive dict unlock path name is null");
-                }
-                lock.unlock(getLockPath(pathName));
-                stepLogger.log("zookeeper unlock path :" + getLockPathName());
+
+            if (getIsUnlock()) {
+                unLock(lock);
             }
+            getManager().addJobInfo(getId(), stepLogger.getInfo());
         } catch (Exception e) {
-            if (getIsLock()) {
-                lock.unlock(getLockPath(getLockPathName()));
-                stepLogger.log("zookeeper unlock path :" + getLockPathName());
-            }
             logger.error("", e);
             throw e;
         }
@@ -167,25 +147,42 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig config = getCubeSpecificConfig();
+        DistributedLock lock = null;
         try {
+            if (getIsLock() || getIsUnlock()) {
+                lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
+            }
 
             String preHdfsShell = getPreHdfsShell();
             if (Objects.nonNull(preHdfsShell) && !"".equalsIgnoreCase(preHdfsShell)) {
                 doRetry(preHdfsShell, config);
             }
 
-            createMrHiveDict(config);
+            createMrHiveDict(config, lock);
 
             String postfixHdfsCmd = getPostfixHdfsShell();
             if (Objects.nonNull(postfixHdfsCmd) && !"".equalsIgnoreCase(postfixHdfsCmd)) {
                 doRetry(postfixHdfsCmd, config);
             }
 
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
-
+            if (isDiscarded()) {
+                if (getIsLock()) {
+                    unLock(lock);
+                }
+                return new ExecuteResult(ExecuteResult.State.DISCARDED, stepLogger.getBufferedLog());
+            } else {
+                return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+            }
         } catch (Exception e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+            if (isDiscarded()) {
+                if (getIsLock()) {
+                    unLock(lock);
+                }
+                return new ExecuteResult(ExecuteResult.State.DISCARDED, stepLogger.getBufferedLog());
+            } else {
+                return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+            }
         }
     }
 
@@ -267,6 +264,14 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
         return Strings.isNullOrEmpty(isLock) ? false : Boolean.parseBoolean(isLock);
     }
 
+    public void setJobFlowJobId(String jobId) { // stores jobId under the "jobFlowJobId" step param
+        setParam("jobFlowJobId", jobId);
+    }
+
+    public String getJobFlowJobId() { // reads back the "jobFlowJobId" step param written by setJobFlowJobId
+        return getParam("jobFlowJobId");
+    }
+
     public void setIsUnLock(Boolean isUnLock) {
         setParam("isUnLock", String.valueOf(isUnLock));
     }
@@ -284,6 +289,119 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
         return getParam("lockPathName");
     }
 
+    /** Builds the per-job lock path from the lock path name and the job flow id; both must be non-empty. */
+    private String getMRDictLockPathName() {
+        final String lockName = getLockPathName();
+        if (Strings.isNullOrEmpty(lockName)) {
+            throw new IllegalArgumentException(" create MR/Hive dict lock path name is null");
+        }
+        final String jobId = getJobFlowJobId();
+        if (Strings.isNullOrEmpty(jobId)) {
+            throw new IllegalArgumentException(" create MR/Hive dict lock path flowJobId is null");
+        }
+        return MRHiveDictUtil.getLockPath(lockName, jobId);
+    }
+
+    private String getMRDictLockParentPathName() { // lock path with no job id segment (jobId passed as null)
+        final String lockName = getLockPathName();
+        if (!Strings.isNullOrEmpty(lockName)) {
+            return MRHiveDictUtil.getLockPath(lockName, null);
+        }
+        throw new IllegalArgumentException(" create MR/Hive dict lock path name is null");
+    }
+
+    /** Resolves the ephemeral lock path for this step's lock path name, which must be non-empty. */
+    private String getEphemeralLockPathName() {
+        final String lockName = getLockPathName();
+        if (!Strings.isNullOrEmpty(lockName)) {
+            return MRHiveDictUtil.getEphemeralLockPath(lockName);
+        }
+        throw new IllegalArgumentException(" create MR/Hive dict lock path name is null");
+    }
+
+    /**
+     * Waits until this step holds the MR/Hive dict lock (ephemeral path first, then the permanent per-job path), re-checking
+     * once a minute while either path is locked; if the per-job path is already locked, only the ephemeral path is locked.
+     */
+    private void getLock(DistributedLock lock) throws InterruptedException {
+        logger.info("{} try to get global MR/Hive ZK lock", getId());
+        String ephemeralLockPath = getEphemeralLockPathName();
+        String fullLockPath = getMRDictLockPathName();
+        boolean isLocked = true;
+        boolean getLocked = false;
+        long lockStartTime = System.currentTimeMillis();
+
+        boolean isLockedByTheJob = lock.isLocked(fullLockPath);
+        logger.info("{} global MR/Hive ZK lock is isLockedByTheJob:{}", getId(), isLockedByTheJob);
+        if (!isLockedByTheJob) {
+            while (isLocked) {
+                isLocked = lock.isLocked(getMRDictLockParentPathName());//other job global lock
+
+                if (!isLocked) {
+                    isLocked = lock.isLocked(ephemeralLockPath);//get the ephemeral current lock
+                    stepLogger.log("zookeeper lock path :" + ephemeralLockPath + ", result is " + isLocked);
+                    logger.info("zookeeper lock path :{}, is locked by other job result is {}", ephemeralLockPath, isLocked);
+
+                    if (!isLocked) {
+                        // take threadLock before the try so the finally block never unlocks a lock that was not acquired
+                        logger.debug("{} before start to get lock ephemeralLockPath {}", getId(), ephemeralLockPath);
+                        threadLock.lock();
+                        try {
+                            logger.debug("{} start to get lock ephemeralLockPath {}", getId(), ephemeralLockPath);
+                            getLocked = lock.lock(ephemeralLockPath);
+                            logger.debug("{} finish get lock ephemeralLockPath {},getLocked {}", getId(), ephemeralLockPath, getLocked);
+                        } finally {
+                            threadLock.unlock();
+                            logger.debug("{} finish unlock the thread lock ,ephemeralLockPath {} ", getId(), ephemeralLockPath);
+                        }
+
+                        if (getLocked) {//get ephemeral lock success
+                            try {
+                                getLocked = lock.globalPermanentLock(fullLockPath);// also take the permanent per-job lock so the same job can get the lock again after a server crash
+                                if (getLocked) {
+                                    break;
+                                }
+                            } catch (Exception e) {
+                                logger.warn("{} failed to get permanent lock {}, will release ephemeral lock {} and retry", getId(), fullLockPath, ephemeralLockPath, e);
+                            }
+                            if (lock.isLocked(ephemeralLockPath)) {// permanent lock not obtained: give the ephemeral lock back before retrying
+                                lock.unlock(ephemeralLockPath);
+                            }
+                        }
+                        isLocked = true;//get lock fail,will try again
+                    }
+                }
+                // wait 1 min and try again
+                logger.info(
+                        "{},global parent lock path({}) is locked by other job result is {} ,ephemeral lock path :{} is locked by other job result is {},will try after one minute",
+                        getId(), getMRDictLockParentPathName(), isLocked, ephemeralLockPath, isLocked);
+                Thread.sleep(60000);
+            }
+        } else {
+            lock.lock(ephemeralLockPath);
+        }
+        long useSec = ((System.currentTimeMillis() - lockStartTime) / 1000);
+        stepLogger.log("zookeeper get lock costTime : " + useSec + " s");
+        logger.info("job {} get zookeeper lock path:{} success,zookeeper get lock costTime : {} s", getId(),
+                fullLockPath, useSec);
+    }
+
+    /** Purges the parent dict lock path when this job's own lock path is locked, then purges the ephemeral lock path if locked. */
+    private void unLock(DistributedLock lock) {
+        String parentLockPath = getMRDictLockParentPathName();
+        String ephemeralLockPath = getEphemeralLockPathName();
+        if (lock.isLocked(getMRDictLockPathName())) {
+            lock.purgeLocks(parentLockPath);
+            stepLogger.log("zookeeper unlock path :" + parentLockPath);
+            logger.info("{} unlock full lock path :{} success", getId(), parentLockPath);
+        }
+        if (lock.isLocked(ephemeralLockPath)) {
+            lock.purgeLocks(ephemeralLockPath);
+            stepLogger.log("zookeeper unlock path :" + ephemeralLockPath);
+            logger.info("{} unlock ephemeral lock path :{} success", getId(), ephemeralLockPath);
+        }
+    }
+
     private static String serilizeToMap(Map<String, String> map) {
         JSONArray result = new JSONArray();
         if (map != null && map.size() > 0) {
@@ -322,7 +440,4 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
         return result;
     }
 
-    private String getLockPath(String pathName) {
-        return MRHiveDictUtil.DictHiveType.MrDictLockPath.getName() + pathName;
-    }
 }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 624c8f9..49e3f8d 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -208,6 +208,7 @@ public class HiveInputBase {
             step.setIsLock(true);
             step.setIsUnLock(false);
             step.setLockPathName(cubeName);
+            step.setJobFlowJobId(jobId);
             return step;
         }
 
@@ -305,7 +306,7 @@ public class HiveInputBase {
             step.setCreateTableStatementMap(dictHqlMap);
             step.setIsUnLock(true);
             step.setLockPathName(cubeName);
-            //toDo Fix distributed concurrency lock bug
+            step.setJobFlowJobId(jobId);
             CubingExecutableUtil.setCubeName(cubeName, step.getParams());
             step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL);
             return step;
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
index fd2d103..85cd855 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -46,7 +46,8 @@ public class MRHiveDictUtil {
     protected static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
 
     public enum DictHiveType {
-        GroupBy("group_by"), MrDictLockPath("/mr_dict_lock/");
+        GroupBy("group_by"), MrDictLockPath("/mr_dict_lock/"), MrEphemeralDictLockPath(
+                "/mr_dict_ephemeral_lock/");
         private String name;
 
         DictHiveType(String name) {
@@ -181,6 +182,18 @@ public class MRHiveDictUtil {
         executableManager.addJobInfo(jobId, info);
     }
 
+    /**
+     * Returns the MR dict lock path for a cube; a non-null jobId is appended as a further "/"-separated segment.
+     */
+    public static String getLockPath(String cubeName, String jobId) {
+        String cubeLockPath = DictHiveType.MrDictLockPath.getName() + cubeName;
+        return jobId == null ? cubeLockPath : cubeLockPath + "/" + jobId;
+    }
+
+    public static String getEphemeralLockPath(String cubeName) { // MrEphemeralDictLockPath name followed by cubeName
+        return DictHiveType.MrEphemeralDictLockPath.getName() + cubeName;
+    }
+
     public static String getMRHiveFlatTableGroupBytableName(IJoinedFlatTableDesc flatDesc) {
         return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
     }