Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/26 03:36:10 UTC

[kylin] branch master updated: KYLIN-4249 Fix CI test

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 67151e2  KYLIN-4249 Fix CI test
67151e2 is described below

commit 67151e23f429d46c58215f90ffdee3ceb0b0b68e
Author: nichunen <ni...@apache.org>
AuthorDate: Thu Dec 26 11:34:32 2019 +0800

    KYLIN-4249 Fix CI test
---
 .../job/impl/threadpool/DistributedScheduler.java  | 245 +++++++++++----------
 1 file changed, 124 insertions(+), 121 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 4df9221..944e0b8 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -30,6 +30,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.util.SetThreadName;
@@ -60,22 +61,13 @@ import com.google.common.collect.Maps;
  *  2. add all the job servers and query servers to the kylin.server.cluster-servers
  */
 public class DistributedScheduler implements Scheduler<AbstractExecutable> {
-    private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
-
     public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // note ZookeeperDistributedLock will ensure zk path prefix: /${kylin.env.zookeeper-base-path}/metadata
-
-    public static DistributedScheduler getInstance(KylinConfig config) {
-        return config.getManager(DistributedScheduler.class);
-    }
-
-    // called by reflection
-    static DistributedScheduler newInstance(KylinConfig config) throws IOException {
-        return new DistributedScheduler();
-    }
+    private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
+    // keep the ids of all running jobs that currently hold a job lock on this server
+    private final Set<String> jobWithLocks = new CopyOnWriteArraySet<>();
+    private ExecutableManager executableManager;
 
     // ============================================================================
-
-    private ExecutableManager executableManager;
     private FetcherRunner fetcher;
     private ScheduledExecutorService fetcherPool;
     private ExecutorService watchPool;
@@ -83,109 +75,34 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
     private DefaultContext context;
     private DistributedLock jobLock;
     private Closeable lockWatch;
-
-    //keep all running job
-    private final Set<String> jobWithLocks = new CopyOnWriteArraySet<>();
     private volatile boolean initialized = false;
     private volatile boolean hasStarted = false;
     private JobEngineConfig jobEngineConfig;
     private String serverName;
 
-    private class JobRunner implements Runnable {
-
-        private final AbstractExecutable executable;
-
-        public JobRunner(AbstractExecutable executable) {
-            this.executable = executable;
-        }
-
-        @Override
-        public void run() {
-            try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
-                    System.identityHashCode(DistributedScheduler.this), executable.getId())) {
-
-                KylinConfig config = executable.getCubeSpecificConfig();
-                boolean isAssigned = config.isOnAssignedServer(ToolUtil.getHostName(),
-                        ToolUtil.getFirstIPV4NonLoopBackAddress().getHostAddress());
-                logger.debug("cube = " + executable.getCubeName() + "; jobId=" + executable.getId()
-                        + (isAssigned ? " is " : " is not ") + "assigned on this server : " + ToolUtil.getHostName());
-                if (isAssigned && jobLock.lock(getLockPath(executable.getId()))) {
-                    logger.info(executable.toString() + " scheduled in server: " + serverName);
-
-                    context.addRunningJob(executable);
-                    jobWithLocks.add(executable.getId());
-                    executable.execute(context);
-                }
-            } catch (ExecuteException e) {
-                logger.error("ExecuteException job:" + executable.getId() + " in server: " + serverName, e);
-            } catch (Exception e) {
-                logger.error("unknown error execute job:" + executable.getId() + " in server: " + serverName, e);
-            } finally {
-                context.removeRunningJob(executable);
-                releaseJobLock(executable);
-                // trigger the next step asap
-                fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
-            }
-        }
-
-        //release job lock when job state is ready or running and the job server keep the cube lock.
-        private void releaseJobLock(AbstractExecutable executable) {
-            if (executable instanceof DefaultChainedExecutable) {
-                ExecutableState state = executable.getStatus();
-
-                if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
-                    if (jobWithLocks.contains(executable.getId())) {
-                        logger.info(
-                                executable.toString() + " will release the lock for the job: " + executable.getId());
-                        jobLock.unlock(getLockPath(executable.getId()));
-                        jobWithLocks.remove(executable.getId());
-                    }
-                }
-            }
-        }
+    public static DistributedScheduler getInstance(KylinConfig config) {
+        return config.getManager(DistributedScheduler.class);
     }
 
-    //when the job lock released but the related job still running, resume the job.
-    private class WatcherProcessImpl implements DistributedLock.Watcher {
-        private String serverName;
-
-        public WatcherProcessImpl(String serverName) {
-            this.serverName = serverName;
-        }
-
-        @Override
-        public void onUnlock(String path, String nodeData) {
-            String[] paths = StringUtil.split(path, "/");
-            String jobId = paths[paths.length - 1];
+    // called by reflection
+    static DistributedScheduler newInstance(KylinConfig config) throws IOException {
+        return new DistributedScheduler();
+    }
 
-            // Sync execute cache in case broadcast not available
-            try {
-                executableManager.syncDigestsOfJob(jobId);
-            } catch (PersistentException e) {
-                logger.error("Failed to sync cache of job: " + jobId + ", at server: " + serverName);
-            }
+    public static String getLockPath(String pathName) {
+        return dropDoubleSlash(ZOOKEEPER_LOCK_PATH + "/" + pathName);
+    }
 
-            final Output output = executableManager.getOutput(jobId);
-            if (output.getState() == ExecutableState.RUNNING) {
-                AbstractExecutable executable = executableManager.getJob(jobId);
-                if (executable instanceof DefaultChainedExecutable && !nodeData.equalsIgnoreCase(serverName)) {
-                    try {
-                        logger.warn(nodeData + " has released the lock for: " + jobId
-                                + " but the job still running. so " + serverName + " resume the job");
-                        if (!jobLock.isLocked(getLockPath(jobId))) {
-                            executableManager.resumeRunningJobForce(executable.getId());
-                            fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
-                        }
-                    } catch (Exception e) {
-                        logger.error("resume the job but fail in server: " + serverName, e);
-                    }
-                }
-            }
-        }
+    private static String getWatchPath() {
+        return dropDoubleSlash(ZOOKEEPER_LOCK_PATH);
+    }
 
-        @Override
-        public void onLock(String lockPath, String client) {
+    public static String dropDoubleSlash(String path) {
+        for (int n = Integer.MAX_VALUE; n > path.length();) {
+            n = path.length();
+            path = path.replace("//", "/");
         }
+        return path;
     }
 
     @Override
@@ -255,22 +172,6 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
         }
     }
 
-    public static String getLockPath(String pathName) {
-        return dropDoubleSlash(ZOOKEEPER_LOCK_PATH + "/" + pathName);
-    }
-
-    private static String getWatchPath() {
-        return dropDoubleSlash(ZOOKEEPER_LOCK_PATH);
-    }
-
-    public static String dropDoubleSlash(String path) {
-        for (int n = Integer.MAX_VALUE; n > path.length();) {
-            n = path.length();
-            path = path.replace("//", "/");
-        }
-        return path;
-    }
-
     @Override
     public void shutdown() throws SchedulerException {
         logger.info("Will shut down Job Engine ....");
@@ -301,4 +202,106 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
     public boolean hasStarted() {
         return this.hasStarted;
     }
+
+    private class JobRunner implements Runnable {
+
+        private final AbstractExecutable executable;
+
+        public JobRunner(AbstractExecutable executable) {
+            this.executable = executable;
+        }
+
+        @Override
+        public void run() {
+            try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
+                    System.identityHashCode(DistributedScheduler.this), executable.getId())) {
+
+                boolean isAssigned = true;
+                if (!StringUtils.isEmpty(executable.getCubeName())) {
+                    KylinConfig config = executable.getCubeSpecificConfig();
+                    isAssigned = config.isOnAssignedServer(ToolUtil.getHostName(),
+                            ToolUtil.getFirstIPV4NonLoopBackAddress().getHostAddress());
+                    logger.debug("cube = " + executable.getCubeName() + "; jobId=" + executable.getId()
+                            + (isAssigned ? " is " : " is not ") + "assigned on this server : "
+                            + ToolUtil.getHostName());
+                }
+
+                if (isAssigned && jobLock.lock(getLockPath(executable.getId()))) {
+                    logger.info(executable.toString() + " scheduled in server: " + serverName);
+
+                    context.addRunningJob(executable);
+                    jobWithLocks.add(executable.getId());
+                    executable.execute(context);
+                }
+            } catch (ExecuteException e) {
+                logger.error("ExecuteException job:" + executable.getId() + " in server: " + serverName, e);
+            } catch (Exception e) {
+                logger.error("unknown error execute job:" + executable.getId() + " in server: " + serverName, e);
+            } finally {
+                context.removeRunningJob(executable);
+                releaseJobLock(executable);
+                // trigger the next step asap
+                fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
+            }
+        }
+
+    // release the job lock once the job is no longer READY or RUNNING and this job server still holds the lock
+        private void releaseJobLock(AbstractExecutable executable) {
+            if (executable instanceof DefaultChainedExecutable) {
+                ExecutableState state = executable.getStatus();
+
+                if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
+                    if (jobWithLocks.contains(executable.getId())) {
+                        logger.info(
+                                executable.toString() + " will release the lock for the job: " + executable.getId());
+                        jobLock.unlock(getLockPath(executable.getId()));
+                        jobWithLocks.remove(executable.getId());
+                    }
+                }
+            }
+        }
+    }
+
+    //when the job lock released but the related job still running, resume the job.
+    private class WatcherProcessImpl implements DistributedLock.Watcher {
+        private String serverName;
+
+        public WatcherProcessImpl(String serverName) {
+            this.serverName = serverName;
+        }
+
+        @Override
+        public void onUnlock(String path, String nodeData) {
+            String[] paths = StringUtil.split(path, "/");
+            String jobId = paths[paths.length - 1];
+
+            // Sync execute cache in case broadcast not available
+            try {
+                executableManager.syncDigestsOfJob(jobId);
+            } catch (PersistentException e) {
+                logger.error("Failed to sync cache of job: " + jobId + ", at server: " + serverName);
+            }
+
+            final Output output = executableManager.getOutput(jobId);
+            if (output.getState() == ExecutableState.RUNNING) {
+                AbstractExecutable executable = executableManager.getJob(jobId);
+                if (executable instanceof DefaultChainedExecutable && !nodeData.equalsIgnoreCase(serverName)) {
+                    try {
+                        logger.warn(nodeData + " has released the lock for: " + jobId
+                                + " but the job still running. so " + serverName + " resume the job");
+                        if (!jobLock.isLocked(getLockPath(jobId))) {
+                            executableManager.resumeRunningJobForce(executable.getId());
+                            fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
+                        }
+                    } catch (Exception e) {
+                        logger.error("resume the job but fail in server: " + serverName, e);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void onLock(String lockPath, String client) {
+        }
+    }
 }
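
The functional change above is the new guard in JobRunner.run(): when an executable carries no cube name, the cube-specific server-assignment lookup is skipped and the job is treated as assigned to the current server. The rest of the diff only reorders fields, static helpers and the inner classes. Below is a minimal, self-contained sketch of that assignment decision, for illustration only: AssignmentSketch, CubeConfig and decideAssignment are hypothetical stand-ins, not Kylin APIs; the only dependency is the same commons-lang StringUtils the commit imports.

    import org.apache.commons.lang.StringUtils;

    // Sketch of the assignment decision introduced by this commit (not the committed code).
    public class AssignmentSketch {

        // Hypothetical stand-in for the cube-specific KylinConfig lookup.
        interface CubeConfig {
            boolean isOnAssignedServer(String hostName, String hostAddress);
        }

        // A job is treated as assigned to this server when either:
        //   - it has no cube name (nothing to look up, so it may run anywhere), or
        //   - its cube-specific config lists this server as an assigned one.
        static boolean decideAssignment(String cubeName, CubeConfig config,
                String hostName, String hostAddress) {
            if (StringUtils.isEmpty(cubeName)) {
                return true; // no cube -> skip the cube-specific check
            }
            return config.isOnAssignedServer(hostName, hostAddress);
        }

        public static void main(String[] args) {
            CubeConfig allowAll = (host, addr) -> true;
            CubeConfig denyAll = (host, addr) -> false;

            // job with no cube name runs on any server
            System.out.println(decideAssignment(null, denyAll, "host-1", "10.0.0.1"));
            // cube-bound job still defers to its cube-specific assignment
            System.out.println(decideAssignment("sales_cube", denyAll, "host-1", "10.0.0.1"));
            System.out.println(decideAssignment("sales_cube", allowAll, "host-1", "10.0.0.1"));
        }
    }

Running the sketch prints true, false, true: an executable without a cube name is no longer filtered out by the assignment check, while cube-bound jobs keep the existing behaviour.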