You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/04 03:03:06 UTC

[25/43] kylin git commit: KYLIN-1189 resume running jobs when job engine failover

KYLIN-1189 resume running jobs when job engine failover


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0d7f9e66
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0d7f9e66
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0d7f9e66

Branch: refs/heads/helix-rebase
Commit: 0d7f9e666903a3f226eb877240e87e4942d8a9fd
Parents: 3b5260a
Author: shaofengshi <sh...@apache.org>
Authored: Thu Dec 3 10:32:40 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Mar 4 09:52:19 2016 +0800

----------------------------------------------------------------------
 .../job/impl/threadpool/DefaultScheduler.java   |  7 +-----
 .../kylin/job/manager/ExecutableManager.java    | 16 +++++++++++++
 .../kylin/rest/service/CacheServiceTest.java    | 25 ++++++++++++++++++++
 3 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0d7f9e66/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 417e279..2915c60 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -170,12 +170,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
         jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
         context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
 
-        for (AbstractExecutable executable : executableManager.getAllExecutables()) {
-            if (executable.getStatus() == ExecutableState.READY) {
-                executableManager.updateJobOutput(executable.getId(), ExecutableState.ERROR, null, "scheduler initializing work to reset job to ERROR status");
-            }
-        }
-        executableManager.updateAllRunningJobsToError();
+        executableManager.resumeAllRunningJobs();
 
         Runtime.getRuntime().addShutdownHook(new Thread() {
             public void run() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0d7f9e66/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
index 3effbe7..4d03389 100644
--- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
@@ -207,6 +207,7 @@ public class ExecutableManager {
         }
     }
 
+    @Deprecated
     public void updateAllRunningJobsToError() {
         try {
             final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
@@ -222,6 +223,21 @@ public class ExecutableManager {
         }
     }
 
+    public void resumeAllRunningJobs() { // Resets every persisted job output whose status is RUNNING back to READY.
+        try {
+            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); // all job outputs returned by the DAO
+            for (ExecutableOutputPO executableOutputPO : jobOutputs) {
+                if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) { // case-insensitive match on the RUNNING state name
+                    executableOutputPO.setStatus(ExecutableState.READY.toString()); // presumably lets the scheduler pick the job up again after failover -- TODO confirm
+                    executableDao.updateJobOutput(executableOutputPO); // write the changed status back through the DAO
+                }
+            }
+        } catch (PersistentException e) {
+            logger.error("error reset job status from RUNNING to READY", e);
+            throw new RuntimeException(e); // rethrown unchecked with the original exception as cause
+        }
+    }
+
     public void resumeJob(String jobId) {
         AbstractExecutable job = getJob(jobId);
         if (job == null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0d7f9e66/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 25b131a..4449d2b 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -20,11 +20,15 @@ package org.apache.kylin.rest.service;
 
 import static org.junit.Assert.*;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
@@ -60,6 +64,8 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
 
     private static Server server;
 
+    private static String ZK_ADDRESS = "localhost:2199";
+    
     private static KylinConfig configA;
     private static KylinConfig configB;
 
@@ -70,10 +76,13 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
     @BeforeClass
     public static void beforeClass() throws Exception {
         staticCreateTestMetadata();
+        startZookeeper();
         configA = KylinConfig.getInstanceFromEnv();
         configA.setProperty("kylin.rest.servers", "localhost:7070");
+        configA.setProperty("kylin.zookeeper.address", ZK_ADDRESS);
         configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
         configB.setProperty("kylin.rest.servers", "localhost:7070");
+        configB.setProperty("kylin.zookeeper.address", ZK_ADDRESS);
         configB.setMetadataUrl("../examples/test_metadata");
 
         server = new Server(7070);
@@ -356,4 +365,20 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
         }
         return false;
     }
+
+
+    public static void startZookeeper() { // Starts a ZkServer on port 2199 with data/log dirs under /tmp/helix-quickstart.
+        logger.info("STARTING Zookeeper at " + ZK_ADDRESS);
+        IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+            @Override
+            public void createDefaultNameSpace(ZkClient zkClient) { // no-op: the callback body creates nothing
+            }
+        };
+        new File("/tmp/helix-quickstart").mkdirs(); // create the parent directory; the boolean result is not checked
+        // Construct and start the ZkServer. NOTE(review): the local 'server' shadows the static 'server' field of this
+        ZkServer server = // test class and is not stopped in this method -- confirm teardown is handled elsewhere.
+                new ZkServer("/tmp/helix-quickstart/dataDir", "/tmp/helix-quickstart/logDir",
+                        defaultNameSpace, 2199); // port literal duplicates the one in ZK_ADDRESS ("localhost:2199"); keep them in sync
+        server.start();
+    }
 }