You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ja...@apache.org on 2016/08/02 18:06:56 UTC
oozie git commit: OOZIE-2440 Exponential re-try policy for workflow
action (satishsaley via jaydeepvishwakarma)
Repository: oozie
Updated Branches:
refs/heads/master 273003062 -> 99a3df568
OOZIE-2440 Exponential re-try policy for workflow action (satishsaley via jaydeepvishwakarma)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/99a3df56
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/99a3df56
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/99a3df56
Branch: refs/heads/master
Commit: 99a3df568d182e75751126aac5fe8b63d876e64f
Parents: 2730030
Author: jvishwakarma <jv...@walmartlabs.com>
Authored: Tue Aug 2 23:36:18 2016 +0530
Committer: jvishwakarma <jv...@walmartlabs.com>
Committed: Tue Aug 2 23:36:18 2016 +0530
----------------------------------------------------------------------
.../src/main/resources/oozie-workflow-0.5.xsd | 1 +
.../oozie/command/wf/ActionCheckXCommand.java | 2 +-
.../oozie/command/wf/ActionEndXCommand.java | 2 +-
.../apache/oozie/command/wf/ActionXCommand.java | 52 ++++-
.../oozie/service/LiteWorkflowStoreService.java | 13 +-
.../oozie/workflow/lite/ActionNodeDef.java | 5 +-
.../workflow/lite/LiteWorkflowAppParser.java | 11 +-
.../org/apache/oozie/workflow/lite/NodeDef.java | 82 ++++++-
core/src/main/resources/oozie-default.xml | 12 +-
.../oozie/command/wf/TestActionUserRetry.java | 215 +++++++++++++++++++
.../wf/TestForkedActionStartXCommand.java | 77 -------
.../command/wf/TestWorkflowKillXCommand.java | 19 ++
.../src/site/twiki/WorkflowFunctionalSpec.twiki | 11 +-
release-log.txt | 1 +
14 files changed, 396 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/client/src/main/resources/oozie-workflow-0.5.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/oozie-workflow-0.5.xsd b/client/src/main/resources/oozie-workflow-0.5.xsd
index fda49ed..8fe2b47 100644
--- a/client/src/main/resources/oozie-workflow-0.5.xsd
+++ b/client/src/main/resources/oozie-workflow-0.5.xsd
@@ -160,6 +160,7 @@
<xs:attribute name="cred" type="xs:string"/>
<xs:attribute name="retry-max" type="xs:string"/>
<xs:attribute name="retry-interval" type="xs:string"/>
+ <xs:attribute name="retry-policy" type="xs:string"/>
</xs:complexType>
<xs:complexType name="MAP-REDUCE">
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
index ea4d340..d0551ff 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
@@ -210,7 +210,7 @@ public class ActionCheckXCommand extends ActionXCommand<Void> {
switch (ex.getErrorType()) {
case ERROR:
// If allowed to retry, this will handle it; otherwise, we should fall through to FAILED
- if (handleUserRetry(wfAction)) {
+ if (handleUserRetry(wfAction, wfJob)) {
break;
}
case FAILED:
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
index d030a10..740b8d3 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
@@ -216,7 +216,7 @@ public class ActionEndXCommand extends ActionXCommand<Void> {
shouldHandleUserRetry = true;
break;
}
- if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
+ if (!shouldHandleUserRetry || !handleUserRetry(wfAction, wfJob)) {
SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
if(slaEvent != null) {
insertList.add(slaEvent);
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
index e65c3bf..836e5d4 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
@@ -41,6 +41,7 @@ import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.service.CallbackService;
+import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
@@ -53,7 +54,9 @@ import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.workflow.lite.LiteWorkflowApp;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
+import org.apache.oozie.workflow.lite.NodeDef;
/**
* Base class for Action execution commands. Provides common functionality to handle different types of errors while
@@ -154,8 +157,8 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
LOG.warn("Setting Action Status to [{0}]", status);
ActionExecutorContext aContext = (ActionExecutorContext) context;
WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
-
- if (!handleUserRetry(action)) {
+ WorkflowJobBean wfJob = (WorkflowJobBean) context.getWorkflow();
+ if (!handleUserRetry(action, wfJob)) {
incrActionErrorCounter(action.getType(), "error", 1);
action.setPending();
if (isStart) {
@@ -189,7 +192,7 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
*/
public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
- if (!handleUserRetry(action)) {
+ if (!handleUserRetry(action, workflow)) {
incrActionErrorCounter(action.getType(), "failed", 1);
LOG.warn("Failing Job due to failed action [{0}]", action.getName());
try {
@@ -217,7 +220,7 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
* @return true if user-retry has to be handled for this action
* @throws CommandException thrown if unable to fail job
*/
- public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
+ public boolean handleUserRetry(WorkflowActionBean action, WorkflowJobBean wfJob) throws CommandException {
String errorCode = action.getErrorCode();
Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
@@ -226,7 +229,8 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
+ "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
.getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
- int interval = action.getUserRetryInterval() * 60 * 1000;
+ ActionExecutor.RETRYPOLICY retryPolicy = getUserRetryPolicy(action, wfJob);
+ long interval = getRetryDelay(action.getUserRetryCount(), action.getUserRetryInterval() * 60, retryPolicy);
action.setStatus(WorkflowAction.Status.USER_RETRY);
action.incrmentUserRetryCount();
action.setPending();
@@ -559,4 +563,42 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
}
}
+ /*
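+ * Returns the user retry policy: the action node's own policy if valid, else the system-configured policy if valid, else the default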
+ */
+ private ActionExecutor.RETRYPOLICY getUserRetryPolicy(WorkflowActionBean wfAction, WorkflowJobBean wfJob) {
+ WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
+ LiteWorkflowApp wfApp = (LiteWorkflowApp) wfInstance.getApp();
+ NodeDef nodeDef = wfApp.getNode(wfAction.getName());
+ if (nodeDef == null) {
+ return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY);
+ }
+ String userRetryPolicy = nodeDef.getUserRetryPolicy().toUpperCase();
+ String userRetryPolicyInSysConfig = ConfigurationService.get(LiteWorkflowStoreService.CONF_USER_RETRY_POLICY)
+ .toUpperCase();
+ if (isValidRetryPolicy(userRetryPolicy)) {
+ return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicy);
+ }
+ else if (isValidRetryPolicy(userRetryPolicyInSysConfig)) {
+ return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicyInSysConfig);
+ }
+ else {
+ return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY);
+ }
+ }
+
+ /*
+ * Returns true if policy is valid, otherwise false
+ */
+ private static boolean isValidRetryPolicy(String policy) {
+ try {
+ ActionExecutor.RETRYPOLICY.valueOf(policy.toUpperCase().trim());
+ }
+ catch (IllegalArgumentException e) {
+ // Invalid Policy
+ return false;
+ }
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
index 99ace13..ffc29af 100644
--- a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
+++ b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
@@ -60,11 +60,14 @@ public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
+ public static final String CONF_USER_RETRY_POLICY = CONF_PREFIX_USER_RETRY + "policy";
public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";
+ public static final String DEFAULT_USER_RETRY_POLICY = "PERIODIC";
public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
+ public static final String NODE_DEF_VERSION_2 = "_oozie_inst_v_2";
public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";
public static final String USER_ERROR_CODE_ALL = "ALL";
@@ -195,15 +198,17 @@ public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
}
/**
- * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
+ * Get NodeDef default version, _oozie_inst_v_0, _oozie_inst_v_1 or
+ * _oozie_inst_v_2
*
* @return nodedef default version
- * @throws WorkflowException thrown if there was an error parsing the action configuration.
- */
+ * @throws WorkflowException thrown if there was an error parsing the action
+ * configuration.
+ */
public static String getNodeDefDefaultVersion() throws WorkflowException {
String ret = ConfigurationService.get(CONF_NODE_DEF_VERSION);
if (ret == null) {
- ret = NODE_DEF_VERSION_1;
+ ret = NODE_DEF_VERSION_2;
}
return ret;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java b/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java
index d3a2793..97c7134 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java
@@ -44,7 +44,8 @@ public class ActionNodeDef extends NodeDef {
}
public ActionNodeDef(String name, String conf, Class<? extends ActionNodeHandler> actionHandlerClass, String onOk,
- String onError, String cred, String userRetryMax, String userRetryInterval) {
- super(name, ParamChecker.notNull(conf, "conf"), actionHandlerClass, Arrays.asList(onOk, onError), cred, userRetryMax, userRetryInterval);
+ String onError, String cred, String userRetryMax, String userRetryInterval, String userRetryPolicy) {
+ super(name, ParamChecker.notNull(conf, "conf"), actionHandlerClass, Arrays.asList(onOk, onError), cred,
+ userRetryMax, userRetryInterval, userRetryPolicy);
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
index a1b9cdb..bbd81a9 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
@@ -21,6 +21,8 @@ package org.apache.oozie.workflow.lite;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.io.Writable;
import org.apache.oozie.action.hadoop.FsActionExecutor;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.util.ELUtils;
@@ -90,6 +92,7 @@ public class LiteWorkflowAppParser {
private static final String USER_RETRY_MAX_A = "retry-max";
private static final String USER_RETRY_INTERVAL_A = "retry-interval";
private static final String TO_A = "to";
+ private static final String USER_RETRY_POLICY_A = "retry-policy";
private static final String FORK_PATH_E = "path";
private static final String FORK_START_A = "start";
@@ -485,6 +488,7 @@ public class LiteWorkflowAppParser {
String credStr = eNode.getAttributeValue(CRED_A);
String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
+ String userRetryPolicyStr = eNode.getAttributeValue(USER_RETRY_POLICY_A);
try {
if (!StringUtils.isEmpty(userRetryMaxStr)) {
userRetryMaxStr = ELUtils.resolveAppName(userRetryMaxStr, jobConf);
@@ -492,6 +496,9 @@ public class LiteWorkflowAppParser {
if (!StringUtils.isEmpty(userRetryIntervalStr)) {
userRetryIntervalStr = ELUtils.resolveAppName(userRetryIntervalStr, jobConf);
}
+ if (!StringUtils.isEmpty(userRetryPolicyStr)) {
+ userRetryPolicyStr = ELUtils.resolveAppName(userRetryPolicyStr, jobConf);
+ }
}
catch (Exception e) {
throw new WorkflowException(ErrorCode.E0703, e.getMessage());
@@ -499,8 +506,8 @@ public class LiteWorkflowAppParser {
String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
- transitions[0], transitions[1], credStr,
- userRetryMaxStr, userRetryIntervalStr));
+ transitions[0], transitions[1], credStr, userRetryMaxStr, userRetryIntervalStr,
+ userRetryPolicyStr));
} else if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
// No operation is required
} else if (eNode.getName().equals(GLOBAL)) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java b/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java
index 9e66d28..496b008 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java
@@ -43,6 +43,7 @@ public class NodeDef implements Writable {
private String cred = null;
private String userRetryMax = "null";
private String userRetryInterval = "null";
+ private String userRetryPolicy = "null";
NodeDef() {
}
@@ -62,7 +63,7 @@ public class NodeDef implements Writable {
}
NodeDef(String name, String conf, Class<? extends NodeHandler> handlerClass, List<String> transitions, String cred,
- String userRetryMax, String userRetryInterval) {
+ String userRetryMax, String userRetryInterval, String userRetryPolicy) {
this(name, conf, handlerClass, transitions, cred);
if (userRetryMax != null) {
this.userRetryMax = userRetryMax;
@@ -70,6 +71,9 @@ public class NodeDef implements Writable {
if (userRetryInterval != null) {
this.userRetryInterval = userRetryInterval;
}
+ if (userRetryPolicy != null) {
+ this.userRetryPolicy = userRetryPolicy;
+ }
}
public boolean equals(NodeDef other) {
@@ -115,12 +119,20 @@ public class NodeDef implements Writable {
nodeDefVersion = LiteWorkflowStoreService.getNodeDefDefaultVersion();
}
catch (WorkflowException e) {
- nodeDefVersion = LiteWorkflowStoreService.NODE_DEF_VERSION_1;
+ nodeDefVersion = LiteWorkflowStoreService.NODE_DEF_VERSION_2;
}
}
return nodeDefVersion;
}
+ public String getUserRetryPolicy() {
+ return userRetryPolicy;
+ }
+
+ public void setUserRetryPolicy(String userRetryPolicy) {
+ this.userRetryPolicy = userRetryPolicy;
+ }
+
@SuppressWarnings("unchecked")
private void readVersionZero(DataInput dataInput, String firstField) throws IOException {
if (firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_0)) {
@@ -151,7 +163,24 @@ public class NodeDef implements Writable {
}
@SuppressWarnings("unchecked")
private void readVersionOne(DataInput dataInput, String firstField) throws IOException {
- nodeDefVersion = LiteWorkflowStoreService.NODE_DEF_VERSION_1;
+ readCommon(dataInput, firstField, LiteWorkflowStoreService.NODE_DEF_VERSION_1);
+ }
+
+ /*
+ * Reads according to version 2
+ */
+ @SuppressWarnings("unchecked")
+ private void readVersionTwo(DataInput dataInput, String firstField) throws IOException {
+ readCommon(dataInput, firstField, LiteWorkflowStoreService.NODE_DEF_VERSION_2);
+ userRetryPolicy = dataInput.readUTF();
+ }
+
+ /*
+ * Reads common part
+ */
+ @SuppressWarnings("unchecked")
+ private void readCommon(DataInput dataInput, String firstField, String nodeDefVer) throws IOException {
+ nodeDefVersion = nodeDefVer;
name = dataInput.readUTF();
cred = dataInput.readUTF();
if (cred.equals("null")) {
@@ -185,12 +214,16 @@ public class NodeDef implements Writable {
@Override
public void readFields(DataInput dataInput) throws IOException {
String firstField = dataInput.readUTF();
- if (!firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) {
- readVersionZero(dataInput, firstField);
- } else {
- //since oozie version 3.1
+ if (firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) {
+ // since oozie version 3.1
readVersionOne(dataInput, firstField);
}
+ else if (firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_2)) {
+ readVersionTwo(dataInput, firstField);
+ }
+ else {
+ readVersionZero(dataInput, firstField);
+ }
}
private void writeVersionZero(DataOutput dataOutput) throws IOException {
@@ -222,6 +255,29 @@ public class NodeDef implements Writable {
* @throws IOException thrown if fail to write
*/
private void writeVersionOne(DataOutput dataOutput) throws IOException {
+ writeCommon(dataOutput);
+ }
+
+ /**
+ * Write in version two format (in use since 4.4.4.1), which appends the user retry policy to the common fields.
+ *
+ * @param dataOutput data output to serialize node def
+ * @throws IOException thrown if fail to write
+ */
+ private void writeVersionTwo(DataOutput dataOutput) throws IOException {
+ writeCommon(dataOutput);
+ if (userRetryPolicy != null) {
+ dataOutput.writeUTF(userRetryPolicy);
+ }
+ else {
+ dataOutput.writeUTF("null");
+ }
+ }
+
+ /*
+ * Write the common part
+ */
+ private void writeCommon(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(nodeDefVersion);
dataOutput.writeUTF(name);
if (cred != null) {
@@ -260,12 +316,16 @@ public class NodeDef implements Writable {
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
- if (!getNodeDefVersion().equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) {
- writeVersionZero(dataOutput);
- } else {
- //since oozie version 3.1
+ if (getNodeDefVersion().equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) {
+ // since oozie version 3.1
writeVersionOne(dataOutput);
}
+ else if (getNodeDefVersion().equals(LiteWorkflowStoreService.NODE_DEF_VERSION_2)) {
+ writeVersionTwo(dataOutput);
+ }
+ else {
+ writeVersionZero(dataOutput);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 4563c73..530c2ed 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2116,6 +2116,14 @@ will be the requeue interval for the actions which are waiting for a long time w
</property>
<property>
+ <name>oozie.service.LiteWorkflowStoreService.user.retry.policy</name>
+ <value>periodic</value>
+ <description>
+ Automatic retry policy for workflow action. Possible values are periodic or exponential, periodic being the default.
+ </description>
+ </property>
+
+ <property>
<name>oozie.service.LiteWorkflowStoreService.user.retry.error.code</name>
<value>JA008,JA009,JA017,JA018,JA019,FS009,FS008,FS014</value>
<description>
@@ -2142,9 +2150,9 @@ will be the requeue interval for the actions which are waiting for a long time w
<property>
<name>oozie.service.LiteWorkflowStoreService.node.def.version</name>
- <value>_oozie_inst_v_1</value>
+ <value>_oozie_inst_v_2</value>
<description>
- NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
+ NodeDef default version, _oozie_inst_v_0, _oozie_inst_v_1 or _oozie_inst_v_2
</description>
</property>
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/test/java/org/apache/oozie/command/wf/TestActionUserRetry.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionUserRetry.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionUserRetry.java
new file mode 100644
index 0000000..ca2b5a2
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionUserRetry.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.wf;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ForTestingActionExecutor;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
+import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.ExtendedCallableQueueService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
+import org.apache.oozie.service.SchemaService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.XConfiguration;
+
+public class TestActionUserRetry extends XDataTestCase {
+ private Services services;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, "wf-ext-schema.xsd");
+ setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT, ForTestingActionExecutor.TEST_ERROR);
+ setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, ExtendedCallableQueueService.class.getName());
+ services = new Services();
+ services.init();
+ services.get(ActionService.class).registerAndInitExecutor(ForTestingActionExecutor.class);
+ ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ public void testUserRetry() throws JPAExecutorException, IOException, CommandException{
+ Configuration conf = new XConfiguration();
+ String workflowUri = getTestCaseFileUri("workflow.xml");
+
+ //@formatter:off
+ String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.3\" name=\"wf-fork\">"
+ + "<start to=\"fork1\"/>"
+ + "<fork name=\"fork1\">"
+ + "<path start=\"action1\"/>"
+ + "<path start=\"action2\"/>"
+ + "</fork>"
+ +"<action name=\"action1\" retry-max=\"2\" retry-interval=\"0\">"
+ + "<test xmlns=\"uri:test\">"
+ + "<signal-value>${wf:conf('signal-value')}</signal-value>"
+ + "<external-status>${wf:conf('external-status')}</external-status> "
+ + "<error>${wf:conf('error')}</error>"
+ + "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>"
+ + "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>"
+ + "<running-mode>${wf:conf('running-mode')}</running-mode>"
+ + "</test>"
+ + "<ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<action name=\"action2\">"
+ + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<join name=\"join1\" to=\"end\"/>"
+ + "<kill name=\"kill\"><message>killed</message></kill>"
+ + "<end name=\"end\"/>"
+ + "</workflow-app>";
+ //@formatter:on
+ writeToFile(appXml, workflowUri);
+ conf.set(OozieClient.APP_PATH, workflowUri);
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ conf.set("error", "start.error");
+ conf.set("external-status", "error");
+ conf.set("signal-value", "based_on_action_status");
+
+ SubmitXCommand sc = new SubmitXCommand(conf);
+ final String jobId = sc.call();
+ new StartXCommand(jobId).call();
+ final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
+ final JPAService jpaService = Services.get().get(JPAService.class);
+
+ waitFor(20 * 1000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+ WorkflowActionBean action = null;
+ for (WorkflowActionBean bean : actions) {
+ if (bean.getType().equals("test")) {
+ action = bean;
+ break;
+ }
+ }
+ return (action != null && action.getUserRetryCount() == 2);
+ }
+ });
+
+ List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+ WorkflowActionBean action = null;
+ for (WorkflowActionBean bean : actions) {
+ if (bean.getType().equals("test")) {
+ action = bean;
+ break;
+ }
+ }
+ assertNotNull(action);
+ assertEquals(2, action.getUserRetryCount());
+ }
+
+ public void testUserRetryPolicy() throws JPAExecutorException, IOException, CommandException {
+ Configuration conf = new XConfiguration();
+ String workflowUri = getTestCaseFileUri("workflow.xml");
+
+ //@formatter:off
+ String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.5\" name=\"wf-fork\">" + "<start to=\"fork1\"/>"
+ + "<fork name=\"fork1\">" + "<path start=\"action1\"/>" + "<path start=\"action2\"/>" + "</fork>"
+ + "<action name=\"action1\" retry-max=\"2\" retry-interval=\"1\" retry-policy=\"exponential\">"
+ + "<test xmlns=\"uri:test\">"
+ + "<signal-value>${wf:conf('signal-value')}</signal-value>"
+ + "<external-status>${wf:conf('external-status')}</external-status> "
+ + "<error>${wf:conf('error')}</error>"
+ + "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>"
+ + "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>"
+ + "<running-mode>${wf:conf('running-mode')}</running-mode>" + "</test>" + "<ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>" + "</action>" + "<action name=\"action2\">" + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>" + "</action>" + "<join name=\"join1\" to=\"end\"/>"
+ + "<kill name=\"kill\"><message>killed</message></kill>" + "<end name=\"end\"/>" + "</workflow-app>";
+ // @formatter:on
+ writeToFile(appXml, workflowUri);
+ conf.set(OozieClient.APP_PATH, workflowUri);
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ conf.set("error", "start.error");
+ conf.set("external-status", "error");
+ conf.set("signal-value", "based_on_action_status");
+
+ SubmitXCommand sc = new SubmitXCommand(conf);
+ final String jobId = sc.call();
+ new StartXCommand(jobId).call();
+ final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
+ final JPAService jpaService = Services.get().get(JPAService.class);
+ // Set a timeout that covers the exponential retries implied by the given
+ // retry-interval and retry-max.
+ // With retry-interval 1, the first retry is delayed 1 min, the second
+ // 2 min, then 4, 8, 16 and so on; two retries therefore need 1 + 2 min.
+ int timeout = (1 + 2) * 60 * 1000;
+ waitFor(timeout, new Predicate() {
+ public boolean evaluate() throws Exception {
+ List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+ WorkflowActionBean action = null;
+ for (WorkflowActionBean bean : actions) {
+ if (bean.getType().equals("test")) {
+ action = bean;
+ break;
+ }
+ }
+ return (action != null && action.getUserRetryCount() == 2);
+ }
+ });
+
+ List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+ WorkflowActionBean action = null;
+ for (WorkflowActionBean bean : actions) {
+ if (bean.getType().equals("test")) {
+ action = bean;
+ break;
+ }
+ }
+ assertNotNull(action);
+ assertEquals(2, action.getUserRetryCount());
+ }
+
+ private void writeToFile(String appXml, String appPath) throws IOException {
+ File wf = new File(URI.create(appPath));
+ PrintWriter out = null;
+ try {
+ out = new PrintWriter(new FileWriter(wf));
+ out.println(appXml);
+ }
+ catch (IOException iex) {
+ throw iex;
+ }
+ finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
index e685621..8eb7438 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
@@ -23,22 +23,16 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
-import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ForTestingActionExecutor;
-import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.command.CommandException;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.ExtendedCallableQueueService;
-import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
@@ -153,77 +147,6 @@ public class TestForkedActionStartXCommand extends XDataTestCase {
WorkflowJob.Status.KILLED);
}
- public void testUserRetry() throws JPAExecutorException, IOException, CommandException{
- Configuration conf = new XConfiguration();
- String workflowUri = getTestCaseFileUri("workflow.xml");
-
- //@formatter:off
- String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.3\" name=\"wf-fork\">"
- + "<start to=\"fork1\"/>"
- + "<fork name=\"fork1\">"
- + "<path start=\"action1\"/>"
- + "<path start=\"action2\"/>"
- + "</fork>"
- +"<action name=\"action1\" retry-max=\"2\" retry-interval=\"0\">"
- + "<test xmlns=\"uri:test\">"
- + "<signal-value>${wf:conf('signal-value')}</signal-value>"
- + "<external-status>${wf:conf('external-status')}</external-status> "
- + "<error>${wf:conf('error')}</error>"
- + "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>"
- + "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>"
- + "<running-mode>${wf:conf('running-mode')}</running-mode>"
- + "</test>"
- + "<ok to=\"join1\"/>"
- + "<error to=\"kill\"/>"
- + "</action>"
- + "<action name=\"action2\">"
- + "<fs></fs><ok to=\"join1\"/>"
- + "<error to=\"kill\"/>"
- + "</action>"
- + "<join name=\"join1\" to=\"end\"/>"
- + "<kill name=\"kill\"><message>killed</message></kill>"
- + "<end name=\"end\"/>"
- + "</workflow-app>";
- //@Formatter:on
- writeToFile(appXml, workflowUri);
- conf.set(OozieClient.APP_PATH, workflowUri);
- conf.set(OozieClient.USER_NAME, getTestUser());
- conf.set("error", "start.error");
- conf.set("external-status", "error");
- conf.set("signal-value", "based_on_action_status");
-
- SubmitXCommand sc = new SubmitXCommand(conf);
- final String jobId = sc.call();
- new StartXCommand(jobId).call();
- final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
- final JPAService jpaService = Services.get().get(JPAService.class);
-
- waitFor(20 * 1000, new Predicate() {
- public boolean evaluate() throws Exception {
- List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
- WorkflowActionBean action = null;
- for (WorkflowActionBean bean : actions) {
- if (bean.getType().equals("test")) {
- action = bean;
- break;
- }
- }
- return (action != null && action.getUserRetryCount() == 2);
- }
- });
-
- List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
- WorkflowActionBean action = null;
- for (WorkflowActionBean bean : actions) {
- if (bean.getType().equals("test")) {
- action = bean;
- break;
- }
- }
- assertNotNull(action);
- assertEquals(2, action.getUserRetryCount());
- }
-
private void writeToFile(String appXml, String appPath) throws IOException {
File wf = new File(URI.create(appPath));
PrintWriter out = null;
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java
index e493d4d..8cc3694 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java
@@ -159,6 +159,25 @@ public class TestWorkflowKillXCommand extends XDataTestCase {
assertEquals(action.getStatus(), WorkflowAction.Status.KILLED);
wfInstance = job.getWorkflowInstance();
assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.KILLED);
+
+ services.destroy();
+
+ sleep(5000);
+
+ setSystemProperty(LiteWorkflowStoreService.CONF_NODE_DEF_VERSION, LiteWorkflowStoreService.NODE_DEF_VERSION_2);
+ services = new Services();
+ services.init();
+
+ sleep(5000);
+
+ jpaService = Services.get().get(JPAService.class);
+ job = jpaService.execute(wfJobGetCmd);
+ action = jpaService.execute(wfActionGetCmd);
+ assertEquals(job.getStatus(), WorkflowJob.Status.KILLED);
+ assertEquals(action.getStatus(), WorkflowAction.Status.KILLED);
+ wfInstance = job.getWorkflowInstance();
+ assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.KILLED);
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
index e7ac50d..9db2a0d 100644
--- a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
+++ b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
@@ -2550,11 +2550,17 @@ Oozie adminstrator can allow more error codes to be handled for User-Retry. By a
=oozie.service.LiteWorkflowStoreService.user.retry.error.code.ext= to =oozie.site.xml=
and error codes as value, these error codes will be considered as User-Retry after system restart.
+Since Oozie 4.3, User-Retry allows the user to specify a retry policy. The value of the policy can be =periodic=
+or =exponential=, with =periodic= being the default. The Oozie administrator can define the user retry policy for all
+workflow actions by adding the configuration =oozie.service.LiteWorkflowStoreService.user.retry.policy= to =oozie.site.xml=.
+This value takes effect as the user retry policy after a system restart. It can be overridden for an individual action
+with the =retry-policy= attribute in the workflow XML if needed. The =retry-interval= should be specified in minutes.
+
Examples of User-Retry in a workflow action is :
<verbatim>
-<workflow-app xmlns="uri:oozie:workflow:0.3" name="wf-name">
-<action name="a" retry-max="2" retry-interval="1">
+<workflow-app xmlns="uri:oozie:workflow:0.5" name="wf-name">
+<action name="a" retry-max="2" retry-interval="1" retry-policy="exponential">
</action>
</verbatim>
@@ -2750,6 +2756,7 @@ to be executed.
<xs:attribute name="cred" type="xs:string"/>
<xs:attribute name="retry-max" type="xs:string"/>
<xs:attribute name="retry-interval" type="xs:string"/>
+ <xs:attribute name="retry-policy" type="xs:string"/>
</xs:complexType>
<xs:complexType name="MAP-REDUCE">
http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 01a8099..5913ab7 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-2440 Exponential re-try policy for workflow action (satishsaley via jaydeepvishwakarma)
OOZIE-2539 Incorrect property key is used for 'hive log4j configuration file for execution mode' (abhishekbafna via jaydeepvishwakarma)
OOZIE-2565 [Oozie web Console] Make the timezones in settings tab to be sorted by default (meetchandan via jaydeepvishwakarma)
OOZIE-2520 SortBy filter for ordering the jobs query results (abhishekbafna via jaydeepvishwakarma)