You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/04/02 03:46:14 UTC

[kylin] branch master updated: KYLIN-4826 JobRunner should not do onExecuteError or onExecuteFinished if it has lost the job lock

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 22322a0  KYLIN-4826 JobRunner should not do onExecuteError or onExecuteFinished if it has lost the job lock
22322a0 is described below

commit 22322a0a28137e7617eb5be1e7d78811aa5a39ef
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Thu Nov 12 11:57:49 2020 +0800

    KYLIN-4826 JobRunner should not do onExecuteError or onExecuteFinished if it has lost the job lock
---
 .../apache/kylin/job/common/ShellExecutable.java   |  3 +-
 .../kylin/job/execution/AbstractExecutable.java    | 13 ++++++--
 .../kylin/job/execution/BrokenExecutable.java      |  3 +-
 .../job/execution/DefaultChainedExecutable.java    |  5 ++--
 .../org/apache/kylin/job/execution/Executable.java |  3 +-
 .../job/impl/threadpool/DefaultScheduler.java      |  9 ++++--
 .../job/impl/threadpool/DistributedScheduler.java  | 11 +++++--
 .../threadpool/IJobRunner.java}                    | 35 +++++++++-------------
 .../org/apache/kylin/job/ErrorTestExecutable.java  |  3 +-
 .../org/apache/kylin/job/FailedTestExecutable.java |  3 +-
 .../kylin/job/FiveSecondSucceedTestExecutable.java |  3 +-
 .../kylin/job/PersistExceptionExecutable.java      |  3 +-
 .../apache/kylin/job/RunningTestExecutable.java    |  3 +-
 .../org/apache/kylin/job/SelfStopExecutable.java   |  3 +-
 .../apache/kylin/job/SucceedTestExecutable.java    |  3 +-
 .../apache/kylin/engine/flink/FlinkExecutable.java |  5 ++--
 .../engine/mr/common/HadoopShellExecutable.java    |  3 +-
 .../engine/mr/common/MapReduceExecutable.java      |  3 +-
 .../kylin/engine/mr/steps/CopyDictionaryStep.java  |  3 +-
 .../kylin/engine/mr/steps/MergeDictionaryStep.java |  3 +-
 .../kylin/engine/mr/steps/MergeStatisticsStep.java |  3 +-
 .../mr/steps/MergeStatisticsWithOldStep.java       |  3 +-
 .../kylin/engine/mr/steps/SaveStatisticsStep.java  |  3 +-
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java     |  3 +-
 .../steps/UpdateCubeInfoAfterCheckpointStep.java   |  3 +-
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java     |  3 +-
 .../mr/steps/UpdateCubeInfoAfterOptimizeStep.java  |  3 +-
 .../engine/mr/steps/UpdateDictionaryStep.java      |  3 +-
 .../lookup/LookupSnapshotToMetaStoreStep.java      |  3 +-
 .../steps/lookup/UpdateCubeAfterSnapshotStep.java  |  3 +-
 .../kylin/engine/mr/streaming/SaveDictStep.java    |  3 +-
 .../engine/mr/common/JobInfoConverterTest.java     |  5 ++--
 .../apache/kylin/engine/spark/SparkExecutable.java |  3 +-
 .../kylin/engine/spark/SparkExecutableLivy.java    |  3 +-
 .../apache/kylin/rest/service/JobServiceTest.java  |  3 +-
 .../source/hive/CreateFlatHiveTableByLivyStep.java |  3 +-
 .../kylin/source/hive/CreateFlatHiveTableStep.java |  3 +-
 .../kylin/source/hive/CreateMrHiveDictStep.java    |  3 +-
 .../kylin/source/hive/GarbageCollectionStep.java   |  3 +-
 .../hive/RedistributeFlatHiveTableByLivyStep.java  |  3 +-
 .../source/hive/RedistributeFlatHiveTableStep.java |  3 +-
 .../java/org/apache/kylin/source/jdbc/CmdStep.java |  3 +-
 .../org/apache/kylin/source/jdbc/HiveCmdStep.java  |  3 +-
 .../kylin/source/jdbc/sqoop/SqoopCmdStep.java      |  3 +-
 .../kylin/source/kafka/job/MergeOffsetStep.java    |  3 +-
 .../UpdateSnapshotCacheForQueryServersStep.java    |  3 +-
 .../hbase/steps/HDFSPathGarbageCollectionStep.java |  3 +-
 .../kylin/storage/hbase/steps/MergeGCStep.java     |  3 +-
 ...UpdateSnapshotCacheForQueryServersStepTest.java | 11 +++----
 49 files changed, 136 insertions(+), 81 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
