You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/04 03:03:06 UTC
[25/43] kylin git commit: KYLIN-1189 resume running jobs when job engine failover
KYLIN-1189 resume running jobs when job engine failover
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0d7f9e66
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0d7f9e66
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0d7f9e66
Branch: refs/heads/helix-rebase
Commit: 0d7f9e666903a3f226eb877240e87e4942d8a9fd
Parents: 3b5260a
Author: shaofengshi <sh...@apache.org>
Authored: Thu Dec 3 10:32:40 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Mar 4 09:52:19 2016 +0800
----------------------------------------------------------------------
.../job/impl/threadpool/DefaultScheduler.java | 7 +-----
.../kylin/job/manager/ExecutableManager.java | 16 +++++++++++++
.../kylin/rest/service/CacheServiceTest.java | 25 ++++++++++++++++++++
3 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/0d7f9e66/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 417e279..2915c60 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -170,12 +170,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
- for (AbstractExecutable executable : executableManager.getAllExecutables()) {
- if (executable.getStatus() == ExecutableState.READY) {
- executableManager.updateJobOutput(executable.getId(), ExecutableState.ERROR, null, "scheduler initializing work to reset job to ERROR status");
- }
- }
- executableManager.updateAllRunningJobsToError();
+ executableManager.resumeAllRunningJobs();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0d7f9e66/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
index 3effbe7..4d03389 100644
--- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
@@ -207,6 +207,7 @@ public class ExecutableManager {
}
}
+ @Deprecated
public void updateAllRunningJobsToError() {
try {
final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
@@ -222,6 +223,21 @@ public class ExecutableManager {
}
}
+ public void resumeAllRunningJobs() {
+ try {
+ final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
+ for (ExecutableOutputPO executableOutputPO : jobOutputs) {
+ if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) {
+ executableOutputPO.setStatus(ExecutableState.READY.toString());
+ executableDao.updateJobOutput(executableOutputPO);
+ }
+ }
+ } catch (PersistentException e) {
+ logger.error("error reset job status from RUNNING to READY", e);
+ throw new RuntimeException(e);
+ }
+ }
+
public void resumeJob(String jobId) {
AbstractExecutable job = getJob(jobId);
if (job == null) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0d7f9e66/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 25b131a..4449d2b 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -20,11 +20,15 @@ package org.apache.kylin.rest.service;
import static org.junit.Assert.*;
+import java.io.File;
import java.util.Arrays;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
@@ -60,6 +64,8 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
private static Server server;
+ private static String ZK_ADDRESS = "localhost:2199";
+
private static KylinConfig configA;
private static KylinConfig configB;
@@ -70,10 +76,13 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
@BeforeClass
public static void beforeClass() throws Exception {
staticCreateTestMetadata();
+ startZookeeper();
configA = KylinConfig.getInstanceFromEnv();
configA.setProperty("kylin.rest.servers", "localhost:7070");
+ configA.setProperty("kylin.zookeeper.address", ZK_ADDRESS);
configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
configB.setProperty("kylin.rest.servers", "localhost:7070");
+ configB.setProperty("kylin.zookeeper.address", ZK_ADDRESS);
configB.setMetadataUrl("../examples/test_metadata");
server = new Server(7070);
@@ -356,4 +365,20 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
}
return false;
}
+
+
+ public static void startZookeeper() {
+ logger.info("STARTING Zookeeper at " + ZK_ADDRESS);
+ IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+ @Override
+ public void createDefaultNameSpace(ZkClient zkClient) {
+ }
+ };
+ new File("/tmp/helix-quickstart").mkdirs();
+ // start zookeeper
+ ZkServer server =
+ new ZkServer("/tmp/helix-quickstart/dataDir", "/tmp/helix-quickstart/logDir",
+ defaultNameSpace, 2199);
+ server.start();
+ }
}