You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/26 03:36:10 UTC
[kylin] branch master updated: KYLIN-4249 Fix CI test
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 67151e2 KYLIN-4249 Fix CI test
67151e2 is described below
commit 67151e23f429d46c58215f90ffdee3ceb0b0b68e
Author: nichunen <ni...@apache.org>
AuthorDate: Thu Dec 26 11:34:32 2019 +0800
KYLIN-4249 Fix CI test
---
.../job/impl/threadpool/DistributedScheduler.java | 245 +++++++++++----------
1 file changed, 124 insertions(+), 121 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 4df9221..944e0b8 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -30,6 +30,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.util.SetThreadName;
@@ -60,22 +61,13 @@ import com.google.common.collect.Maps;
* 2. add all the job servers and query servers to the kylin.server.cluster-servers
*/
public class DistributedScheduler implements Scheduler<AbstractExecutable> {
- private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
-
public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // note ZookeeperDistributedLock will ensure zk path prefix: /${kylin.env.zookeeper-base-path}/metadata
-
- public static DistributedScheduler getInstance(KylinConfig config) {
- return config.getManager(DistributedScheduler.class);
- }
-
- // called by reflection
- static DistributedScheduler newInstance(KylinConfig config) throws IOException {
- return new DistributedScheduler();
- }
+ private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
+ //ids of the running jobs whose lock is currently held by this scheduler
+ private final Set<String> jobWithLocks = new CopyOnWriteArraySet<>();
+ private ExecutableManager executableManager;
// ============================================================================
-
- private ExecutableManager executableManager;
private FetcherRunner fetcher;
private ScheduledExecutorService fetcherPool;
private ExecutorService watchPool;
@@ -83,109 +75,34 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
private DefaultContext context;
private DistributedLock jobLock;
private Closeable lockWatch;
-
- //keep all running job
- private final Set<String> jobWithLocks = new CopyOnWriteArraySet<>();
private volatile boolean initialized = false;
private volatile boolean hasStarted = false;
private JobEngineConfig jobEngineConfig;
private String serverName;
- private class JobRunner implements Runnable {
-
- private final AbstractExecutable executable;
-
- public JobRunner(AbstractExecutable executable) {
- this.executable = executable;
- }
-
- @Override
- public void run() {
- try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
- System.identityHashCode(DistributedScheduler.this), executable.getId())) {
-
- KylinConfig config = executable.getCubeSpecificConfig();
- boolean isAssigned = config.isOnAssignedServer(ToolUtil.getHostName(),
- ToolUtil.getFirstIPV4NonLoopBackAddress().getHostAddress());
- logger.debug("cube = " + executable.getCubeName() + "; jobId=" + executable.getId()
- + (isAssigned ? " is " : " is not ") + "assigned on this server : " + ToolUtil.getHostName());
- if (isAssigned && jobLock.lock(getLockPath(executable.getId()))) {
- logger.info(executable.toString() + " scheduled in server: " + serverName);
-
- context.addRunningJob(executable);
- jobWithLocks.add(executable.getId());
- executable.execute(context);
- }
- } catch (ExecuteException e) {
- logger.error("ExecuteException job:" + executable.getId() + " in server: " + serverName, e);
- } catch (Exception e) {
- logger.error("unknown error execute job:" + executable.getId() + " in server: " + serverName, e);
- } finally {
- context.removeRunningJob(executable);
- releaseJobLock(executable);
- // trigger the next step asap
- fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
- }
- }
-
- //release job lock when job state is ready or running and the job server keep the cube lock.
- private void releaseJobLock(AbstractExecutable executable) {
- if (executable instanceof DefaultChainedExecutable) {
- ExecutableState state = executable.getStatus();
-
- if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
- if (jobWithLocks.contains(executable.getId())) {
- logger.info(
- executable.toString() + " will release the lock for the job: " + executable.getId());
- jobLock.unlock(getLockPath(executable.getId()));
- jobWithLocks.remove(executable.getId());
- }
- }
- }
- }
+ public static DistributedScheduler getInstance(KylinConfig config) {
+ return config.getManager(DistributedScheduler.class);
}
- //when the job lock released but the related job still running, resume the job.
- private class WatcherProcessImpl implements DistributedLock.Watcher {
- private String serverName;
-
- public WatcherProcessImpl(String serverName) {
- this.serverName = serverName;
- }
-
- @Override
- public void onUnlock(String path, String nodeData) {
- String[] paths = StringUtil.split(path, "/");
- String jobId = paths[paths.length - 1];
+ // called by reflection
+ static DistributedScheduler newInstance(KylinConfig config) throws IOException {
+ return new DistributedScheduler();
+ }
- // Sync execute cache in case broadcast not available
- try {
- executableManager.syncDigestsOfJob(jobId);
- } catch (PersistentException e) {
- logger.error("Failed to sync cache of job: " + jobId + ", at server: " + serverName);
- }
+ public static String getLockPath(String pathName) {
+ return dropDoubleSlash(ZOOKEEPER_LOCK_PATH + "/" + pathName);
+ }
- final Output output = executableManager.getOutput(jobId);
- if (output.getState() == ExecutableState.RUNNING) {
- AbstractExecutable executable = executableManager.getJob(jobId);
- if (executable instanceof DefaultChainedExecutable && !nodeData.equalsIgnoreCase(serverName)) {
- try {
- logger.warn(nodeData + " has released the lock for: " + jobId
- + " but the job still running. so " + serverName + " resume the job");
- if (!jobLock.isLocked(getLockPath(jobId))) {
- executableManager.resumeRunningJobForce(executable.getId());
- fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
- }
- } catch (Exception e) {
- logger.error("resume the job but fail in server: " + serverName, e);
- }
- }
- }
- }
+ private static String getWatchPath() {
+ return dropDoubleSlash(ZOOKEEPER_LOCK_PATH);
+ }
- @Override
- public void onLock(String lockPath, String client) {
+ public static String dropDoubleSlash(String path) {
+ for (int n = Integer.MAX_VALUE; n > path.length();) {
+ n = path.length();
+ path = path.replace("//", "/");
}
+ return path;
}
@Override
@@ -255,22 +172,6 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
}
}
- public static String getLockPath(String pathName) {
- return dropDoubleSlash(ZOOKEEPER_LOCK_PATH + "/" + pathName);
- }
-
- private static String getWatchPath() {
- return dropDoubleSlash(ZOOKEEPER_LOCK_PATH);
- }
-
- public static String dropDoubleSlash(String path) {
- for (int n = Integer.MAX_VALUE; n > path.length();) {
- n = path.length();
- path = path.replace("//", "/");
- }
- return path;
- }
-
@Override
public void shutdown() throws SchedulerException {
logger.info("Will shut down Job Engine ....");
@@ -301,4 +202,106 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
public boolean hasStarted() {
return this.hasStarted;
}
+
+ private class JobRunner implements Runnable {
+
+ private final AbstractExecutable executable;
+
+ public JobRunner(AbstractExecutable executable) {
+ this.executable = executable;
+ }
+
+ @Override
+ public void run() {
+ try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
+ System.identityHashCode(DistributedScheduler.this), executable.getId())) {
+
+ boolean isAssigned = true;
+ if (!StringUtils.isEmpty(executable.getCubeName())) {
+ KylinConfig config = executable.getCubeSpecificConfig();
+ isAssigned = config.isOnAssignedServer(ToolUtil.getHostName(),
+ ToolUtil.getFirstIPV4NonLoopBackAddress().getHostAddress());
+ logger.debug("cube = " + executable.getCubeName() + "; jobId=" + executable.getId()
+ + (isAssigned ? " is " : " is not ") + "assigned on this server : "
+ + ToolUtil.getHostName());
+ }
+
+ if (isAssigned && jobLock.lock(getLockPath(executable.getId()))) {
+ logger.info(executable.toString() + " scheduled in server: " + serverName);
+
+ context.addRunningJob(executable);
+ jobWithLocks.add(executable.getId());
+ executable.execute(context);
+ }
+ } catch (ExecuteException e) {
+ logger.error("ExecuteException job:" + executable.getId() + " in server: " + serverName, e);
+ } catch (Exception e) {
+ logger.error("unknown error execute job:" + executable.getId() + " in server: " + serverName, e);
+ } finally {
+ context.removeRunningJob(executable);
+ releaseJobLock(executable);
+ // trigger the next step asap
+ fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
+ }
+ }
+
+ //release the job lock once the job is neither ready nor running, provided this server still holds that lock.
+ private void releaseJobLock(AbstractExecutable executable) {
+ if (executable instanceof DefaultChainedExecutable) {
+ ExecutableState state = executable.getStatus();
+
+ if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
+ if (jobWithLocks.contains(executable.getId())) {
+ logger.info(
+ executable.toString() + " will release the lock for the job: " + executable.getId());
+ jobLock.unlock(getLockPath(executable.getId()));
+ jobWithLocks.remove(executable.getId());
+ }
+ }
+ }
+ }
+ }
+
+ //when a job lock is released but the related job is still running, resume the job.
+ private class WatcherProcessImpl implements DistributedLock.Watcher {
+ private String serverName;
+
+ public WatcherProcessImpl(String serverName) {
+ this.serverName = serverName;
+ }
+
+ @Override
+ public void onUnlock(String path, String nodeData) {
+ String[] paths = StringUtil.split(path, "/");
+ String jobId = paths[paths.length - 1];
+
+ // Sync execute cache in case broadcast not available
+ try {
+ executableManager.syncDigestsOfJob(jobId);
+ } catch (PersistentException e) {
+ logger.error("Failed to sync cache of job: " + jobId + ", at server: " + serverName);
+ }
+
+ final Output output = executableManager.getOutput(jobId);
+ if (output.getState() == ExecutableState.RUNNING) {
+ AbstractExecutable executable = executableManager.getJob(jobId);
+ if (executable instanceof DefaultChainedExecutable && !nodeData.equalsIgnoreCase(serverName)) {
+ try {
+ logger.warn(nodeData + " has released the lock for: " + jobId
+ + " but the job still running. so " + serverName + " resume the job");
+ if (!jobLock.isLocked(getLockPath(jobId))) {
+ executableManager.resumeRunningJobForce(executable.getId());
+ fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
+ }
+ } catch (Exception e) {
+ logger.error("resume the job but fail in server: " + serverName, e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onLock(String lockPath, String client) {
+ }
+ }
}