You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink that is not preserved in this plain-text version.)
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/14 14:36:43 UTC

kylin git commit: KYLIN-1038 retry on job failure

Repository: kylin
Updated Branches:
  refs/heads/helix-201602 271940046 -> 68a31eb77


KYLIN-1038 retry on job failure

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/68a31eb7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/68a31eb7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/68a31eb7

Branch: refs/heads/helix-201602
Commit: 68a31eb77a6a29131c891d6f049df795e35d1f5e
Parents: 2719400
Author: shaofengshi <sh...@apache.org>
Authored: Sun Feb 14 21:17:12 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Feb 14 21:17:12 2016 +0800

----------------------------------------------------------------------
 build/conf/kylin.properties                     |  3 ++
 .../apache/kylin/common/KylinConfigBase.java    |  4 ++
 .../kylin/job/execution/AbstractExecutable.java | 57 +++++++++++++-------
 .../job/execution/DefaultChainedExecutable.java |  5 ++
 4 files changed, 50 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/68a31eb7/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 75269de..57347d7 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -40,6 +40,9 @@ kylin.hbase.cluster.fs=
 
 kylin.job.mapreduce.default.reduce.input.mb=500
 
+# Maximum number of retries when a job task fails or throws an error; default 0 means no retry
+kylin.job.retry=0
+
 # If true, job engine will not assume that hadoop CLI reside on the same server as it self
 # you will have to specify kylin.job.remote.cli.hostname, kylin.job.remote.cli.username and kylin.job.remote.cli.password
 # It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine 

http://git-wip-us.apache.org/repos/asf/kylin/blob/68a31eb7/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 87e4566..a4d8292 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -566,6 +566,10 @@ public class KylinConfigBase implements Serializable {
         setProperty("kylin.rest.address", restAddress);
     }
     
+    public int getJobRetry() {
+        return Integer.parseInt(this.getOptional("kylin.job.retry", "0"));
+    }
+    
     public String toString() {
         return getMetadataUrl();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/68a31eb7/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index e1d7106..8d5fea5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -18,13 +18,10 @@
 
 package org.apache.kylin.job.execution;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kylin.common.KylinConfig;
@@ -35,10 +32,12 @@ import org.apache.kylin.job.manager.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 /**
  */
@@ -50,6 +49,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     protected static final String END_TIME = "endTime";
 
     protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
+    protected int retry = 0;
 
     private String name;
     private String id;
@@ -99,15 +99,30 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         logger.info("Executing >>>>>>>>>>>>>   " + this.getName() + "   <<<<<<<<<<<<<");
 
         Preconditions.checkArgument(executableContext instanceof DefaultContext);
-        ExecuteResult result;
-        try {
-            onExecuteStart(executableContext);
-            result = doWork(executableContext);
-        } catch (Throwable e) {
-            logger.error("error running Executable", e);
-            onExecuteError(e, executableContext);
-            throw new ExecuteException(e);
+        ExecuteResult result = null;
+
+        onExecuteStart(executableContext);
+        Throwable exception;
+        do {
+            if (retry > 0) {
+                logger.info("Retry " + retry);
+            }
+            exception = null;
+            result = null;
+            try {
+                result = doWork(executableContext);
+            } catch (Throwable e) {
+                logger.error("error running Executable", e);
+                exception = e;
+            }
+            retry++;
+        } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true);
+        
+        if (exception != null) {
+            onExecuteError(exception, executableContext);
+            throw new ExecuteException(exception);
         }
+        
         onExecuteFinished(result, executableContext);
         return result;
     }
@@ -301,6 +316,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         return status == ExecutableState.DISCARDED;
     }
 
+    protected boolean needRetry() {
+        return this.retry <= KylinConfig.getInstanceFromEnv().getJobRetry();
+    }
+
     @Override
     public String toString() {
         return Objects.toStringHelper(this).add("id", getId()).add("name", getName()).add("state", getStatus()).toString();

http://git-wip-us.apache.org/repos/asf/kylin/blob/68a31eb7/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 2e95711..7403715 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -106,6 +106,11 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         return subTasks;
     }
 
+    @Override
+    protected boolean needRetry() {
+        return false;
+    }
+
     public final AbstractExecutable getTaskByName(String name) {
         for (AbstractExecutable task : subTasks) {
             if (task.getName() != null && task.getName().equalsIgnoreCase(name)) {