index b4abc4c..64b9d5c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
@@ -26,6 +26,7 @@ import org.apache.kylin.job.exception.ShellException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.slf4j.LoggerFactory;
 
 /**
@@ -41,7 +42,7 @@ public class ShellExecutable extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         try {
             logger.info("executing:" + getCmd());
             final PatternedLogger patternedLogger = new PatternedLogger(logger);
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 78fb389..668c707 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -38,6 +38,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.impl.threadpool.DefaultContext;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.job.util.MailNotificationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -157,7 +158,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     }
 
     @Override
-    public final ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException {
+    public final ExecuteResult execute(ExecutableContext executableContext, IJobRunner jobRunner) throws ExecuteException {
 
         logger.info("Executing AbstractExecutable ({})", this.getName());
 
@@ -176,7 +177,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
                 catchedException = null;
                 result = null;
                 try {
-                    result = doWork(executableContext);
+                    result = doWork(executableContext, jobRunner);
                 } catch (Throwable e) {
                     logger.error("error running Executable: {}", this.toString());
                     catchedException = e;
@@ -188,6 +189,12 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
                 //don't invoke retry on ChainedExecutable
             } while (needRetry(this.retry, realException)); //exception in ExecuteResult should handle by user itself.
 
+            // If the job lock can no longer be acquired once doWork has finished, short-circuit and skip onExecuteError/onExecuteFinished
+            if (!jobRunner.acquireJobLock()) {
+                logger.warn("fail to acquire lock for {} after finishing doWork", id);
+                return ExecuteResult.createSucceed();
+            }
+
             //check exception in result to avoid retry on ChainedExecutable(only need to retry on subtask actually)
             if (realException != null) {
                 onExecuteError(realException, executableContext);
@@ -226,7 +233,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         new MailService(context.getConfig()).sendMail(users, title, content);
     }
 
-    protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException, PersistentException;
+    protected abstract ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException, PersistentException;
 
     @Override
     public void cleanup() throws ExecuteException {
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java
index 7bb4fe2..2767f72 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.job.execution;
 
 import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 
 /**
  * A special Executable used to indicate any executable whose metadata is broken.
@@ -40,7 +41,7 @@ public class BrokenExecutable extends AbstractExecutable {
     }
     
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         throw new UnsupportedOperationException();
     }
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 78efb0d..e2b6b76 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -28,6 +28,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLockFactory;
 import org.apache.kylin.job.exception.ExecuteException;
 
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 
@@ -51,7 +52,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         List<? extends Executable> executables = getTasks();
         final int size = executables.size();
         for (int i = 0; i < size; ++i) {
@@ -68,7 +69,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
                         "invalid subtask state, subtask:" + subTask.getName() + ", state:" + subTask.getStatus());
             }
             if (subTask.isRunnable()) {
-                return subTask.execute(context);
+                return subTask.execute(context, jobRunner);
             }
         }
         return new ExecuteResult(ExecuteResult.State.SUCCEED);
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/Executable.java b/core-job/src/main/java/org/apache/kylin/job/execution/Executable.java
index e063ec1..e6c974e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/Executable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/Executable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job.execution;
 import java.util.Map;
 
 import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 
 /**
  */
