You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/07/01 20:51:04 UTC

oozie git commit: OOZIE-2436 Fork/join workflow fails with oozie.action.yarn.tag must not be null

Repository: oozie
Updated Branches:
  refs/heads/master 1c4d56164 -> 2322d496c


OOZIE-2436 Fork/join workflow fails with oozie.action.yarn.tag must not be null


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/2322d496
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/2322d496
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/2322d496

Branch: refs/heads/master
Commit: 2322d496c73dac43859771a9564776c098289e5e
Parents: 1c4d561
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Fri Jul 1 13:50:53 2016 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Fri Jul 1 13:50:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/action/ActionExecutor.java | 26 ++++++++
 .../oozie/action/hadoop/JavaActionExecutor.java | 43 ++++++-------
 .../action/oozie/SubWorkflowActionExecutor.java |  9 ++-
 .../oozie/command/wf/ActionStartXCommand.java   | 30 ++++-----
 .../apache/oozie/command/wf/ActionXCommand.java | 40 +++++++++++-
 .../command/wf/ForkedActionStartXCommand.java   |  7 +++
 .../apache/oozie/command/wf/SignalXCommand.java | 65 +++++++++++++-------
 .../oozie/workflow/lite/ControlNodeHandler.java |  6 ++
 .../action/hadoop/TestJavaActionExecutor.java   |  6 ++
 release-log.txt                                 |  1 +
 10 files changed, 162 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
