You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/21 15:12:45 UTC
kylin git commit: KYLIN-2913 Enable job retry for configurable
exceptions
Repository: kylin
Updated Branches:
refs/heads/master 7e213b013 -> 006a5aab0
KYLIN-2913 Enable job retry for configurable exceptions
Signed-off-by: lidongsjtu <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/006a5aab
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/006a5aab
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/006a5aab
Branch: refs/heads/master
Commit: 006a5aab091bddf47d5a1cff19081b2245cd2a31
Parents: 7e213b0
Author: gwang3 <gw...@ebay.com>
Authored: Mon Dec 18 10:03:49 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Dec 21 23:12:18 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 4 ++
.../kylin/job/execution/AbstractExecutable.java | 36 ++++++++++++--
.../job/execution/DefaultChainedExecutable.java | 5 --
.../kylin/job/execution/ExecuteResult.java | 7 +++
.../kylin/job/RetryableTestExecutable.java | 50 ++++++++++++++++++++
.../apache/kylin/job/SelfStopExecutable.java | 2 +-
.../apache/kylin/job/SucceedTestExecutable.java | 2 +-
.../impl/threadpool/DefaultSchedulerTest.java | 17 +++++++
.../engine/mr/steps/CopyDictionaryStep.java | 2 +-
.../mr/steps/MergeStatisticsWithOldStep.java | 2 +-
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 2 +-
.../UpdateCubeInfoAfterCheckpointStep.java | 2 +-
.../steps/UpdateCubeInfoAfterOptimizeStep.java | 2 +-
.../apache/kylin/job/ContextTestExecutable.java | 2 +-
.../kylin/source/kafka/job/MergeOffsetStep.java | 2 +-
.../source/kafka/job/UpdateTimeRangeStep.java | 2 +-
16 files changed, 120 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ce524b1..7763457 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -557,6 +557,10 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(this.getOptional("kylin.job.retry", "0"));
}
+ public String[] getJobRetryExceptions() { // reads "kylin.job.retry-exception-classes", passing an empty array as the default value
+ return getOptionalStringArray("kylin.job.retry-exception-classes", new String[0]);
+ }
+
public int getCubeStatsHLLPrecision() {
return Integer.parseInt(getOptional("kylin.job.sampling-hll-precision", "14"));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index a37cdc9..6a0db97 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.MailService;
@@ -114,6 +115,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
Preconditions.checkArgument(executableContext instanceof DefaultContext);
ExecuteResult result = null;
+
try {
onExecuteStart(executableContext);
Throwable exception;
@@ -130,9 +132,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
exception = e;
}
retry++;
- } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true);
+ } while (needRetry(result, exception));
- if (exception != null) {
+ //check exception in result to avoid retry on ChainedExecutable(only need retry on subtask actually)
+ if (exception != null || result.getThrowable() != null) {
onExecuteError(exception, executableContext);
throw new ExecuteException(exception);
}
@@ -172,6 +175,13 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
return false;
}
+ private boolean isRetryableExecutionResult(ExecuteResult result) { // true when the result carries a throwable accepted by isRetrableException
+ if (result == null || result.getThrowable() == null) {
+ return false;
+ }
+ return isRetrableException(result.getThrowable());
+ }
+
protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException;
@Override
@@ -412,8 +422,26 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
return status == ExecutableState.STOPPED;
}
- protected boolean needRetry() {
- return this.retry <= config.getJobRetry();
+ protected boolean isRetrableException(Throwable t) { // compares t's exact runtime class name with the configured list; a subclass of a listed class does not match
+ return ArrayUtils.contains(KylinConfig.getInstanceFromEnv().getJobRetryExceptions(), t.getClass().getName());
+ }
+
+ // Retry happens only when the last attempt failed (unsuccessful result or thrown exception) and the retry budget "kylin.job.retry" is not used up:
+ // 1) if property "kylin.job.retry-exception-classes" is not set or is empty, every failed attempt is retried.
+ // 2) if property "kylin.job.retry-exception-classes" is set, only failures caused by one of the listed exception classes are retried.
+ protected boolean needRetry(ExecuteResult result, Throwable e) {
+ if (this.retry > KylinConfig.getInstanceFromEnv().getJobRetry()) {
+ return false;
+ }
+ boolean failed = e != null || (result != null && !result.succeed());
+ if (!failed) {
+ return false; // a successful attempt must never be re-run, whatever the exception list contains
+ }
+ String[] retryableEx = KylinConfig.getInstanceFromEnv().getJobRetryExceptions();
+ if (retryableEx == null || retryableEx.length == 0) {
+ return true;
+ }
+ return isRetryableExecutionResult(result) || (e != null && isRetrableException(e));
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index ff8dfee..9e53459 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -150,11 +150,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
return subTasks;
}
- @Override
- protected boolean needRetry() {
- return false;
- }
-
public final AbstractExecutable getTaskByName(String name) {
for (AbstractExecutable task : subTasks) {
if (task.getName() != null && task.getName().equalsIgnoreCase(name)) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
index c0c9d36..fa24bb0 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
@@ -32,6 +32,13 @@ public final class ExecuteResult {
private final String output;
private final Throwable throwable;
+ /**
+ * Creates a SUCCEED result by delegating to {@code ExecuteResult(State, String)} with the text "succeed".
+ */
+ public ExecuteResult() {
+ this(State.SUCCEED, "succeed");
+ }
+
public ExecuteResult(State state) {
this(state, "");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java
new file mode 100644
index 0000000..61b1742
--- /dev/null
+++ b/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class RetryableTestExecutable extends BaseTestExecutable {
+ private static final Logger logger = LoggerFactory.getLogger(RetryableTestExecutable.class);
+
+ public RetryableTestExecutable() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) {
+ logger.debug("run retryable exception test. ");
+ String[] exceptions = KylinConfig.getInstanceFromEnv().getJobRetryExceptions();
+ Throwable ex = null;
+ if (exceptions != null && exceptions[0] != null) {
+ try {
+ ex = (Throwable) Class.forName(exceptions[0]).newInstance();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ return new ExecuteResult(ExecuteResult.State.ERROR, null, ex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
index 9a3eb48..09f5a7a 100644
--- a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
@@ -43,7 +43,7 @@ public class SelfStopExecutable extends BaseTestExecutable {
return new ExecuteResult(ExecuteResult.State.STOPPED, "stopped");
}
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ return new ExecuteResult();
} finally {
doingWork = false;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java
index 1421f10..58ed515 100644
--- a/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java
@@ -36,6 +36,6 @@ public class SucceedTestExecutable extends BaseTestExecutable {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ return new ExecuteResult();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index c8b251d..badd483 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.kylin.job.BaseTestExecutable;
import org.apache.kylin.job.ErrorTestExecutable;
import org.apache.kylin.job.FailedTestExecutable;
+import org.apache.kylin.job.RetryableTestExecutable;
import org.apache.kylin.job.SelfStopExecutable;
import org.apache.kylin.job.SucceedTestExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -145,4 +146,20 @@ public class DefaultSchedulerTest extends BaseSchedulerTest {
assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(7, TimeUnit.SECONDS));
assertFalse("future2 should has been stopped", future2.cancel(true));
}
+
+ @Test
+ public void testRetryableException() throws Exception { // expects task1 SUCCEED, and task2 plus the whole job ERROR, with the settings below
+ System.setProperty("kylin.job.retry-exception-classes", "java.io.FileNotFoundException"); // NOTE(review): neither property is cleared in this method; confirm the base test resets them, otherwise later tests inherit these values
+ System.setProperty("kylin.job.retry", "3");
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SucceedTestExecutable();
+ BaseTestExecutable task2 = new RetryableTestExecutable();
+ job.addTask(task1);
+ job.addTask(task2);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+ Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
+ Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
index 3341be9..c0b3c99 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
@@ -65,6 +65,6 @@ public class CopyDictionaryStep extends AbstractExecutable {
return ExecuteResult.createError(e);
}
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ return new ExecuteResult();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
index eca0499..3f12b0d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
@@ -134,7 +134,7 @@ public class MergeStatisticsWithOldStep extends AbstractExecutable {
.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, optimizeSegment);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ return new ExecuteResult();
} catch (IOException e) {
logger.error("fail to merge cuboid statistics", e);
return ExecuteResult.createError(e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index 7d36643..beb9357 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -73,7 +73,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
}
cubeManager.promoteNewlyBuiltSegments(cube, segment);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ return new ExecuteResult();
} catch (IOException e) {
logger.error("fail to update cube after build", e);
return ExecuteResult.createError(e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
index ed61b4a..80811be 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
@@ -59,7 +59,7 @@ public class UpdateCubeInfoAfterCheckpointStep extends AbstractExecutable {
}
cubeManager.promoteCheckpointOptimizeSegments(cube, recommendCuboidsWithStats,
newSegments.toArray(new CubeSegment[newSegments.size()]));
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ return new ExecuteResult();
} catch (Exception e) {
logger.error("fail to update cube after build", e);
return ExecuteResult.createError(e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
index 13c4f40..d013386 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
@@ -62,7 +62,7 @@ public class UpdateCubeInfoAfterOptimizeStep extends AbstractExecutable {
try {
cubeManager.promoteNewlyOptimizeSegments(cube, segment);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ return new ExecuteResult();
} catch (IOException e) {
logger.error("fail to update cube after build", e);
return ExecuteResult.createError(e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
index 4696e67..9b4f299 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
@@ -36,7 +36,7 @@ public class ContextTestExecutable extends AbstractExecutable {
} catch (InterruptedException e) {
}
if (context.getConfig() == BaseTestDistributedScheduler.kylinConfig1) {
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ return new ExecuteResult();
} else {
return new ExecuteResult(ExecuteResult.State.ERROR, "error");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index 8139342..fe5812b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -73,7 +73,7 @@ public class MergeOffsetStep extends AbstractExecutable {
cubeBuilder.setToUpdateSegs(segment);
try {
cubeManager.updateCube(cubeBuilder);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ return new ExecuteResult();
} catch (IOException e) {
logger.error("fail to update cube segment offset", e);
return ExecuteResult.createError(e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
index 8c31c70..183271d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
@@ -37,7 +37,7 @@ public class UpdateTimeRangeStep extends AbstractExecutable {
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ return new ExecuteResult();
}
}