@@ -30,7 +31,7 @@ public interface Executable {
 
     String getName();
 
-    ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException;
+    ExecuteResult execute(ExecutableContext executableContext, IJobRunner jobRunner) throws ExecuteException;
 
     ExecutableState getStatus();
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 9c4573e..5c396eb 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -98,7 +98,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable> {
         return fetcher;
     }
 
-    private class JobRunner implements Runnable {
+    private class JobRunner implements IJobRunner {
 
         private final AbstractExecutable executable;
 
@@ -107,10 +107,15 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable> {
         }
 
         @Override
+        public boolean acquireJobLock() {
+            return true;
+        }
+
+        @Override
         public void run() {
             try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
                     System.identityHashCode(DefaultScheduler.this), executable.getId())) {
-                executable.execute(context);
+                executable.execute(context, this);
             } catch (ExecuteException e) {
                 logger.error("ExecuteException job:" + executable.getId(), e);
             } catch (Exception e) {
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 51e7dc0..82db029 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -202,7 +202,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
         return this.hasStarted;
     }
 
-    private class JobRunner implements Runnable {
+    private class JobRunner implements IJobRunner {
 
         private final AbstractExecutable executable;
 
@@ -211,6 +211,11 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
         }
 
         @Override
+        public boolean acquireJobLock() {
+            return jobLock.lock(getLockPath(executable.getId()));
+        }
+
+        @Override
         public void run() {
             try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
                     System.identityHashCode(DistributedScheduler.this), executable.getId())) {
@@ -225,12 +230,12 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
                             + ToolUtil.getHostName());
                 }
 
-                if (isAssigned && jobLock.lock(getLockPath(executable.getId()))) {
+                if (isAssigned && acquireJobLock()) {
                     logger.info(executable.toString() + " scheduled in server: " + serverName);
 
                     context.addRunningJob(executable);
                     jobWithLocks.add(executable.getId());
-                    executable.execute(context);
+                    executable.execute(context, this);
                 }
             } catch (ExecuteException e) {
                 logger.error("ExecuteException job:" + executable.getId() + " in server: " + serverName, e);
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/Executable.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/IJobRunner.java
similarity index 65%
copy from core-job/src/main/java/org/apache/kylin/job/execution/Executable.java
copy to core-job/src/main/java/org/apache/kylin/job/impl/threadpool/IJobRunner.java
index e063ec1..5ed607f 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/Executable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/IJobRunner.java
@@ -6,37 +6,30 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import java.util.Map;
-
-import org.apache.kylin.job.exception.ExecuteException;
-
-/**
  */
-public interface Executable {
-
-    String getId();
-
-    String getName();
 
-    ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException;
+package org.apache.kylin.job.impl.threadpool;
 
-    ExecutableState getStatus();
+public interface IJobRunner extends Runnable {
 
-    Output getOutput();
+    boolean acquireJobLock();
 
-    boolean isRunnable();
+    IJobRunner EMPTY_JOB_RUNNER = new IJobRunner() {
+        @Override
+        public void run() {
+        }
 
-    Map<String, String> getParams();
+        @Override
+        public boolean acquireJobLock() {
+            return true;
+        }
+    };
 }
diff --git a/core-job/src/test/java/org/apache/kylin/job/ErrorTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/ErrorTestExecutable.java
index f1b795c..bb85a61 100644
--- a/core-job/src/test/java/org/apache/kylin/job/ErrorTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/ErrorTestExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 
 /**
  */
@@ -31,7 +32,7 @@ public class ErrorTestExecutable extends BaseTestExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         try {
             Thread.sleep(100);
         } catch (InterruptedException e) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/FailedTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/FailedTestExecutable.java
index e18cc0d..18f546b 100644
--- a/core-job/src/test/java/org/apache/kylin/job/FailedTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/FailedTestExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 
 /**
  */
@@ -31,7 +32,7 @@ public class FailedTestExecutable extends BaseTestExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
index ee29b13..163a788 100644
--- a/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 
 /**
  */
@@ -35,7 +36,7 @@ public class FiveSecondSucceedTestExecutable extends BaseTestExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         try {
             Thread.sleep(5000);
         } catch (InterruptedException e) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java
index 78b393c..e9d62d1 100644
--- a/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
 import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 
 public class PersistExceptionExecutable extends BaseTestExecutable {
     public PersistExceptionExecutable() {
@@ -28,7 +29,7 @@ public class PersistExceptionExecutable extends BaseTestExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws PersistentException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws PersistentException {
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
index 1826850..4391a62 100644
--- a/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 
 public class RunningTestExecutable extends BaseTestExecutable {
 
@@ -29,7 +30,7 @@ public class RunningTestExecutable extends BaseTestExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
index ddeeafa..1fe8d64 100644
--- a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 
 /**
  */
@@ -33,7 +34,7 @@ public class SelfStopExecutable extends BaseTestExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         doingWork = true;
         try {
             for (int i = 0; i < 60; i++) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java
index 58ed515..5988f94 100644
--- a/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 
 /**
  */
@@ -31,7 +32,7 @@ public class SucceedTestExecutable extends BaseTestExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index 042d720..c908318 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -42,6 +42,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.metadata.model.Segments;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,7 +132,7 @@ public class FlinkExecutable extends AbstractExecutable {
 
     @SuppressWarnings("checkstyle:methodlength")
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException, PersistentException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException, PersistentException {
         ExecutableManager manager = getManager();
         Map<String, String> extra = manager.getOutput(getId()).getExtra();
         String flinkJobId = extra.get(ExecutableConstants.FLINK_JOB_ID);
@@ -214,7 +215,7 @@ public class FlinkExecutable extends AbstractExecutable {
 
             //set job name
             sb.append(" -ynm ").append(this.getName().replaceAll(" ", "-")).append(" ");
-            
+
             if (StringUtils.isNotBlank(jars)) {
                 String[] splitJars = jars.split(",\\s*");
                 Set<String> setJars = new HashSet();
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
index b19c347..c9015a5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
@@ -29,6 +29,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +48,7 @@ public class HadoopShellExecutable extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final String mapReduceJobClass = getJobClass();
         String params = getJobParams();
         Preconditions.checkNotNull(mapReduceJobClass);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 62fb814..8663054 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.shaded.com.google.common.base.Strings;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
@@ -112,7 +113,7 @@ public class MapReduceExecutable extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final String mapReduceJobClass = getMapReduceJobClass();
         DistributedLock lock = null;
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
index 7aa9712..0a1c393 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +43,7 @@ public class CopyDictionaryStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final CubeManager mgr = CubeManager.getInstance(context.getConfig());
         final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
         final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index efac94a..034cbc2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -36,6 +36,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class MergeDictionaryStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final CubeManager mgr = CubeManager.getInstance(context.getConfig());
         final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
         final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 41211a6..57be546 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -47,6 +47,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +65,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
 
     @Override
     @SuppressWarnings("deprecation")
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final CubeManager mgr = CubeManager.getInstance(context.getConfig());
         final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
         final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
index 763ea07..dfe2720 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
@@ -41,6 +41,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,7 +60,7 @@ public class MergeStatisticsWithOldStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final CubeManager mgr = CubeManager.getInstance(context.getConfig());
         final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
         final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index c7aefde..99ad594 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -41,6 +41,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +61,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         CubeSegment newSegment = CubingExecutableUtil.findSegment(context,
                 CubingExecutableUtil.getCubeName(this.getParams()),
                 CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index 9c2a9a4..a4e97a9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -45,6 +45,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.metadata.datatype.DataTypeOrder;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -67,7 +68,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
         final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()))
                 .latestCopyForWrite();
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
index 80811be..f526067 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
@@ -30,6 +30,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +46,7 @@ public class UpdateCubeInfoAfterCheckpointStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
         final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index 0a8cd1e..d6b05d9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -33,6 +33,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
         final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
index 03aa616..030d3f7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +43,7 @@ public class UpdateCubeInfoAfterOptimizeStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
         final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
         final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
index 4b684e1..0edb23d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -57,7 +58,7 @@ public class UpdateDictionaryStep extends AbstractExecutable {
     private static final Logger logger = LoggerFactory.getLogger(UpdateDictionaryStep.class);
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final CubeManager cubeMgr = CubeManager.getInstance(context.getConfig());
         final DictionaryManager dictMgrHdfs;
         final DictionaryManager dictMgrHbase;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
index 753b67c..accf372 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
@@ -31,6 +31,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.IReadableTable;
@@ -50,7 +51,7 @@ public class LookupSnapshotToMetaStoreStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         KylinConfig kylinConfig = context.getConfig();
         CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
         TableMetadataManager metaMgr = TableMetadataManager.getInstance(kylinConfig);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java
index 514c940..c1b9c4d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java
@@ -32,6 +32,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +48,7 @@ public class UpdateCubeAfterSnapshotStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         KylinConfig kylinConfig = context.getConfig();
         CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java
index e6cdfda..082b194 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.shaded.com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -63,7 +64,7 @@ public class SaveDictStep extends AbstractExecutable {
     private static final Logger logger = LoggerFactory.getLogger(SaveDictStep.class);
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         logger.info("job {} start to run SaveDictStep", getJobFlowJobId());
         final CubeManager mgr = CubeManager.getInstance(context.getConfig());
         final DictionaryManager dictManager = DictionaryManager.getInstance(context.getConfig());
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
index a54e112..861394b 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
@@ -28,6 +28,7 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.engine.mr.CubingJob;
@@ -130,7 +131,7 @@ public class JobInfoConverterTest {
         }
 
         @Override
-        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "");
         }
     }
@@ -141,7 +142,7 @@ public class JobInfoConverterTest {
         }
 
         @Override
-        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "");
         }
     }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index a74ce5d..d60b432 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -60,6 +60,7 @@ import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
@@ -200,7 +201,7 @@ public class SparkExecutable extends AbstractExecutable {
 
     @SuppressWarnings("checkstyle:methodlength")
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         ExecutableManager mgr = getManager();
         Map<String, String> extra = mgr.getOutput(getId()).getExtra();
         String sparkJobId = extra.get(ExecutableConstants.SPARK_JOB_ID);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java
index d8d62b9..9a776e4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java
@@ -43,6 +43,7 @@ import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.parquet.Strings;
 import org.slf4j.LoggerFactory;
@@ -150,7 +151,7 @@ public class SparkExecutableLivy extends SparkExecutable {
 
     @SuppressWarnings("checkstyle:methodlength")
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         ExecutableManager mgr = getManager();
         Map<String, String> extra = mgr.getOutput(getId()).getExtra();
         String sparkJobId = extra.get(ExecutableConstants.SPARK_JOB_ID);
diff --git a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index a617b7c..345befb 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.query.QueryConnection;
 import org.junit.Assert;
@@ -81,7 +82,7 @@ public class JobServiceTest extends ServiceTestBase {
         }
 
         @Override
-        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "");
         }
     }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
index ca3ba46..ef84b9c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.source.hive;
 
 
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.shaded.com.google.common.collect.ImmutableList;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.common.PatternedLogger;
@@ -46,7 +47,7 @@ public class CreateFlatHiveTableByLivyStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         stepLogger.setILogListener((infoKey, info) -> {
                     // only care two properties here
                     if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index 0840356..532d14c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -35,6 +35,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,7 +84,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         stepLogger.setILogListener((infoKey, info) -> {
                     // only care two properties here
                     if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
index 8538622..bb43ce4 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
@@ -17,6 +17,7 @@
  */
 package org.apache.kylin.source.hive;
 
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.shaded.com.google.common.base.Strings;
 import org.apache.kylin.shaded.com.google.common.collect.ImmutableList;
 import org.apache.kylin.common.KylinConfig;
@@ -147,7 +148,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         KylinConfig config = getCubeSpecificConfig();
         DistributedLock lock = null;
         try {
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
index 894041b..53e8526 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
@@ -32,6 +32,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +42,7 @@ public class GarbageCollectionStep extends AbstractExecutable {
     private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         KylinConfig config = context.getConfig();
         StringBuffer output = new StringBuffer();
         try {
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
index b673092..6e4e579 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.source.hive;
 
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.shaded.com.google.common.collect.ImmutableList;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.common.PatternedLogger;
@@ -56,7 +57,7 @@ public class RedistributeFlatHiveTableByLivyStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         KylinConfig config = getCubeSpecificConfig();
         String intermediateTable = getIntermediateTable();
         String database, tableName;
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
index 406a2a7..d52c89f 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
@@ -29,6 +29,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 
 public class RedistributeFlatHiveTableStep extends AbstractExecutable {
     private final PatternedLogger stepLogger = new PatternedLogger(logger);
@@ -65,7 +66,7 @@ public class RedistributeFlatHiveTableStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         KylinConfig config = getCubeSpecificConfig();
         String intermediateTable = getIntermediateTable();
         String database, tableName;
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
index 287019b..26199c6 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
@@ -27,6 +27,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +56,7 @@ public class CmdStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         try {
             sqoopFlatHiveTable(config);
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
index 073e965..b1fba28 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
@@ -27,6 +27,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +55,7 @@ public class HiveCmdStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         try {
             createFlatHiveTable(config);
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/sqoop/SqoopCmdStep.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/sqoop/SqoopCmdStep.java
index 626c570..b0e32c8 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/sqoop/SqoopCmdStep.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/sqoop/SqoopCmdStep.java
@@ -29,6 +29,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,7 +65,7 @@ public class SqoopCmdStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         try {
             sqoopFlatHiveTable(config);
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index 9bb0520..9dc9a0d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -29,6 +29,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.Segments;
@@ -48,7 +49,7 @@ public class MergeOffsetStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
         final CubeInstance cubeCopy = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
         final String segmentId = CubingExecutableUtil.getSegmentId(this.getParams());
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java
index 196ed5f..6d9c173 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java
@@ -23,6 +23,7 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.List;
 
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.restclient.RestClient;
@@ -40,7 +41,7 @@ public class UpdateSnapshotCacheForQueryServersStep extends AbstractExecutable {
     private static final Logger logger = LoggerFactory.getLogger(UpdateSnapshotCacheForQueryServersStep.class);
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         final String tableName = LookupExecutableUtil.getLookupTableName(this.getParams());
         final String snapshotID = LookupExecutableUtil.getLookupSnapshotID(this.getParams());
         final String projectName = LookupExecutableUtil.getProjectName(this.getParams());
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 2eb0f3a..553d39f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -32,6 +32,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
         try {
             config = new JobEngineConfig(context.getConfig());
             List<String> toDeletePaths = getDeletePaths();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
index 85400ab..fd435d2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
@@ -33,6 +33,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public class MergeGCStep extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
 
         try {
             logger.info("Sleep one minute before deleting the Htables");
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java
index e7a63fa..f403b11 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java
@@ -14,26 +14,27 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.storage.hbase.lookup;
 
-import static org.junit.Assert.assertTrue;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.Executable;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.impl.threadpool.DefaultContext;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import static org.junit.Assert.assertTrue;
 
 public class UpdateSnapshotCacheForQueryServersStepTest extends LocalFileMetadataTestCase {
     private KylinConfig kylinConfig;
+
     @Before
     public void setup() throws Exception {
         this.createTestMetadata();
@@ -48,7 +49,7 @@ public class UpdateSnapshotCacheForQueryServersStepTest extends LocalFileMetadat
     @Test
     public void testExecute() throws ExecuteException {
         UpdateSnapshotCacheForQueryServersStep step = new UpdateSnapshotCacheForQueryServersStep();
-        ExecuteResult result = step.doWork(new DefaultContext(Maps.<String, Executable>newConcurrentMap(), kylinConfig));
+        ExecuteResult result = step.doWork(new DefaultContext(Maps.<String, Executable>newConcurrentMap(), kylinConfig), IJobRunner.EMPTY_JOB_RUNNER);
         System.out.println(result.output());
         assertTrue(result.succeed());
     }