You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ja...@apache.org on 2016/08/02 18:06:56 UTC

oozie git commit: OOZIE-2440 Exponential re-try policy for workflow action (satishsaley via jaydeepvishwakarma)

Repository: oozie
Updated Branches:
  refs/heads/master 273003062 -> 99a3df568


OOZIE-2440 Exponential re-try policy for workflow action (satishsaley via jaydeepvishwakarma)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/99a3df56
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/99a3df56
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/99a3df56

Branch: refs/heads/master
Commit: 99a3df568d182e75751126aac5fe8b63d876e64f
Parents: 2730030
Author: jvishwakarma <jv...@walmartlabs.com>
Authored: Tue Aug 2 23:36:18 2016 +0530
Committer: jvishwakarma <jv...@walmartlabs.com>
Committed: Tue Aug 2 23:36:18 2016 +0530

----------------------------------------------------------------------
 .../src/main/resources/oozie-workflow-0.5.xsd   |   1 +
 .../oozie/command/wf/ActionCheckXCommand.java   |   2 +-
 .../oozie/command/wf/ActionEndXCommand.java     |   2 +-
 .../apache/oozie/command/wf/ActionXCommand.java |  52 ++++-
 .../oozie/service/LiteWorkflowStoreService.java |  13 +-
 .../oozie/workflow/lite/ActionNodeDef.java      |   5 +-
 .../workflow/lite/LiteWorkflowAppParser.java    |  11 +-
 .../org/apache/oozie/workflow/lite/NodeDef.java |  82 ++++++-
 core/src/main/resources/oozie-default.xml       |  12 +-
 .../oozie/command/wf/TestActionUserRetry.java   | 215 +++++++++++++++++++
 .../wf/TestForkedActionStartXCommand.java       |  77 -------
 .../command/wf/TestWorkflowKillXCommand.java    |  19 ++
 .../src/site/twiki/WorkflowFunctionalSpec.twiki |  11 +-
 release-log.txt                                 |   1 +
 14 files changed, 396 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/client/src/main/resources/oozie-workflow-0.5.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/oozie-workflow-0.5.xsd b/client/src/main/resources/oozie-workflow-0.5.xsd
index fda49ed..8fe2b47 100644
--- a/client/src/main/resources/oozie-workflow-0.5.xsd
+++ b/client/src/main/resources/oozie-workflow-0.5.xsd
@@ -160,6 +160,7 @@
         <xs:attribute name="cred" type="xs:string"/>
         <xs:attribute name="retry-max" type="xs:string"/>
         <xs:attribute name="retry-interval" type="xs:string"/>