index 2be4549..3f978fd 100644
--- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
@@ -56,6 +56,9 @@ public abstract class ActionExecutor {
 
     public static final String ACTION_RETRY_POLICY = CONF_PREFIX + "retry.policy";
 
+    public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
+
+
     /**
      * Error code used by {@link #convertException} when there is not register error information for an exception.
      */
@@ -581,4 +584,27 @@ public abstract class ActionExecutor {
     public boolean supportsConfigurationJobXML() {
         return false;
     }
+
+    /**
+     * Creates the YARN tag that is forwarded to the Launcher. It is useful during repeat attempts of the
+     * Launcher, to ensure only one child job is running. The tag is formed as follows:
+     * For workflow job, tag = action-id
+     * For Coord job, tag = coord-action-id@action-name (if not part of sub flow), else
+     * coord-action-id@subflow-action-name@action-name.
+     * @param conf the configuration checked first for an already assigned {@code oozie.action.yarn.tag}
+     * @param wfJob the workflow job whose parent id is used when the configuration carries no tag
+     * @param action the action whose name is appended to the tag, or whose id is the tag when nothing else is set
+     * @return the action yarn tag
+     */
+    public String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) {
+        if (conf.get(OOZIE_ACTION_YARN_TAG) != null) {
+            return conf.get(OOZIE_ACTION_YARN_TAG) + "@" + action.getName();
+        }
+        else if (wfJob.getParentId() != null) {
+            return wfJob.getParentId() + "@" + action.getName();
+        }
+        else {
+            return action.getId();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 639003e..f2273d6 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -62,7 +62,6 @@ import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.command.coord.CoordActionStartXCommand;
-import org.apache.oozie.command.wf.ActionStartXCommand;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
@@ -125,6 +124,8 @@ public class JavaActionExecutor extends ActionExecutor {
     public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
     public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
 
+    public XConfiguration workflowConf = null;
+
     static {
         DISALLOWED_PROPERTIES.add(HADOOP_USER);
         DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
@@ -852,7 +853,7 @@ public class JavaActionExecutor extends ActionExecutor {
             throws ActionExecutorException {
         XConfiguration wfJobConf = null;
         try {
-            wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
+            wfJobConf = getWorkflowConf(context);
         }
         catch (IOException ioe) {
             throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
@@ -915,14 +916,6 @@ public class JavaActionExecutor extends ActionExecutor {
             launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
             setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
 
-            String launcherTag = null;
-            // Extracting tag and appending action name to maintain the uniqueness.
-            if (context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG) != null) {
-                launcherTag = context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG);
-            } else { //Keeping it to maintain backward compatibly with test cases.
-                launcherTag = action.getId();
-            }
-
             // Properties for when a launcher job's AM gets restarted
             if (ConfigurationService.getBoolean(HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART)) {
                 // launcher time filter is required to prune the search of launcher tag.
@@ -930,14 +923,16 @@ public class JavaActionExecutor extends ActionExecutor {
                 // time. Workflow created time is good enough when workflow is running independently or workflow is
                 // rerunning from failed node.
                 long launcherTime = System.currentTimeMillis();
-                String coordActionNominalTime = context.getProtoActionConf()
-                        .get(CoordActionStartXCommand.OOZIE_COORD_ACTION_NOMINAL_TIME);
+                String coordActionNominalTime = context.getProtoActionConf().get(
+                        CoordActionStartXCommand.OOZIE_COORD_ACTION_NOMINAL_TIME);
                 if (coordActionNominalTime != null) {
                     launcherTime = Long.parseLong(coordActionNominalTime);
-                } else if (context.getWorkflow().getCreatedTime() != null) {
+                }
+                else if (context.getWorkflow().getCreatedTime() != null) {
                     launcherTime = context.getWorkflow().getCreatedTime().getTime();
                 }
-                LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, launcherTag, launcherTime);
+                String actionYarnTag = getActionYarnTag(getWorkflowConf(context), context.getWorkflow(), action);
+                LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime);
             }
             else {
                 LOG.info(MessageFormat.format("{0} is set to false, not setting YARN restart properties",
@@ -1237,13 +1232,7 @@ public class JavaActionExecutor extends ActionExecutor {
         HashMap<String, CredentialsProperties> credPropertiesMap = null;
         if (context != null && action != null) {
             if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) {
-                XConfiguration wfJobConf = null;
-                try {
-                    wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
-                } catch (IOException ioe) {
-                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
-                            ioe.getMessage());
-                }
+                XConfiguration wfJobConf = getWorkflowConf(context);
                 if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) ||
                     !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) {
                     credPropertiesMap = getActionCredentialsProperties(context, action);
@@ -1327,7 +1316,7 @@ public class JavaActionExecutor extends ActionExecutor {
             throws Exception {
         CredentialsProperties credProp = null;
         String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
-        XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
+        XConfiguration wfjobConf = getWorkflowConf(context);
         Element elementJob = XmlUtils.parseXml(workflowXml);
         Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
         if (credentials != null) {
@@ -1675,7 +1664,7 @@ public class JavaActionExecutor extends ActionExecutor {
         String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType());
         if (names == null || names.length == 0) {
             try {
-                XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
+                XConfiguration jobConf = getWorkflowConf(context);
                 names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType());
                 if (names == null || names.length == 0) {
                     names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType());
@@ -1745,4 +1734,12 @@ public class JavaActionExecutor extends ActionExecutor {
     public boolean supportsConfigurationJobXML() {
         return true;
     }
+
+    private XConfiguration getWorkflowConf(Context context) throws IOException {
+        if (workflowConf == null) {
+            workflowConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
+        }
+        return workflowConf;
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
index f77e52c..1ea7097 100644
--- a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
@@ -58,6 +58,8 @@ public class SubWorkflowActionExecutor extends ActionExecutor {
     public static final String SUBWORKFLOW_RERUN = "oozie.action.subworkflow.rerun";
 
     private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
+    public XLog LOG = XLog.getLog(getClass());
+
 
     static {
         String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
@@ -220,11 +222,7 @@ public class SubWorkflowActionExecutor extends ActionExecutor {
                 JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(),
                                           subWorkflowConf);
 
-                // pushing the tag to conf for using by Launcher.
-                if(context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG) != null) {
-                    subWorkflowConf.set(ActionStartXCommand.OOZIE_ACTION_YARN_TAG,
-                            context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG));
-                }
+                subWorkflowConf.set(OOZIE_ACTION_YARN_TAG, getActionYarnTag(parentConf, context.getWorkflow(), action));
 
                 // if the rerun failed node option is provided during the time of rerun command, old subworkflow will
                 // rerun again.
@@ -247,6 +245,7 @@ public class SubWorkflowActionExecutor extends ActionExecutor {
             }
         }
         catch (Exception ex) {
+            LOG.error(ex);
             throw convertException(ex);
         }
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index 8b0be9c..41f4430 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -67,11 +67,10 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
     public static final String COULD_NOT_START = "COULD_NOT_START";
     public static final String START_DATA_MISSING = "START_DATA_MISSING";
     public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
-    public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
 
     private String jobId = null;
     protected String actionId = null;
-    private WorkflowJobBean wfJob = null;
+    protected WorkflowJobBean wfJob = null;
     protected WorkflowActionBean wfAction = null;
     private JPAService jpaService = null;
     private ActionExecutor executor = null;
@@ -185,7 +184,7 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
                 isUserRetry = true;
                 prepareForRetry(wfAction);
             }
-            context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
+            context = getContext(isRetry, isUserRetry);
             boolean caught = false;
             try {
                 if (!(executor instanceof ControlNodeActionExecutor)) {
@@ -230,21 +229,6 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
                 Instrumentation.Cron cron = new Instrumentation.Cron();
                 cron.start();
                 context.setStartTime();
-                /*
-                Creating and forwarding the tag, It will be useful during repeat attempts of Launcher, to ensure only
-                one child job is running. Tag is formed as follows:
-                For workflow job, tag = action-id
-                For Coord job, tag = coord-action-id@action-name (if not part of sub flow), else
-                coord-action-id@subflow-action-name@action-name.
-                 */
-                if (conf.get(OOZIE_ACTION_YARN_TAG) != null) {
-                    context.setVar(OOZIE_ACTION_YARN_TAG, conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName());
-                } else if (wfJob.getParentId() != null) {
-                    context.setVar(OOZIE_ACTION_YARN_TAG, wfJob.getParentId() + "@" + wfAction.getName());
-                } else {
-                    context.setVar(OOZIE_ACTION_YARN_TAG, wfAction.getId());
-                }
-
                 executor.start(context, wfAction);
                 cron.stop();
                 FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
@@ -356,6 +340,16 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
         new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
     }
 
+    /**
+     * Get action executor context
+     * @param isRetry retry flag passed through to the created context
+     * @param isUserRetry user-retry flag passed through to the created context
+     * @return a new {@link ActionXCommand.ActionExecutorContext} built from this command's wfJob and wfAction
+     */
+    protected ActionExecutorContext getContext(boolean isRetry, boolean isUserRetry) {
+        return new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
+    }
+
     protected void updateJobLastModified(){
         wfJob.setLastModifiedTime(new Date());
         updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));

http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
index 525ef94..e65c3bf 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
@@ -23,6 +23,8 @@ import java.io.StringReader;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
@@ -277,9 +279,9 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
      *
      */
     public static class ActionExecutorContext implements ActionExecutor.Context {
-        private final WorkflowJobBean workflow;
+        protected final WorkflowJobBean workflow;
         private Configuration protoConf;
-        private final WorkflowActionBean action;
+        protected final WorkflowActionBean action;
         private final boolean isRetry;
         private final boolean isUserRetry;
         private boolean started;
@@ -353,6 +355,15 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
          * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
          */
         public void setVar(String name, String value) {
+            setVarToWorkflow(name, value);
+        }
+
+        /**
+         * This is not thread safe; don't use it if the workflow job is shared among multiple action commands.
+         * @param name
+         * @param value
+         */
+        public void setVarToWorkflow(String name, String value) {
             name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
             WorkflowInstance wfInstance = workflow.getWorkflowInstance();
             wfInstance.setVar(name, value);
@@ -520,7 +531,32 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
         public void setJobStatus(Job.Status jobStatus) {
             this.jobStatus = jobStatus;
         }
+    }
+
+    public static class ForkedActionExecutorContext extends ActionExecutorContext {
+        private Map<String, String> contextVariableMap = new HashMap<String, String>();
+
+        public ForkedActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry,
+                boolean isUserRetry) {
+            super(workflow, action, isRetry, isUserRetry);
+        }
+
+        public void setVar(String name, String value) {
+            if (value == null) { // a null value clears the variable; only real values are kept for the merge
+                contextVariableMap.remove(name);
+            }
+            else {
+                contextVariableMap.put(name, value);
+            }
+        }
 
+        public String getVar(String name) {
+            return contextVariableMap.get(name);
+        }
+
+        public Map<String, String> getContextMap() {
+            return contextVariableMap;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
index 47dca75..91da0b8 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
@@ -25,6 +25,7 @@ import org.apache.oozie.client.Job;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.XCommand;
+import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
 
 public class ForkedActionStartXCommand extends ActionStartXCommand {
 
@@ -96,4 +97,10 @@ public class ForkedActionStartXCommand extends ActionStartXCommand {
     protected void callActionEnd() {
         queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
     }
+
+    @Override
+    protected ActionExecutorContext  getContext(boolean isRetry, boolean isUserRetry){
+        return  new ActionXCommand.ForkedActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index d2bb403..e95a60a 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -39,6 +39,7 @@ import org.apache.oozie.XException;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
+import org.apache.oozie.command.wf.ActionXCommand.ForkedActionExecutorContext;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -448,8 +449,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
         }
         // Changing to synchronous call from asynchronous queuing to prevent
         // undue delay from between end of previous and start of next action
-        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING
-                && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
+        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
             // only for asynchronous actions, parent coord action's external id will
             // persisted and following update will succeed.
             updateParentIfNecessary(wfJob);
@@ -458,7 +458,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
         else if (syncAction != null) {
             new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call();
         }
-        else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)){
+        else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)) {
             startForkedActions(workflowActionBeanListForForked);
         }
         LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
@@ -467,9 +467,12 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
 
     public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException {
 
-        List<CallableWrapper<ActionExecutorContext>> tasks =
-                new ArrayList<CallableWrapper<ActionExecutorContext>>();
-        boolean updateLastModified = true;
+        List<CallableWrapper<ActionExecutorContext>> tasks = new ArrayList<CallableWrapper<ActionExecutorContext>>();
+        List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+        List<JsonBean> insertList = new ArrayList<JsonBean>();
+
+        boolean endWorkflow = false;
+        boolean submitJobByQueuing = false;
         for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) {
             LOG.debug("Starting forked actions parallely : " + workflowActionBean.getId());
             tasks.add(Services.get().get(CallableQueueService.class).new CallableWrapper<ActionExecutorContext>(
@@ -477,18 +480,26 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
         }
 
         try {
-            List<Future<ActionExecutorContext>> futures = Services.get().get(CallableQueueService.class).invokeAll(tasks);
+            List<Future<ActionExecutorContext>> futures = Services.get().get(CallableQueueService.class)
+                    .invokeAll(tasks);
             for (Future<ActionExecutorContext> result : futures) {
+                if (result == null) {
+                    submitJobByQueuing = true;
+                    continue;
+                }
                 ActionExecutorContext context = result.get();
+                Map<String, String> contextVariableMap = ((ForkedActionExecutorContext) context).getContextMap();
+                LOG.debug("contextVariableMap size of action " + context.getAction().getId() + " is " + contextVariableMap.size());
+                for (String key : contextVariableMap.keySet()) {
+                    context.setVarToWorkflow(key, contextVariableMap.get(key));
+                }
                 if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.FAILED)) {
                     LOG.warn("Action has failed, failing job" + context.getAction().getId());
                     new ActionStartXCommand(context.getAction().getId(), null).failJob(context);
                     updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
                             (WorkflowActionBean) context.getAction()));
                     if (context.isShouldEndWF()) {
-                        endWF();
-                        updateLastModified = false;
-                        break;
+                        endWorkflow = true;
                     }
                 }
                 if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.SUSPENDED)) {
@@ -498,20 +509,34 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
                     updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
                             (WorkflowActionBean) context.getAction()));
                     if (context.isShouldEndWF()) {
-                        endWF();
-                        updateLastModified = false;
-                        break;
+                        endWorkflow = true;
                     }
                 }
             }
+            if (endWorkflow) {
+                endWF(insertList);
+            }
+
         }
         catch (Exception e) {
             LOG.error("Error running forked jobs parallely", e);
             startForkedActionsByQueuing(workflowActionBeanListForForked);
+            submitJobByQueuing = false;
+        }
+        if (submitJobByQueuing && !endWorkflow) {
+            LOG.error("There is error in running forked jobs parallely");
+            startForkedActionsByQueuing(workflowActionBeanListForForked);
+        }
+        wfJob.setLastModifiedTime(new Date());
+        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
+                wfJob));
+        try {
+            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
         }
-        if (updateLastModified) {
-            updateJobLastModified();
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
         }
+
         LOG.debug("forked actions submitted parallely");
     }
 
@@ -519,17 +544,11 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
         //queuing all jobs, submitted job will fail in precondition
         for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) {
             LOG.debug("Queuing fork action " + workflowActionBean.getId());
-            queue(new ActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType()));
+            queue(new ActionStartXCommand(workflowActionBean.getId(), workflowActionBean.getType()));
         }
     }
 
-    protected void updateJobLastModified() {
-        wfJob.setLastModifiedTime(new Date());
-        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
-                wfJob));
-    }
-
-    protected void endWF() throws CommandException {
+    private void endWF(List<JsonBean> insertList) throws CommandException {
         updateParentIfNecessary(wfJob, 3);
         new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
         SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,

http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java b/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java
index c1f7cb1..8da8f03 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java
@@ -19,6 +19,7 @@
 package org.apache.oozie.workflow.lite;
 
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.XLog;
 import org.apache.oozie.workflow.WorkflowException;
 
 import java.util.ArrayList;
@@ -31,6 +32,8 @@ import java.util.List;
 public abstract class ControlNodeHandler extends NodeHandler {
 
     public static final String FORK_COUNT_PREFIX = "workflow.fork.";
+    public XLog LOG = XLog.getLog(getClass());
+
 
     /**
      * Called by {@link #enter(Context)} when returning TRUE.
@@ -62,6 +65,7 @@ public abstract class ControlNodeHandler extends NodeHandler {
         else if (nodeClass.equals(JoinNodeDef.class)) {
             String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
             String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath);
+
             if (forkCount == null) {
                 throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
             }
@@ -73,6 +77,8 @@ public abstract class ControlNodeHandler extends NodeHandler {
             else {
                 context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, null);
             }
+            LOG.debug("count = " + count + " for parent execution path " + parentExecutionPath);
+
             doTouch = (count == 0);
         }
         else {

http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 85bb993..5f9e29a 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -1256,6 +1256,12 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
                + "<value>java-job-conf</value>" + "</property>"
                + "</configuration>";
         wfBean.setConf(jobConf);
+        ae = new JavaActionExecutor() {
+            @Override
+            protected String getDefaultShareLibName(Element actionXml) {
+                return "java-action-executor";
+            }
+        };
         Assert.assertArrayEquals(new String[] { "java-job-conf" },
                 ae.getShareLibNames(context, new Element("java"), actionConf));
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 8612854..ab44c24 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2436 Fork/join workflow fails with "oozie.action.yarn.tag must not be null" (puru)
 OOZIE-2578 Oozie example distcp job fails to run within an encrypted zone with checksum match error (pbacsko via rkanter)
 OOZIE-2362 SQL injection in BulkJPAExecutor (pbacsko via rkanter)
 OOZIE-2577 Flaky tests TestCoordActionInputCheckXCommand.testTimeout and testTimeoutWithException (pbacsko via rkanter)