You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/04/02 03:46:14 UTC
[kylin] branch master updated: KYLIN-4826 JobRunner should not do
onExecuteError or onExecuteFinished if it has lost the job lock
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 22322a0 KYLIN-4826 JobRunner should not do onExecuteError or onExecuteFinished if it has lost the job lock
22322a0 is described below
commit 22322a0a28137e7617eb5be1e7d78811aa5a39ef
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Thu Nov 12 11:57:49 2020 +0800
KYLIN-4826 JobRunner should not do onExecuteError or onExecuteFinished if it has lost the job lock
---
.../apache/kylin/job/common/ShellExecutable.java | 3 +-
.../kylin/job/execution/AbstractExecutable.java | 13 ++++++--
.../kylin/job/execution/BrokenExecutable.java | 3 +-
.../job/execution/DefaultChainedExecutable.java | 5 ++--
.../org/apache/kylin/job/execution/Executable.java | 3 +-
.../job/impl/threadpool/DefaultScheduler.java | 9 ++++--
.../job/impl/threadpool/DistributedScheduler.java | 11 +++++--
.../threadpool/IJobRunner.java} | 35 +++++++++-------------
.../org/apache/kylin/job/ErrorTestExecutable.java | 3 +-
.../org/apache/kylin/job/FailedTestExecutable.java | 3 +-
.../kylin/job/FiveSecondSucceedTestExecutable.java | 3 +-
.../kylin/job/PersistExceptionExecutable.java | 3 +-
.../apache/kylin/job/RunningTestExecutable.java | 3 +-
.../org/apache/kylin/job/SelfStopExecutable.java | 3 +-
.../apache/kylin/job/SucceedTestExecutable.java | 3 +-
.../apache/kylin/engine/flink/FlinkExecutable.java | 5 ++--
.../engine/mr/common/HadoopShellExecutable.java | 3 +-
.../engine/mr/common/MapReduceExecutable.java | 3 +-
.../kylin/engine/mr/steps/CopyDictionaryStep.java | 3 +-
.../kylin/engine/mr/steps/MergeDictionaryStep.java | 3 +-
.../kylin/engine/mr/steps/MergeStatisticsStep.java | 3 +-
.../mr/steps/MergeStatisticsWithOldStep.java | 3 +-
.../kylin/engine/mr/steps/SaveStatisticsStep.java | 3 +-
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 3 +-
.../steps/UpdateCubeInfoAfterCheckpointStep.java | 3 +-
.../mr/steps/UpdateCubeInfoAfterMergeStep.java | 3 +-
.../mr/steps/UpdateCubeInfoAfterOptimizeStep.java | 3 +-
.../engine/mr/steps/UpdateDictionaryStep.java | 3 +-
.../lookup/LookupSnapshotToMetaStoreStep.java | 3 +-
.../steps/lookup/UpdateCubeAfterSnapshotStep.java | 3 +-
.../kylin/engine/mr/streaming/SaveDictStep.java | 3 +-
.../engine/mr/common/JobInfoConverterTest.java | 5 ++--
.../apache/kylin/engine/spark/SparkExecutable.java | 3 +-
.../kylin/engine/spark/SparkExecutableLivy.java | 3 +-
.../apache/kylin/rest/service/JobServiceTest.java | 3 +-
.../source/hive/CreateFlatHiveTableByLivyStep.java | 3 +-
.../kylin/source/hive/CreateFlatHiveTableStep.java | 3 +-
.../kylin/source/hive/CreateMrHiveDictStep.java | 3 +-
.../kylin/source/hive/GarbageCollectionStep.java | 3 +-
.../hive/RedistributeFlatHiveTableByLivyStep.java | 3 +-
.../source/hive/RedistributeFlatHiveTableStep.java | 3 +-
.../java/org/apache/kylin/source/jdbc/CmdStep.java | 3 +-
.../org/apache/kylin/source/jdbc/HiveCmdStep.java | 3 +-
.../kylin/source/jdbc/sqoop/SqoopCmdStep.java | 3 +-
.../kylin/source/kafka/job/MergeOffsetStep.java | 3 +-
.../UpdateSnapshotCacheForQueryServersStep.java | 3 +-
.../hbase/steps/HDFSPathGarbageCollectionStep.java | 3 +-
.../kylin/storage/hbase/steps/MergeGCStep.java | 3 +-
...UpdateSnapshotCacheForQueryServersStepTest.java | 11 +++----
49 files changed, 136 insertions(+), 81 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
index b4abc4c..64b9d5c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
@@ -26,6 +26,7 @@ import org.apache.kylin.job.exception.ShellException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.slf4j.LoggerFactory;
/**
@@ -41,7 +42,7 @@ public class ShellExecutable extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
try {
logger.info("executing:" + getCmd());
final PatternedLogger patternedLogger = new PatternedLogger(logger);
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 78fb389..668c707 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -38,6 +38,7 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.PersistentException;
import org.apache.kylin.job.impl.threadpool.DefaultContext;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.job.util.MailNotificationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -157,7 +158,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
}
@Override
- public final ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException {
+ public final ExecuteResult execute(ExecutableContext executableContext, IJobRunner jobRunner) throws ExecuteException {
logger.info("Executing AbstractExecutable ({})", this.getName());
@@ -176,7 +177,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
catchedException = null;
result = null;
try {
- result = doWork(executableContext);
+ result = doWork(executableContext, jobRunner);
} catch (Throwable e) {
logger.error("error running Executable: {}", this.toString());
catchedException = e;
@@ -188,6 +189,12 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
//don't invoke retry on ChainedExecutable
} while (needRetry(this.retry, realException)); //exception in ExecuteResult should handle by user itself.
+ // If the job lock has been lost by the time doWork finishes, short-circuit and skip onExecuteError/onExecuteFinished
+ if (!jobRunner.acquireJobLock()) {
+ logger.warn("fail to acquire lock for {} after finishing doWork", id);
+ return ExecuteResult.createSucceed();
+ }
+
//check exception in result to avoid retry on ChainedExecutable(only need to retry on subtask actually)
if (realException != null) {
onExecuteError(realException, executableContext);
@@ -226,7 +233,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
new MailService(context.getConfig()).sendMail(users, title, content);
}
- protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException, PersistentException;
+ protected abstract ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException, PersistentException;
@Override
public void cleanup() throws ExecuteException {
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java
index 7bb4fe2..2767f72 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java
@@ -19,6 +19,7 @@
package org.apache.kylin.job.execution;
import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
/**
* A special Executable used to indicate any executable whose metadata is broken.
@@ -40,7 +41,7 @@ public class BrokenExecutable extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
throw new UnsupportedOperationException();
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 78efb0d..e2b6b76 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -28,6 +28,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLockFactory;
import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
@@ -51,7 +52,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
List<? extends Executable> executables = getTasks();
final int size = executables.size();
for (int i = 0; i < size; ++i) {
@@ -68,7 +69,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
"invalid subtask state, subtask:" + subTask.getName() + ", state:" + subTask.getStatus());
}
if (subTask.isRunnable()) {
- return subTask.execute(context);
+ return subTask.execute(context, jobRunner);
}
}
return new ExecuteResult(ExecuteResult.State.SUCCEED);
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/Executable.java b/core-job/src/main/java/org/apache/kylin/job/execution/Executable.java
index e063ec1..e6c974e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/Executable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/Executable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job.execution;
import java.util.Map;
import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
/**
*/
@@ -30,7 +31,7 @@ public interface Executable {
String getName();
- ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException;
+ ExecuteResult execute(ExecutableContext executableContext, IJobRunner jobRunner) throws ExecuteException;
ExecutableState getStatus();
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 9c4573e..5c396eb 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -98,7 +98,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable> {
return fetcher;
}
- private class JobRunner implements Runnable {
+ private class JobRunner implements IJobRunner {
private final AbstractExecutable executable;
@@ -107,10 +107,15 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable> {
}
@Override
+ public boolean acquireJobLock() {
+ return true;
+ }
+
+ @Override
public void run() {
try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
System.identityHashCode(DefaultScheduler.this), executable.getId())) {
- executable.execute(context);
+ executable.execute(context, this);
} catch (ExecuteException e) {
logger.error("ExecuteException job:" + executable.getId(), e);
} catch (Exception e) {
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 51e7dc0..82db029 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -202,7 +202,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
return this.hasStarted;
}
- private class JobRunner implements Runnable {
+ private class JobRunner implements IJobRunner {
private final AbstractExecutable executable;
@@ -211,6 +211,11 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
}
@Override
+ public boolean acquireJobLock() {
+ return jobLock.lock(getLockPath(executable.getId()));
+ }
+
+ @Override
public void run() {
try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
System.identityHashCode(DistributedScheduler.this), executable.getId())) {
@@ -225,12 +230,12 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
+ ToolUtil.getHostName());
}
- if (isAssigned && jobLock.lock(getLockPath(executable.getId()))) {
+ if (isAssigned && acquireJobLock()) {
logger.info(executable.toString() + " scheduled in server: " + serverName);
context.addRunningJob(executable);
jobWithLocks.add(executable.getId());
- executable.execute(context);
+ executable.execute(context, this);
}
} catch (ExecuteException e) {
logger.error("ExecuteException job:" + executable.getId() + " in server: " + serverName, e);
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/Executable.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/IJobRunner.java
similarity index 65%
copy from core-job/src/main/java/org/apache/kylin/job/execution/Executable.java
copy to core-job/src/main/java/org/apache/kylin/job/impl/threadpool/IJobRunner.java
index e063ec1..5ed607f 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/Executable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/IJobRunner.java
@@ -6,37 +6,30 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import java.util.Map;
-
-import org.apache.kylin.job.exception.ExecuteException;
-
-/**
*/
-public interface Executable {
-
- String getId();
-
- String getName();
- ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException;
+package org.apache.kylin.job.impl.threadpool;
- ExecutableState getStatus();
+public interface IJobRunner extends Runnable {
- Output getOutput();
+ boolean acquireJobLock();
- boolean isRunnable();
+ IJobRunner EMPTY_JOB_RUNNER = new IJobRunner() {
+ @Override
+ public void run() {
+ }
- Map<String, String> getParams();
+ @Override
+ public boolean acquireJobLock() {
+ return true;
+ }
+ };
}
diff --git a/core-job/src/test/java/org/apache/kylin/job/ErrorTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/ErrorTestExecutable.java
index f1b795c..bb85a61 100644
--- a/core-job/src/test/java/org/apache/kylin/job/ErrorTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/ErrorTestExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
/**
*/
@@ -31,7 +32,7 @@ public class ErrorTestExecutable extends BaseTestExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/FailedTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/FailedTestExecutable.java
index e18cc0d..18f546b 100644
--- a/core-job/src/test/java/org/apache/kylin/job/FailedTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/FailedTestExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
/**
*/
@@ -31,7 +32,7 @@ public class FailedTestExecutable extends BaseTestExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
index ee29b13..163a788 100644
--- a/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
/**
*/
@@ -35,7 +36,7 @@ public class FiveSecondSucceedTestExecutable extends BaseTestExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java
index 78b393c..e9d62d1 100644
--- a/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
import org.apache.kylin.job.exception.PersistentException;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
public class PersistExceptionExecutable extends BaseTestExecutable {
public PersistExceptionExecutable() {
@@ -28,7 +29,7 @@ public class PersistExceptionExecutable extends BaseTestExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws PersistentException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws PersistentException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
index 1826850..4391a62 100644
--- a/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
public class RunningTestExecutable extends BaseTestExecutable {
@@ -29,7 +30,7 @@ public class RunningTestExecutable extends BaseTestExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
index ddeeafa..1fe8d64 100644
--- a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
/**
*/
@@ -33,7 +34,7 @@ public class SelfStopExecutable extends BaseTestExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
doingWork = true;
try {
for (int i = 0; i < 60; i++) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java
index 58ed515..5988f94 100644
--- a/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
/**
*/
@@ -31,7 +32,7 @@ public class SucceedTestExecutable extends BaseTestExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index 042d720..c908318 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -42,6 +42,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.metadata.model.Segments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,7 +132,7 @@ public class FlinkExecutable extends AbstractExecutable {
@SuppressWarnings("checkstyle:methodlength")
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException, PersistentException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException, PersistentException {
ExecutableManager manager = getManager();
Map<String, String> extra = manager.getOutput(getId()).getExtra();
String flinkJobId = extra.get(ExecutableConstants.FLINK_JOB_ID);
@@ -214,7 +215,7 @@ public class FlinkExecutable extends AbstractExecutable {
//set job name
sb.append(" -ynm ").append(this.getName().replaceAll(" ", "-")).append(" ");
-
+
if (StringUtils.isNotBlank(jars)) {
String[] splitJars = jars.split(",\\s*");
Set<String> setJars = new HashSet();
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
index b19c347..c9015a5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
@@ -29,6 +29,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public class HadoopShellExecutable extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final String mapReduceJobClass = getJobClass();
String params = getJobParams();
Preconditions.checkNotNull(mapReduceJobClass);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 62fb814..8663054 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.shaded.com.google.common.base.Strings;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@@ -112,7 +113,7 @@ public class MapReduceExecutable extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final String mapReduceJobClass = getMapReduceJobClass();
DistributedLock lock = null;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
index 7aa9712..0a1c393 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +43,7 @@ public class CopyDictionaryStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final CubeManager mgr = CubeManager.getInstance(context.getConfig());
final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index efac94a..034cbc2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -36,6 +36,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class MergeDictionaryStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final CubeManager mgr = CubeManager.getInstance(context.getConfig());
final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 41211a6..57be546 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -47,6 +47,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +65,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
@Override
@SuppressWarnings("deprecation")
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final CubeManager mgr = CubeManager.getInstance(context.getConfig());
final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
index 763ea07..dfe2720 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
@@ -41,6 +41,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +60,7 @@ public class MergeStatisticsWithOldStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final CubeManager mgr = CubeManager.getInstance(context.getConfig());
final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index c7aefde..99ad594 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -41,6 +41,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +61,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
CubeSegment newSegment = CubingExecutableUtil.findSegment(context,
CubingExecutableUtil.getCubeName(this.getParams()),
CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index 9c2a9a4..a4e97a9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -45,6 +45,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.metadata.datatype.DataTypeOrder;
import org.apache.kylin.metadata.model.SegmentRange.TSRange;
import org.apache.kylin.metadata.model.TblColRef;
@@ -67,7 +68,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()))
.latestCopyForWrite();
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
index 80811be..f526067 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
@@ -30,6 +30,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,7 @@ public class UpdateCubeInfoAfterCheckpointStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index 0a8cd1e..d6b05d9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -33,6 +33,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.metadata.model.SegmentRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
index 03aa616..030d3f7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +43,7 @@ public class UpdateCubeInfoAfterOptimizeStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
index 4b684e1..0edb23d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -57,7 +58,7 @@ public class UpdateDictionaryStep extends AbstractExecutable {
private static final Logger logger = LoggerFactory.getLogger(UpdateDictionaryStep.class);
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final CubeManager cubeMgr = CubeManager.getInstance(context.getConfig());
final DictionaryManager dictMgrHdfs;
final DictionaryManager dictMgrHbase;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
index 753b67c..accf372 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
@@ -31,6 +31,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.IReadableTable;
@@ -50,7 +51,7 @@ public class LookupSnapshotToMetaStoreStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
KylinConfig kylinConfig = context.getConfig();
CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
TableMetadataManager metaMgr = TableMetadataManager.getInstance(kylinConfig);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java
index 514c940..c1b9c4d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java
@@ -32,6 +32,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public class UpdateCubeAfterSnapshotStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
KylinConfig kylinConfig = context.getConfig();
CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java
index e6cdfda..082b194 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Map;
import java.util.Set;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.shaded.com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -63,7 +64,7 @@ public class SaveDictStep extends AbstractExecutable {
private static final Logger logger = LoggerFactory.getLogger(SaveDictStep.class);
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
logger.info("job {} start to run SaveDictStep", getJobFlowJobId());
final CubeManager mgr = CubeManager.getInstance(context.getConfig());
final DictionaryManager dictManager = DictionaryManager.getInstance(context.getConfig());
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
index a54e112..861394b 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
@@ -28,6 +28,7 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.cube.model.CubeBuildTypeEnum;
import org.apache.kylin.engine.mr.CubingJob;
@@ -130,7 +131,7 @@ public class JobInfoConverterTest {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
return new ExecuteResult(ExecuteResult.State.SUCCEED, "");
}
}
@@ -141,7 +142,7 @@ public class JobInfoConverterTest {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
return new ExecuteResult(ExecuteResult.State.SUCCEED, "");
}
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index a74ce5d..d60b432 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -60,6 +60,7 @@ import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
@@ -200,7 +201,7 @@ public class SparkExecutable extends AbstractExecutable {
@SuppressWarnings("checkstyle:methodlength")
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
ExecutableManager mgr = getManager();
Map<String, String> extra = mgr.getOutput(getId()).getExtra();
String sparkJobId = extra.get(ExecutableConstants.SPARK_JOB_ID);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java
index d8d62b9..9a776e4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java
@@ -43,6 +43,7 @@ import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.metadata.model.Segments;
import org.apache.parquet.Strings;
import org.slf4j.LoggerFactory;
@@ -150,7 +151,7 @@ public class SparkExecutableLivy extends SparkExecutable {
@SuppressWarnings("checkstyle:methodlength")
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
ExecutableManager mgr = getManager();
Map<String, String> extra = mgr.getOutput(getId()).getExtra();
String sparkJobId = extra.get(ExecutableConstants.SPARK_JOB_ID);
diff --git a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index a617b7c..345befb 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.query.QueryConnection;
import org.junit.Assert;
@@ -81,7 +82,7 @@ public class JobServiceTest extends ServiceTestBase {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
return new ExecuteResult(ExecuteResult.State.SUCCEED, "");
}
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
index ca3ba46..ef84b9c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java
@@ -19,6 +19,7 @@
package org.apache.kylin.source.hive;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.shaded.com.google.common.collect.ImmutableList;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.common.PatternedLogger;
@@ -46,7 +47,7 @@ public class CreateFlatHiveTableByLivyStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
stepLogger.setILogListener((infoKey, info) -> {
// only care two properties here
if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index 0840356..532d14c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -35,6 +35,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +84,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
stepLogger.setILogListener((infoKey, info) -> {
// only care two properties here
if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
index 8538622..bb43ce4 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
@@ -17,6 +17,7 @@
*/
package org.apache.kylin.source.hive;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.shaded.com.google.common.base.Strings;
import org.apache.kylin.shaded.com.google.common.collect.ImmutableList;
import org.apache.kylin.common.KylinConfig;
@@ -147,7 +148,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
KylinConfig config = getCubeSpecificConfig();
DistributedLock lock = null;
try {
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
index 894041b..53e8526 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
@@ -32,6 +32,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +42,7 @@ public class GarbageCollectionStep extends AbstractExecutable {
private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
KylinConfig config = context.getConfig();
StringBuffer output = new StringBuffer();
try {
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
index b673092..6e4e579 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java
@@ -18,6 +18,7 @@
package org.apache.kylin.source.hive;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.shaded.com.google.common.collect.ImmutableList;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.common.PatternedLogger;
@@ -56,7 +57,7 @@ public class RedistributeFlatHiveTableByLivyStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
KylinConfig config = getCubeSpecificConfig();
String intermediateTable = getIntermediateTable();
String database, tableName;
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
index 406a2a7..d52c89f 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
@@ -29,6 +29,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
public class RedistributeFlatHiveTableStep extends AbstractExecutable {
private final PatternedLogger stepLogger = new PatternedLogger(logger);
@@ -65,7 +66,7 @@ public class RedistributeFlatHiveTableStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
KylinConfig config = getCubeSpecificConfig();
String intermediateTable = getIntermediateTable();
String database, tableName;
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
index 287019b..26199c6 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
@@ -27,6 +27,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ public class CmdStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
KylinConfig config = KylinConfig.getInstanceFromEnv();
try {
sqoopFlatHiveTable(config);
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
index 073e965..b1fba28 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
@@ -27,6 +27,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ public class HiveCmdStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
KylinConfig config = KylinConfig.getInstanceFromEnv();
try {
createFlatHiveTable(config);
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/sqoop/SqoopCmdStep.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/sqoop/SqoopCmdStep.java
index 626c570..b0e32c8 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/sqoop/SqoopCmdStep.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/sqoop/SqoopCmdStep.java
@@ -29,6 +29,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +65,7 @@ public class SqoopCmdStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
KylinConfig config = KylinConfig.getInstanceFromEnv();
try {
sqoopFlatHiveTable(config);
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index 9bb0520..9dc9a0d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -29,6 +29,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentRange.TSRange;
import org.apache.kylin.metadata.model.Segments;
@@ -48,7 +49,7 @@ public class MergeOffsetStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
final CubeInstance cubeCopy = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
final String segmentId = CubingExecutableUtil.getSegmentId(this.getParams());
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java
index 196ed5f..6d9c173 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java
@@ -23,6 +23,7 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.restclient.RestClient;
@@ -40,7 +41,7 @@ public class UpdateSnapshotCacheForQueryServersStep extends AbstractExecutable {
private static final Logger logger = LoggerFactory.getLogger(UpdateSnapshotCacheForQueryServersStep.class);
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
final String tableName = LookupExecutableUtil.getLookupTableName(this.getParams());
final String snapshotID = LookupExecutableUtil.getLookupSnapshotID(this.getParams());
final String projectName = LookupExecutableUtil.getProjectName(this.getParams());
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 2eb0f3a..553d39f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -32,6 +32,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
try {
config = new JobEngineConfig(context.getConfig());
List<String> toDeletePaths = getDeletePaths();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
index 85400ab..fd435d2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
@@ -33,6 +33,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public class MergeGCStep extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ protected ExecuteResult doWork(ExecutableContext context, IJobRunner jobRunner) throws ExecuteException {
try {
logger.info("Sleep one minute before deleting the Htables");
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java
index e7a63fa..f403b11 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java
@@ -14,26 +14,27 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.storage.hbase.lookup;
-import static org.junit.Assert.assertTrue;
-
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.impl.threadpool.DefaultContext;
+import org.apache.kylin.job.impl.threadpool.IJobRunner;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import static org.junit.Assert.assertTrue;
public class UpdateSnapshotCacheForQueryServersStepTest extends LocalFileMetadataTestCase {
private KylinConfig kylinConfig;
+
@Before
public void setup() throws Exception {
this.createTestMetadata();
@@ -48,7 +49,7 @@ public class UpdateSnapshotCacheForQueryServersStepTest extends LocalFileMetadat
@Test
public void testExecute() throws ExecuteException {
UpdateSnapshotCacheForQueryServersStep step = new UpdateSnapshotCacheForQueryServersStep();
- ExecuteResult result = step.doWork(new DefaultContext(Maps.<String, Executable>newConcurrentMap(), kylinConfig));
+ ExecuteResult result = step.doWork(new DefaultContext(Maps.<String, Executable>newConcurrentMap(), kylinConfig), IJobRunner.EMPTY_JOB_RUNNER);
System.out.println(result.output());
assertTrue(result.succeed());
}