+        <xs:attribute name="retry-policy" type="xs:string"/>
     </xs:complexType>
 
     <xs:complexType name="MAP-REDUCE">

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
index ea4d340..d0551ff 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
@@ -210,7 +210,7 @@ public class ActionCheckXCommand extends ActionXCommand<Void> {
             switch (ex.getErrorType()) {
                 case ERROR:
                     // If allowed to retry, this will handle it; otherwise, we should fall through to FAILED
-                    if (handleUserRetry(wfAction)) {
+                    if (handleUserRetry(wfAction, wfJob)) {
                         break;
                     }
                 case FAILED:

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
index d030a10..740b8d3 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
@@ -216,7 +216,7 @@ public class ActionEndXCommand extends ActionXCommand<Void> {
                         shouldHandleUserRetry = true;
                         break;
                 }
-                if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
+                if (!shouldHandleUserRetry || !handleUserRetry(wfAction, wfJob)) {
                     SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
                     if(slaEvent != null) {
                         insertList.add(slaEvent);

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
index e65c3bf..836e5d4 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
@@ -41,6 +41,7 @@ import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.service.CallbackService;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.ELService;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
@@ -53,7 +54,9 @@ import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.workflow.WorkflowException;
 import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.workflow.lite.LiteWorkflowApp;
 import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
+import org.apache.oozie.workflow.lite.NodeDef;
 
 /**
  * Base class for Action execution commands. Provides common functionality to handle different types of errors while
@@ -154,8 +157,8 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
         LOG.warn("Setting Action Status to [{0}]", status);
         ActionExecutorContext aContext = (ActionExecutorContext) context;
         WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
-
-        if (!handleUserRetry(action)) {
+        WorkflowJobBean wfJob = (WorkflowJobBean) context.getWorkflow();
+        if (!handleUserRetry(action, wfJob)) {
             incrActionErrorCounter(action.getType(), "error", 1);
             action.setPending();
             if (isStart) {
@@ -189,7 +192,7 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
      */
     public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
         WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
-        if (!handleUserRetry(action)) {
+        if (!handleUserRetry(action, workflow)) {
             incrActionErrorCounter(action.getType(), "failed", 1);
             LOG.warn("Failing Job due to failed action [{0}]", action.getName());
             try {
@@ -217,7 +220,7 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
      * @return true if user-retry has to be handled for this action
      * @throws CommandException thrown if unable to fail job
      */
-    public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
+    public boolean handleUserRetry(WorkflowActionBean action, WorkflowJobBean wfJob) throws CommandException {
         String errorCode = action.getErrorCode();
         Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
 
@@ -226,7 +229,8 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
             LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
                     + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
                     .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
-            int interval = action.getUserRetryInterval() * 60 * 1000;
+            ActionExecutor.RETRYPOLICY retryPolicy = getUserRetryPolicy(action, wfJob);
+            long interval = getRetryDelay(action.getUserRetryCount(), action.getUserRetryInterval() * 60, retryPolicy);
             action.setStatus(WorkflowAction.Status.USER_RETRY);
             action.incrmentUserRetryCount();
             action.setPending();
@@ -559,4 +563,42 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
         }
     }
 
+    /**
+     * Resolves the retry policy to apply when an action goes through user-retry.
+     * <p>
+     * Lookup order: the policy stored on the action's node definition, then the system-wide
+     * {@link LiteWorkflowStoreService#CONF_USER_RETRY_POLICY} setting, then
+     * {@link LiteWorkflowStoreService#DEFAULT_USER_RETRY_POLICY}. When the workflow app has no node
+     * for the action name, the default policy is returned directly.
+     *
+     * @param wfAction action being retried; its name is used to look up the node definition
+     * @param wfJob workflow job whose workflow instance holds the app definition
+     * @return the first valid policy found in the lookup order above
+     */
+    private ActionExecutor.RETRYPOLICY getUserRetryPolicy(WorkflowActionBean wfAction, WorkflowJobBean wfJob) {
+        WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
+        LiteWorkflowApp wfApp = (LiteWorkflowApp) wfInstance.getApp();
+        NodeDef nodeDef = wfApp.getNode(wfAction.getName());
+        if (nodeDef != null) {
+            // Normalize once, so the string that gets validated is the very same string handed to
+            // valueOf(). Validating a trimmed copy while converting the untrimmed original would let
+            // " exponential " pass validation and then fail with IllegalArgumentException.
+            String userRetryPolicy = normalizeRetryPolicy(nodeDef.getUserRetryPolicy());
+            if (isValidRetryPolicy(userRetryPolicy)) {
+                return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicy);
+            }
+            String userRetryPolicyInSysConfig = normalizeRetryPolicy(
+                    ConfigurationService.get(LiteWorkflowStoreService.CONF_USER_RETRY_POLICY));
+            if (isValidRetryPolicy(userRetryPolicyInSysConfig)) {
+                return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicyInSysConfig);
+            }
+        }
+        return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY);
+    }
+
+    /*
+     * Trims and upper-cases a policy name. Locale.ROOT keeps the result independent of the JVM
+     * default locale (e.g. the Turkish dotted capital I). A null name becomes the empty string,
+     * which is never a valid policy.
+     */
+    private static String normalizeRetryPolicy(String policy) {
+        return policy == null ? "" : policy.trim().toUpperCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Tells whether a name maps to one of the {@link ActionExecutor.RETRYPOLICY} constants. The name
+     * is trimmed and upper-cased (locale independent) before the lookup.
+     *
+     * @param policy policy name to check, may be null
+     * @return true if the name is a valid policy, false otherwise (a null name is not valid)
+     */
+    private static boolean isValidRetryPolicy(String policy) {
+        if (policy == null) {
+            // valueOf() would raise NullPointerException, which the catch below does not cover
+            return false;
+        }
+        try {
+            ActionExecutor.RETRYPOLICY.valueOf(policy.trim().toUpperCase(java.util.Locale.ROOT));
+            return true;
+        }
+        catch (IllegalArgumentException e) {
+            // Not one of the RETRYPOLICY constants
+            return false;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
index 99ace13..ffc29af 100644
--- a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
+++ b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
@@ -60,11 +60,14 @@ public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
     public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
     public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
     public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
+    public static final String CONF_USER_RETRY_POLICY = CONF_PREFIX_USER_RETRY + "policy";
     public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
     public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";
+    public static final String DEFAULT_USER_RETRY_POLICY = "PERIODIC";
 
     public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
     public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
+    public static final String NODE_DEF_VERSION_2 = "_oozie_inst_v_2";
     public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";
 
     public static final String USER_ERROR_CODE_ALL = "ALL";
@@ -195,15 +198,17 @@ public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
     }
 
     /**
-     * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
+     * Get NodeDef default version, _oozie_inst_v_0, _oozie_inst_v_1 or
+     * _oozie_inst_v_2
      *
      * @return nodedef default version
-     * @throws WorkflowException thrown if there was an error parsing the action configuration.
-    */
+     * @throws WorkflowException thrown if there was an error parsing the action
+     *         configuration.
+     */
     public static String getNodeDefDefaultVersion() throws WorkflowException {
         String ret = ConfigurationService.get(CONF_NODE_DEF_VERSION);
         if (ret == null) {
-            ret = NODE_DEF_VERSION_1;
+            ret = NODE_DEF_VERSION_2;
         }
         return ret;
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java b/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java
index d3a2793..97c7134 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java
@@ -44,7 +44,8 @@ public class ActionNodeDef extends NodeDef {
     }
     
     public ActionNodeDef(String name, String conf, Class<? extends ActionNodeHandler> actionHandlerClass, String onOk,
-            String onError, String cred, String userRetryMax, String userRetryInterval) {
-        super(name, ParamChecker.notNull(conf, "conf"), actionHandlerClass, Arrays.asList(onOk, onError), cred, userRetryMax, userRetryInterval);
+            String onError, String cred, String userRetryMax, String userRetryInterval, String userRetryPolicy) {
+        super(name, ParamChecker.notNull(conf, "conf"), actionHandlerClass, Arrays.asList(onOk, onError), cred,
+                userRetryMax, userRetryInterval, userRetryPolicy);
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
index a1b9cdb..bbd81a9 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
@@ -21,6 +21,8 @@ package org.apache.oozie.workflow.lite;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.io.Writable;
 import org.apache.oozie.action.hadoop.FsActionExecutor;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.util.ELUtils;
@@ -90,6 +92,7 @@ public class LiteWorkflowAppParser {
     private static final String USER_RETRY_MAX_A = "retry-max";
     private static final String USER_RETRY_INTERVAL_A = "retry-interval";
     private static final String TO_A = "to";
+    private static final String USER_RETRY_POLICY_A = "retry-policy";
 
     private static final String FORK_PATH_E = "path";
     private static final String FORK_START_A = "start";
@@ -485,6 +488,7 @@ public class LiteWorkflowAppParser {
                 String credStr = eNode.getAttributeValue(CRED_A);
                 String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
                 String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
+                String userRetryPolicyStr = eNode.getAttributeValue(USER_RETRY_POLICY_A);
                 try {
                     if (!StringUtils.isEmpty(userRetryMaxStr)) {
                         userRetryMaxStr = ELUtils.resolveAppName(userRetryMaxStr, jobConf);
@@ -492,6 +496,9 @@ public class LiteWorkflowAppParser {
                     if (!StringUtils.isEmpty(userRetryIntervalStr)) {
                         userRetryIntervalStr = ELUtils.resolveAppName(userRetryIntervalStr, jobConf);
                     }
+                    if (!StringUtils.isEmpty(userRetryPolicyStr)) {
+                        userRetryPolicyStr = ELUtils.resolveAppName(userRetryPolicyStr, jobConf);
+                    }
                 }
                 catch (Exception e) {
                     throw new WorkflowException(ErrorCode.E0703, e.getMessage());
@@ -499,8 +506,8 @@ public class LiteWorkflowAppParser {
 
                 String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
                 def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
-                                              transitions[0], transitions[1], credStr,
-                                              userRetryMaxStr, userRetryIntervalStr));
+                        transitions[0], transitions[1], credStr, userRetryMaxStr, userRetryIntervalStr,
+                        userRetryPolicyStr));
             } else if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
                 // No operation is required
             } else if (eNode.getName().equals(GLOBAL)) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java b/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java
index 9e66d28..496b008 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java
@@ -43,6 +43,7 @@ public class NodeDef implements Writable {
     private String cred = null;
     private String userRetryMax = "null";
     private String userRetryInterval = "null";
+    private String userRetryPolicy = "null";
 
     NodeDef() {
     }
@@ -62,7 +63,7 @@ public class NodeDef implements Writable {
     }
 
     NodeDef(String name, String conf, Class<? extends NodeHandler> handlerClass, List<String> transitions, String cred,
-            String userRetryMax, String userRetryInterval) {
+            String userRetryMax, String userRetryInterval, String userRetryPolicy) {
         this(name, conf, handlerClass, transitions, cred);
         if (userRetryMax != null) {
             this.userRetryMax = userRetryMax;
@@ -70,6 +71,9 @@ public class NodeDef implements Writable {
         if (userRetryInterval != null) {
             this.userRetryInterval = userRetryInterval;
         }
+        if (userRetryPolicy != null) {
+            this.userRetryPolicy = userRetryPolicy;
+        }
     }
 
     public boolean equals(NodeDef other) {
@@ -115,12 +119,20 @@ public class NodeDef implements Writable {
                 nodeDefVersion = LiteWorkflowStoreService.getNodeDefDefaultVersion();
             }
             catch (WorkflowException e) {
-                nodeDefVersion = LiteWorkflowStoreService.NODE_DEF_VERSION_1;
+                nodeDefVersion = LiteWorkflowStoreService.NODE_DEF_VERSION_2;
             }
         }
         return nodeDefVersion;
     }
 
+    /**
+     * Returns the user retry policy held by this node definition.
+     *
+     * @return the policy name exactly as stored; the field starts out as the string "null"
+     */
+    public String getUserRetryPolicy() {
+        return this.userRetryPolicy;
+    }
+
+    /**
+     * Replaces the user retry policy held by this node definition.
+     *
+     * @param userRetryPolicy policy name to store; it is stored as given, no validation happens here
+     */
+    public void setUserRetryPolicy(String userRetryPolicy) {
+        this.userRetryPolicy = userRetryPolicy;
+    }
+
     @SuppressWarnings("unchecked")
     private void readVersionZero(DataInput dataInput, String firstField) throws IOException {
         if (firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_0)) {
@@ -151,7 +163,24 @@ public class NodeDef implements Writable {
     }
     @SuppressWarnings("unchecked")
     private void readVersionOne(DataInput dataInput, String firstField) throws IOException {
-        nodeDefVersion = LiteWorkflowStoreService.NODE_DEF_VERSION_1;
+        readCommon(dataInput, firstField, LiteWorkflowStoreService.NODE_DEF_VERSION_1);
+    }
+
+    /**
+     * Reads a node serialized in the version 2 layout: the fields handled by
+     * {@code readCommon}, followed by the user retry policy stored as a UTF string.
+     *
+     * @param dataInput input to read the node def from
+     * @param firstField first UTF field already read from the input by the caller
+     * @throws IOException thrown if fail to read
+     */
+    private void readVersionTwo(DataInput dataInput, String firstField) throws IOException {
+        readCommon(dataInput, firstField, LiteWorkflowStoreService.NODE_DEF_VERSION_2);
+        // Must stay last: writeVersionTwo appends the policy after the common fields
+        userRetryPolicy = dataInput.readUTF();
+    }
+
+    /*
+     * Reads common part
+     */
+    @SuppressWarnings("unchecked")
+    private void readCommon(DataInput dataInput, String firstField, String nodeDefVer) throws IOException {
+        nodeDefVersion = nodeDefVer;
         name = dataInput.readUTF();
         cred = dataInput.readUTF();
         if (cred.equals("null")) {
@@ -185,12 +214,16 @@ public class NodeDef implements Writable {
     @Override
     public void readFields(DataInput dataInput) throws IOException {
         String firstField = dataInput.readUTF();
-        if (!firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) {
-            readVersionZero(dataInput, firstField);
-        } else {
-            //since oozie version 3.1
+        if (firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) {
+            // since oozie version 3.1
             readVersionOne(dataInput, firstField);
         }
+        else if (firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_2)) {
+            readVersionTwo(dataInput, firstField);
+        }
+        else {
+            readVersionZero(dataInput, firstField);
+        }
     }
 
     private void writeVersionZero(DataOutput dataOutput) throws IOException {
@@ -222,6 +255,29 @@ public class NodeDef implements Writable {
      * @throws IOException thrown if fail to write
      */
     private void writeVersionOne(DataOutput dataOutput) throws IOException {
+        writeCommon(dataOutput);
+    }
+
+    /**
+     * Serializes this node in the version two layout: the common fields first, then the user retry
+     * policy. A policy that is null is written as the string "null".
+     * <p>
+     * NOTE(review): the original comment stated this format exists "since 4.4.4.1" - confirm the
+     * release number.
+     *
+     * @param dataOutput data output to serialize node def
+     * @throws IOException thrown if fail to write
+     */
+    private void writeVersionTwo(DataOutput dataOutput) throws IOException {
+        writeCommon(dataOutput);
+        String policyToWrite = (userRetryPolicy == null) ? "null" : userRetryPolicy;
+        dataOutput.writeUTF(policyToWrite);
+    }
+
+    /*
+     * Write the common part
+     */
+    private void writeCommon(DataOutput dataOutput) throws IOException {
         dataOutput.writeUTF(nodeDefVersion);
         dataOutput.writeUTF(name);
         if (cred != null) {
@@ -260,12 +316,16 @@ public class NodeDef implements Writable {
      */
     @Override
     public void write(DataOutput dataOutput) throws IOException {
-        if (!getNodeDefVersion().equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) {
-            writeVersionZero(dataOutput);
-        } else {
-            //since oozie version 3.1
+        if (getNodeDefVersion().equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) {
+            // since oozie version 3.1
             writeVersionOne(dataOutput);
         }
+        else if (getNodeDefVersion().equals(LiteWorkflowStoreService.NODE_DEF_VERSION_2)) {
+            writeVersionTwo(dataOutput);
+        }
+        else {
+            writeVersionZero(dataOutput);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 4563c73..530c2ed 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2116,6 +2116,14 @@ will be the requeue interval for the actions which are waiting for a long time w
     </property>
 
     <property>
+        <name>oozie.service.LiteWorkflowStoreService.user.retry.policy</name>
+        <value>periodic</value>
+        <description>
+            Automatic retry policy for workflow action. Possible values are periodic or exponential, periodic being the default.
+        </description>
+    </property>
+
+    <property>
         <name>oozie.service.LiteWorkflowStoreService.user.retry.error.code</name>
         <value>JA008,JA009,JA017,JA018,JA019,FS009,FS008,FS014</value>
         <description>
@@ -2142,9 +2150,9 @@ will be the requeue interval for the actions which are waiting for a long time w
     
     <property>
         <name>oozie.service.LiteWorkflowStoreService.node.def.version</name>
-        <value>_oozie_inst_v_1</value>
+        <value>_oozie_inst_v_2</value>
         <description>
-            NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
+            NodeDef default version, _oozie_inst_v_0, _oozie_inst_v_1 or _oozie_inst_v_2
         </description>
     </property>
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/test/java/org/apache/oozie/command/wf/TestActionUserRetry.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionUserRetry.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionUserRetry.java
new file mode 100644
index 0000000..ca2b5a2
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionUserRetry.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.wf;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ForTestingActionExecutor;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
+import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.ExtendedCallableQueueService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
+import org.apache.oozie.service.SchemaService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.XConfiguration;
+
+public class TestActionUserRetry extends XDataTestCase {
+    private Services services;
+
+    /**
+     * Prepares the Oozie services for the user-retry tests: sets the required system properties,
+     * initializes the services, registers {@link ForTestingActionExecutor} and switches on parallel
+     * job submission for forks.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        // System properties have to be in place before the services are initialized below
+        setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, "wf-ext-schema.xsd");
+        setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT, ForTestingActionExecutor.TEST_ERROR);
+        setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, ExtendedCallableQueueService.class.getName());
+        services = new Services();
+        services.init();
+        ActionService actionService = services.get(ActionService.class);
+        actionService.registerAndInitExecutor(ForTestingActionExecutor.class);
+        ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
+    }
+
+    /**
+     * Destroys the services created in {@code setUp} before running the parent tear down.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        this.services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Submits and starts a workflow whose forked "test" action declares retry-max="2" and
+     * retry-interval="0", then waits until that action reports a user retry count of 2.
+     * <p>
+     * The action is driven through the "error", "external-status" and "signal-value" job properties;
+     * presumably these make ForTestingActionExecutor fail the action on start - confirm against that
+     * executor.
+     */
+    public void testUserRetry() throws JPAExecutorException, IOException, CommandException {
+        Configuration conf = new XConfiguration();
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+
+        //@formatter:off
+        String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.3\" name=\"wf-fork\">"
+                + "<start to=\"fork1\"/>"
+                + "<fork name=\"fork1\">"
+                + "<path start=\"action1\"/>"
+                + "<path start=\"action2\"/>"
+                + "</fork>"
+                +"<action name=\"action1\" retry-max=\"2\" retry-interval=\"0\">"
+                + "<test xmlns=\"uri:test\">"
+                +    "<signal-value>${wf:conf('signal-value')}</signal-value>"
+                +    "<external-status>${wf:conf('external-status')}</external-status> "
+                +    "<error>${wf:conf('error')}</error>"
+                +    "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>"
+                +    "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>"
+                +    "<running-mode>${wf:conf('running-mode')}</running-mode>"
+                + "</test>"
+                + "<ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action2\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<join name=\"join1\" to=\"end\"/>"
+                + "<kill name=\"kill\"><message>killed</message></kill>"
+                + "<end name=\"end\"/>"
+                + "</workflow-app>";
+        //@formatter:on
+        writeToFile(appXml, workflowUri);
+        conf.set(OozieClient.APP_PATH, workflowUri);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        conf.set("error", "start.error");
+        conf.set("external-status", "error");
+        conf.set("signal-value", "based_on_action_status");
+
+        SubmitXCommand sc = new SubmitXCommand(conf);
+        final String jobId = sc.call();
+        new StartXCommand(jobId).call();
+        final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
+        final JPAService jpaService = Services.get().get(JPAService.class);
+
+        // Poll for up to 20 seconds until the test action has used both of its retries
+        waitFor(20 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                WorkflowActionBean action = findFirstActionOfType(jpaService.execute(actionsGetExecutor), "test");
+                return (action != null && action.getUserRetryCount() == 2);
+            }
+        });
+
+        // Re-read the actions so the assertions report the final state even if the wait timed out
+        WorkflowActionBean action = findFirstActionOfType(jpaService.execute(actionsGetExecutor), "test");
+        assertNotNull(action);
+        assertEquals(2, action.getUserRetryCount());
+    }
+
+    /*
+     * Returns the first action in the list whose type equals the given type, or null if there is none
+     */
+    private static WorkflowActionBean findFirstActionOfType(List<WorkflowActionBean> actions, String type) {
+        for (WorkflowActionBean bean : actions) {
+            if (type.equals(bean.getType())) {
+                return bean;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Exercises user-retry with <code>retry-policy="exponential"</code> on a forked workflow.
+     * <p>
+     * The workflow forks into <code>action1</code> (type "test", retry-max=2, retry-interval=1,
+     * exponential policy) and <code>action2</code> (an empty fs action). The job properties set
+     * <code>error=start.error</code> and <code>external-status=error</code> for the test action.
+     * The test waits until the "test" action reports a user retry count of 2 and asserts it.
+     *
+     * @throws JPAExecutorException if the workflow actions cannot be loaded
+     * @throws IOException if the workflow definition cannot be written
+     * @throws CommandException if submitting or starting the workflow fails
+     */
+    public void testUserRetryPolicy() throws JPAExecutorException, IOException, CommandException {
+        Configuration conf = new XConfiguration();
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+
+        //@formatter:off
+        String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.5\" name=\"wf-fork\">"
+                + "<start to=\"fork1\"/>"
+                + "<fork name=\"fork1\">"
+                + "<path start=\"action1\"/>"
+                + "<path start=\"action2\"/>"
+                + "</fork>"
+                + "<action name=\"action1\" retry-max=\"2\" retry-interval=\"1\" retry-policy=\"exponential\">"
+                + "<test xmlns=\"uri:test\">"
+                +    "<signal-value>${wf:conf('signal-value')}</signal-value>"
+                +    "<external-status>${wf:conf('external-status')}</external-status> "
+                +    "<error>${wf:conf('error')}</error>"
+                +    "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>"
+                +    "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>"
+                +    "<running-mode>${wf:conf('running-mode')}</running-mode>"
+                + "</test>"
+                + "<ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action2\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<join name=\"join1\" to=\"end\"/>"
+                + "<kill name=\"kill\"><message>killed</message></kill>"
+                + "<end name=\"end\"/>"
+                + "</workflow-app>";
+        //@formatter:on
+        writeToFile(appXml, workflowUri);
+        conf.set(OozieClient.APP_PATH, workflowUri);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        conf.set("error", "start.error");
+        conf.set("external-status", "error");
+        conf.set("signal-value", "based_on_action_status");
+
+        SubmitXCommand sc = new SubmitXCommand(conf);
+        final String jobId = sc.call();
+        new StartXCommand(jobId).call();
+        final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
+        final JPAService jpaService = Services.get().get(JPAService.class);
+        // set a timeout for exponential retry of action with respect to given
+        // retry-interval and retry-max.
+        // If retry-interval is 1 then, for first retry, delay will be 1 min,
+        // for second retry it will be 2 min, 4, 8, 16 & so on.
+        int timeout = (1 + 2) * 60 * 1000;
+        waitFor(timeout, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                WorkflowActionBean action = findTestAction(jpaService, actionsGetExecutor);
+                return (action != null && action.getUserRetryCount() == 2);
+            }
+        });
+
+        // Re-read the action after the wait so that a timeout surfaces as an assertion failure.
+        WorkflowActionBean action = findTestAction(jpaService, actionsGetExecutor);
+        assertNotNull(action);
+        assertEquals(2, action.getUserRetryCount());
+    }
+
+    /**
+     * Loads the actions of the job and returns the first one whose type is "test".
+     *
+     * @param jpaService service used to run the query
+     * @param actionsGetExecutor executor that fetches the actions of the job under test
+     * @return the first action of type "test", or <code>null</code> if there is none
+     * @throws JPAExecutorException if the actions cannot be loaded
+     */
+    private WorkflowActionBean findTestAction(JPAService jpaService,
+            WorkflowActionsGetForJobJPAExecutor actionsGetExecutor) throws JPAExecutorException {
+        List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+        for (WorkflowActionBean bean : actions) {
+            // Constant-first comparison so an action without a type cannot raise a NullPointerException.
+            if ("test".equals(bean.getType())) {
+                return bean;
+            }
+        }
+        return null;
+    }
+
+    private void writeToFile(String appXml, String appPath) throws IOException {
+        File wf = new File(URI.create(appPath));
+        PrintWriter out = null;
+        try {
+            out = new PrintWriter(new FileWriter(wf));
+            out.println(appXml);
+        }
+        catch (IOException iex) {
+            throw iex;
+        }
+        finally {
+            if (out != null) {
+                out.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
index e685621..8eb7438 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
@@ -23,22 +23,16 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URI;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ForTestingActionExecutor;
-import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.command.CommandException;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.ExtendedCallableQueueService;
-import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.SchemaService;
 import org.apache.oozie.service.Services;
@@ -153,77 +147,6 @@ public class TestForkedActionStartXCommand extends XDataTestCase {
                 WorkflowJob.Status.KILLED);
     }
 
-    public void testUserRetry() throws JPAExecutorException, IOException, CommandException{
-        Configuration conf = new XConfiguration();
-        String workflowUri = getTestCaseFileUri("workflow.xml");
-
-        //@formatter:off
-        String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.3\" name=\"wf-fork\">"
-                + "<start to=\"fork1\"/>"
-                + "<fork name=\"fork1\">"
-                + "<path start=\"action1\"/>"
-                + "<path start=\"action2\"/>"
-                + "</fork>"
-                +"<action name=\"action1\" retry-max=\"2\" retry-interval=\"0\">"
-                + "<test xmlns=\"uri:test\">"
-                +    "<signal-value>${wf:conf('signal-value')}</signal-value>"
-                +    "<external-status>${wf:conf('external-status')}</external-status> "
-                +    "<error>${wf:conf('error')}</error>"
-                +    "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>"
-                +    "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>"
-                +    "<running-mode>${wf:conf('running-mode')}</running-mode>"
-                + "</test>"
-                + "<ok to=\"join1\"/>"
-                + "<error to=\"kill\"/>"
-                + "</action>"
-                + "<action name=\"action2\">"
-                + "<fs></fs><ok to=\"join1\"/>"
-                + "<error to=\"kill\"/>"
-                + "</action>"
-                + "<join name=\"join1\" to=\"end\"/>"
-                + "<kill name=\"kill\"><message>killed</message></kill>"
-                + "<end name=\"end\"/>"
-                + "</workflow-app>";
-           //@Formatter:on
-        writeToFile(appXml, workflowUri);
-        conf.set(OozieClient.APP_PATH, workflowUri);
-        conf.set(OozieClient.USER_NAME, getTestUser());
-        conf.set("error", "start.error");
-        conf.set("external-status",  "error");
-        conf.set("signal-value", "based_on_action_status");
-
-        SubmitXCommand sc = new SubmitXCommand(conf);
-        final String jobId = sc.call();
-        new StartXCommand(jobId).call();
-        final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
-        final JPAService jpaService = Services.get().get(JPAService.class);
-
-        waitFor(20 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
-                WorkflowActionBean action = null;
-                for (WorkflowActionBean bean : actions) {
-                    if (bean.getType().equals("test")) {
-                        action = bean;
-                        break;
-                    }
-                }
-                return (action != null && action.getUserRetryCount() == 2);
-            }
-        });
-
-        List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
-        WorkflowActionBean action = null;
-        for (WorkflowActionBean bean : actions) {
-            if (bean.getType().equals("test")) {
-                action = bean;
-                break;
-            }
-        }
-        assertNotNull(action);
-        assertEquals(2, action.getUserRetryCount());
-    }
-
     private void writeToFile(String appXml, String appPath) throws IOException {
         File wf = new File(URI.create(appPath));
         PrintWriter out = null;

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java
index e493d4d..8cc3694 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java
@@ -159,6 +159,25 @@ public class TestWorkflowKillXCommand extends XDataTestCase {
         assertEquals(action.getStatus(), WorkflowAction.Status.KILLED);
         wfInstance = job.getWorkflowInstance();
         assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.KILLED);
+
+        services.destroy();
+
+        sleep(5000);
+
+        setSystemProperty(LiteWorkflowStoreService.CONF_NODE_DEF_VERSION, LiteWorkflowStoreService.NODE_DEF_VERSION_2);
+        services = new Services();
+        services.init();
+
+        sleep(5000);
+
+        jpaService = Services.get().get(JPAService.class);
+        job = jpaService.execute(wfJobGetCmd);
+        action = jpaService.execute(wfActionGetCmd);
+        assertEquals(job.getStatus(), WorkflowJob.Status.KILLED);
+        assertEquals(action.getStatus(), WorkflowAction.Status.KILLED);
+        wfInstance = job.getWorkflowInstance();
+        assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.KILLED);
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
index e7ac50d..9db2a0d 100644
--- a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
+++ b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
@@ -2550,11 +2550,17 @@ Oozie adminstrator can allow more error codes to be handled for User-Retry. By a
 =oozie.service.LiteWorkflowStoreService.user.retry.error.code.ext= to =oozie.site.xml=
 and error codes as value, these error codes will be considered as User-Retry after system restart.
 
+Since Oozie 4.3, User-Retry allows the user to specify a retry policy. The value of the policy can be =periodic=
+or =exponential=, with =periodic= being the default. The Oozie administrator can define the User-Retry policy for all
+workflow actions by adding the property =oozie.service.LiteWorkflowStoreService.user.retry.policy= to =oozie.site.xml=.
+This value takes effect as the User-Retry policy after a system restart. It can be overridden for an individual action
+with the =retry-policy= attribute in the workflow XML if needed. The =retry-interval= should be specified in minutes.
+
 Examples of User-Retry in a workflow action is :
 
 <verbatim>
-<workflow-app xmlns="uri:oozie:workflow:0.3" name="wf-name">
-<action name="a" retry-max="2" retry-interval="1">
+<workflow-app xmlns="uri:oozie:workflow:0.5" name="wf-name">
+<action name="a" retry-max="2" retry-interval="1" retry-policy="exponential">
 </action>
 </verbatim>
 
@@ -2750,6 +2756,7 @@ to be executed.
         <xs:attribute name="cred" type="xs:string"/>
         <xs:attribute name="retry-max" type="xs:string"/>
         <xs:attribute name="retry-interval" type="xs:string"/>
+        <xs:attribute name="retry-policy" type="xs:string"/>
     </xs:complexType>
 
     <xs:complexType name="MAP-REDUCE">

http://git-wip-us.apache.org/repos/asf/oozie/blob/99a3df56/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 01a8099..5913ab7 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2440 Exponential re-try policy for workflow action (satishsaley via jaydeepvishwakarma)
 OOZIE-2539 Incorrect property key is used for 'hive log4j configuration file for execution mode' (abhishekbafna via jaydeepvishwakarma)
 OOZIE-2565 [Oozie web Console] Make the timezones in settings tab to be sorted by default (meetchandan via jaydeepvishwakarma)
 OOZIE-2520 SortBy filter for ordering the jobs query results (abhishekbafna via jaydeepvishwakarma)