You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/14 14:36:43 UTC
kylin git commit: KYLIN-1038 retry on job failure
Repository: kylin
Updated Branches:
refs/heads/helix-201602 271940046 -> 68a31eb77
KYLIN-1038 retry on job failure
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/68a31eb7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/68a31eb7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/68a31eb7
Branch: refs/heads/helix-201602
Commit: 68a31eb77a6a29131c891d6f049df795e35d1f5e
Parents: 2719400
Author: shaofengshi <sh...@apache.org>
Authored: Sun Feb 14 21:17:12 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Feb 14 21:17:12 2016 +0800
----------------------------------------------------------------------
build/conf/kylin.properties | 3 ++
.../apache/kylin/common/KylinConfigBase.java | 4 ++
.../kylin/job/execution/AbstractExecutable.java | 57 +++++++++++++-------
.../job/execution/DefaultChainedExecutable.java | 5 ++
4 files changed, 50 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/68a31eb7/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 75269de..57347d7 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -40,6 +40,9 @@ kylin.hbase.cluster.fs=
kylin.job.mapreduce.default.reduce.input.mb=500
+# Maximum number of retries on job error; the default 0 means no retry
+kylin.job.retry=0
+
# If true, job engine will not assume that hadoop CLI resides on the same server as itself
# you will have to specify kylin.job.remote.cli.hostname, kylin.job.remote.cli.username and kylin.job.remote.cli.password
# It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine
http://git-wip-us.apache.org/repos/asf/kylin/blob/68a31eb7/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 87e4566..a4d8292 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -566,6 +566,10 @@ public class KylinConfigBase implements Serializable {
setProperty("kylin.rest.address", restAddress);
}
+ public int getJobRetry() {
+ return Integer.parseInt(this.getOptional("kylin.job.retry", "0"));
+ }
+
public String toString() {
return getMetadataUrl();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/68a31eb7/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index e1d7106..8d5fea5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -18,13 +18,10 @@
package org.apache.kylin.job.execution;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kylin.common.KylinConfig;
@@ -35,10 +32,12 @@ import org.apache.kylin.job.manager.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
/**
*/
@@ -50,6 +49,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
protected static final String END_TIME = "endTime";
protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
+ protected int retry = 0;
private String name;
private String id;
@@ -99,15 +99,30 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
logger.info("Executing >>>>>>>>>>>>> " + this.getName() + " <<<<<<<<<<<<<");
Preconditions.checkArgument(executableContext instanceof DefaultContext);
- ExecuteResult result;
- try {
- onExecuteStart(executableContext);
- result = doWork(executableContext);
- } catch (Throwable e) {
- logger.error("error running Executable", e);
- onExecuteError(e, executableContext);
- throw new ExecuteException(e);
+ ExecuteResult result = null;
+
+ onExecuteStart(executableContext);
+ Throwable exception;
+ do {
+ if (retry > 0) {
+ logger.info("Retry " + retry);
+ }
+ exception = null;
+ result = null;
+ try {
+ result = doWork(executableContext);
+ } catch (Throwable e) {
+ logger.error("error running Executable", e);
+ exception = e;
+ }
+ retry++;
+ } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true);
+
+ if (exception != null) {
+ onExecuteError(exception, executableContext);
+ throw new ExecuteException(exception);
}
+
onExecuteFinished(result, executableContext);
return result;
}
@@ -301,6 +316,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
return status == ExecutableState.DISCARDED;
}
+ protected boolean needRetry() {
+ return this.retry <= KylinConfig.getInstanceFromEnv().getJobRetry();
+ }
+
@Override
public String toString() {
return Objects.toStringHelper(this).add("id", getId()).add("name", getName()).add("state", getStatus()).toString();
http://git-wip-us.apache.org/repos/asf/kylin/blob/68a31eb7/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 2e95711..7403715 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -106,6 +106,11 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
return subTasks;
}
+ @Override
+ protected boolean needRetry() {
+ return false;
+ }
+
public final AbstractExecutable getTaskByName(String name) {
for (AbstractExecutable task : subTasks) {
if (task.getName() != null && task.getName().equalsIgnoreCase(name)) {