You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/09/17 22:44:44 UTC

oozie git commit: OOZIE-2345 Parallel job submission for forked actions

Repository: oozie
Updated Branches:
  refs/heads/master e0ada0ab7 -> d8425480e


OOZIE-2345 Parallel job submission for forked actions


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d8425480
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d8425480
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d8425480

Branch: refs/heads/master
Commit: d8425480ea917fb39a2fb74b59d17da7cb85ca07
Parents: e0ada0a
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Thu Sep 17 13:46:50 2015 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Thu Sep 17 13:46:50 2015 -0700

----------------------------------------------------------------------
 .../oozie/command/wf/ActionStartXCommand.java   |  60 ++---
 .../apache/oozie/command/wf/ActionXCommand.java |  24 +-
 .../command/wf/ForkedActionStartXCommand.java   |  99 ++++++++
 .../apache/oozie/command/wf/SignalXCommand.java | 118 ++++++++-
 .../oozie/service/CallableQueueService.java     |  27 +-
 .../apache/oozie/util/PriorityDelayQueue.java   |  19 +-
 core/src/main/resources/oozie-default.xml       |   9 +
 .../wf/TestForkedActionStartXCommand.java       | 244 +++++++++++++++++++
 .../oozie/command/wf/TestReRunXCommand.java     |  10 +-
 .../oozie/command/wf/TestSignalXCommand.java    |  35 ++-
 .../service/ExtendedCallableQueueService.java   |  37 +++
 .../oozie/util/TestPriorityDelayQueue.java      | 225 ++++++++++-------
 release-log.txt                                 |   1 +
 13 files changed, 761 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index e06649c..85a6cd7 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -21,7 +21,6 @@ package org.apache.oozie.command.wf;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-
 import javax.servlet.jsp.el.ELException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -62,7 +61,7 @@ import org.apache.oozie.util.XmlUtils;
 import org.apache.oozie.util.db.SLADbXOperations;
 
 @SuppressWarnings("deprecation")
-public class ActionStartXCommand extends ActionXCommand<Void> {
+public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext> {
     public static final String EL_ERROR = "EL_ERROR";
     public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
     public static final String COULD_NOT_START = "COULD_NOT_START";
@@ -71,13 +70,14 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
     public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
 
     private String jobId = null;
-    private String actionId = null;
+    protected String actionId = null;
     private WorkflowJobBean wfJob = null;
-    private WorkflowActionBean wfAction = null;
+    protected WorkflowActionBean wfAction = null;
     private JPAService jpaService = null;
     private ActionExecutor executor = null;
     private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
     private List<JsonBean> insertList = new ArrayList<JsonBean>();
+    protected ActionExecutorContext context = null;
 
     public ActionStartXCommand(String actionId, String type) {
         super("action.start", type, 0);
@@ -157,8 +157,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
     }
 
     @Override
-    protected Void execute() throws CommandException {
-
+    protected ActionExecutorContext execute() throws CommandException {
         LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
         Configuration conf = wfJob.getWorkflowInstance().getConf();
 
@@ -174,7 +173,6 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
         executor.setMaxRetries(maxRetries);
         executor.setRetryInterval(retryInterval);
 
-        ActionExecutorContext context = null;
         try {
             boolean isRetry = false;
             if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
@@ -284,8 +282,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
                 LOG.info(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
 
                 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
-                wfJob.setLastModifiedTime(new Date());
-                updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+                updateJobLastModified();
                 // Add SLA status event (STARTED) for WF_ACTION
                 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
                         SlaAppType.WORKFLOW_ACTION);
@@ -318,18 +315,12 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
                 case FAILED:
                     try {
                         failJob(context);
-                        updateParentIfNecessary(wfJob, 3);
-                        new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
+                        endWF();
                         SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
                                 SlaAppType.WORKFLOW_ACTION);
                         if(slaEvent1 != null) {
                             insertList.add(slaEvent1);
                         }
-                        SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
-                                SlaAppType.WORKFLOW_JOB);
-                        if(slaEvent2 != null) {
-                            insertList.add(slaEvent2);
-                        }
                     }
                     catch (XException x) {
                         LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage());
@@ -337,8 +328,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
                     break;
             }
             updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
-            wfJob.setLastModifiedTime(new Date());
-            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+            updateJobLastModified();
         }
         finally {
             try {
@@ -349,7 +339,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
                 if (execSynchronous) {
                     // Changing to synchronous call from asynchronous queuing to prevent
                     // undue delay from ::start:: to action due to queuing
-                    new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
+                    callActionEnd();
                 }
             }
             catch (JPAExecutorException e) {
@@ -362,24 +352,36 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
         return null;
     }
 
-    private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
+    protected void callActionEnd() throws CommandException {
+        new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
+    }
+
+    protected void updateJobLastModified(){
+        wfJob.setLastModifiedTime(new Date());
+        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+    }
+
+    protected void endWF() throws CommandException{
+        updateParentIfNecessary(wfJob, 3);
+        new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
+        SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
+                SlaAppType.WORKFLOW_JOB);
+        if(slaEvent2 != null) {
+            insertList.add(slaEvent2);
+        }
+    }
+
+    protected void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
             throws CommandException {
         failJob(context);
         updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
-        wfJob.setLastModifiedTime(new Date());
-        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+        updateJobLastModified();
         SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
                 Status.FAILED, SlaAppType.WORKFLOW_ACTION);
         if(slaEvent1 != null) {
             insertList.add(slaEvent1);
         }
-        SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(),
-                Status.FAILED, SlaAppType.WORKFLOW_JOB);
-        if(slaEvent2 != null) {
-            insertList.add(slaEvent2);
-        }
-
-        new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
+        endWF();
         return;
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
index 2616d32..b024bd0 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
@@ -34,6 +34,7 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.client.Job;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.command.CommandException;
@@ -56,7 +57,7 @@ import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
  * Base class for Action execution commands. Provides common functionality to handle different types of errors while
  * attempting to start or end an action.
  */
-public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
+public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
     private static final String INSTRUMENTATION_GROUP = "action.executors";
 
     protected static final String RECOVERY_ID_SEPARATOR = "@";
@@ -281,8 +282,10 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
         private boolean started;
         private boolean ended;
         private boolean executed;
+        private boolean shouldEndWF;
+        private Job.Status jobStatus;
 
-		/**
+        /**
 		 * Constructing the ActionExecutorContext, setting the private members
 		 * and constructing the proto configuration
 		 */
@@ -498,6 +501,23 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
         public void setErrorInfo(String str, String exMsg) {
             action.setErrorInfo(str, exMsg);
         }
+
+        public boolean isShouldEndWF() {
+            return shouldEndWF;
+        }
+
+        public void setShouldEndWF(boolean shouldEndWF) {
+            this.shouldEndWF = shouldEndWF;
+        }
+
+        public Job.Status getJobStatus() {
+            return jobStatus;
+        }
+
+        public void setJobStatus(Job.Status jobStatus) {
+            this.jobStatus = jobStatus;
+        }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
new file mode 100644
index 0000000..47dca75
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.wf;
+
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.XCommand;
+
+public class ForkedActionStartXCommand extends ActionStartXCommand {
+
+    public ForkedActionStartXCommand(String actionId, String type) {
+        super(actionId, type);
+    }
+
+    public ForkedActionStartXCommand(WorkflowJobBean wfJob, String id, String type) {
+        super(wfJob, id, type);
+    }
+
+    protected ActionExecutorContext execute() throws CommandException {
+        super.execute();
+        return context;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return actionId;
+    }
+
+    // In case of requeue follow the old approach.
+    @Override
+    protected void queue(XCommand<?> command, long msDelay) {
+
+        if (command instanceof ForkedActionStartXCommand) {
+            LOG.debug("Queueing ActionStartXCommand command");
+            super.queue(new ActionStartXCommand(wfAction.getId(), wfAction.getType()), msDelay);
+        }
+        else {
+            LOG.debug("Queueing " + command);
+            super.queue(command, msDelay);
+        }
+    }
+
+    // The job will be failed by SignalXCommand, because ForkedActionStartXCommand doesn't hold the lock on jobId.
+    @Override
+    public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
+        this.context.setJobStatus(Job.Status.FAILED);
+    }
+
+    @Override
+    protected void updateParentIfNecessary(WorkflowJobBean wfjob, int maxRetries) throws CommandException {
+    }
+
+    @Override
+    protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
+            WorkflowAction.Status status) throws CommandException {
+        this.context.setJobStatus(Job.Status.SUSPENDED);
+    }
+
+    @Override
+    protected void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
+            throws CommandException {
+        this.context.setJobStatus(Job.Status.FAILED);
+    }
+
+    @Override
+    protected void updateJobLastModified() {
+    }
+
+    // Not killing job, setting flag so that signalX can kill the job
+    @Override
+    protected void endWF() {
+        context.setShouldEndWF(true);
+    }
+
+    @Override
+    protected void callActionEnd() {
+        queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index d1fcd1a..6f64647 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -25,6 +25,7 @@ import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.control.ForkActionExecutor;
 import org.apache.oozie.action.control.StartActionExecutor;
 import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
+import org.apache.oozie.client.Job;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
@@ -46,6 +47,9 @@ import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQ
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.CallableQueueService;
+import org.apache.oozie.service.CallableQueueService.CallableWrapper;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.ELService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
@@ -70,6 +74,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.oozie.client.OozieClient;
 
@@ -86,6 +91,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
     private boolean generateEvent = false;
     private String wfJobErrorCode;
     private String wfJobErrorMsg;
+    public final static String FORK_PARALLEL_JOBSUBMISSION = "oozie.workflow.parallel.fork.action.start";
 
     public SignalXCommand(String name, int priority, String jobId) {
         super(name, name, priority);
@@ -164,6 +170,8 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
         WorkflowJob.Status prevStatus = wfJob.getStatus();
         boolean completed = false, skipAction = false;
         WorkflowActionBean syncAction = null;
+        List<WorkflowActionBean> workflowActionBeanListForForked = new ArrayList<WorkflowActionBean>();
+
 
         if (wfAction == null) {
             if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
@@ -387,15 +395,31 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
                         ActionExecutor current = as.getExecutor(wfAction.getType());
                         LOG.trace("Current Action Type:" + current.getClass());
                         if (!suspendNewAction) {
-                            if (!(current instanceof ForkActionExecutor) && !(current instanceof StartActionExecutor)) {
+                            if (current instanceof StartActionExecutor) {
                                 // Excluding :start: here from executing first action synchronously since it
                                 // blocks the consumer thread till the action is submitted to Hadoop,
                                 // in turn reducing the number of new submissions the threads can accept.
                                 // Would also be susceptible to longer delays in case Hadoop cluster is busy.
-                                syncAction = newAction;
+                                queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
+                            }
+                            else if (current instanceof ForkActionExecutor) {
+                                if (ConfigurationService.getBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION)) {
+                                    workflowActionBeanListForForked.add(newAction);
+                                }
+                                else {
+                                    queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
+
+                                }
                             }
                             else {
-                                queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
+                                syncAction = newAction;
+                            }
+                        }
+                        else {
+                            // The suspend check happens later: if any action is suspended, all forked actions
+                            // will be ignored.
+                            if (ConfigurationService.getBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION)) {
+                                workflowActionBeanListForForked.add(newAction);
                             }
                         }
                     }
@@ -434,10 +458,87 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
         else if (syncAction != null) {
             new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call(getEntityKey());
         }
+        else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)){
+            startForkedActions(workflowActionBeanListForForked);
+        }
         LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
         return null;
     }
 
+    public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException {
+
+        List<CallableWrapper<ActionExecutorContext>> tasks =
+                new ArrayList<CallableWrapper<ActionExecutorContext>>();
+        boolean updateLastModified = true;
+        for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) {
+            LOG.debug("Starting forked actions parallely : " + workflowActionBean.getId());
+            tasks.add(Services.get().get(CallableQueueService.class).new CallableWrapper<ActionExecutorContext>(
+                    new ForkedActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType()), 0));
+        }
+
+        try {
+            List<Future<ActionExecutorContext>> futures = Services.get().get(CallableQueueService.class).invokeAll(tasks);
+            for (Future<ActionExecutorContext> result : futures) {
+                ActionExecutorContext context = result.get();
+                if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.FAILED)) {
+                    LOG.warn("Action has failed, failing job" + context.getAction().getId());
+                    new ActionStartXCommand(context.getAction().getId(), null).failJob(context);
+                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
+                            (WorkflowActionBean) context.getAction()));
+                    if (context.isShouldEndWF()) {
+                        endWF();
+                        updateLastModified = false;
+                        break;
+                    }
+                }
+                if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.SUSPENDED)) {
+                    LOG.warn("Action has failed, failing job" + context.getAction().getId());
+                    new ActionStartXCommand(context.getAction().getId(), null).handleNonTransient(context, null,
+                            WorkflowAction.Status.START_MANUAL);
+                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
+                            (WorkflowActionBean) context.getAction()));
+                    if (context.isShouldEndWF()) {
+                        endWF();
+                        updateLastModified = false;
+                        break;
+                    }
+                }
+            }
+        }
+        catch (Exception e) {
+            LOG.error("Error running forked jobs parallely", e);
+            startForkedActionsByQueuing(workflowActionBeanListForForked);
+        }
+        if (updateLastModified) {
+            updateJobLastModified();
+        }
+        LOG.debug("forked actions submitted parallely");
+    }
+
+    public void startForkedActionsByQueuing(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException {
+        //queuing all jobs, submitted job will fail in precondition
+        for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) {
+            LOG.debug("Queuing fork action " + workflowActionBean.getId());
+            queue(new ActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType()));
+        }
+    }
+
+    protected void updateJobLastModified() {
+        wfJob.setLastModifiedTime(new Date());
+        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
+                wfJob));
+    }
+
+    protected void endWF() throws CommandException {
+        updateParentIfNecessary(wfJob, 3);
+        new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
+        SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
+                SlaAppType.WORKFLOW_JOB);
+        if (slaEvent2 != null) {
+            insertList.add(slaEvent2);
+        }
+    }
+
     public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
         ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
         for (Map.Entry<String, String> entry : conf) {
@@ -538,4 +639,15 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
         return suspendNewAction;
     }
 
+
+
+private boolean checkForSuspendNode(List<WorkflowActionBean> workflowActionBeanListForForked) {
+    for(WorkflowActionBean bean :workflowActionBeanListForForked)
+        if(checkForSuspendNode(bean)){
+            return true;
+        }
+    return false;
+}
+
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
index 830a58e..3333c77 100644
--- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
+++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
@@ -29,14 +29,16 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
-import org.apache.oozie.command.XCommand;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.PollablePriorityDelayQueue;
@@ -144,10 +146,10 @@ public class CallableQueueService implements Service, Instrumentable {
     // and instrumentation.
     // The wrapper implements Runnable and Comparable to be able to work with an
     // executor and a priority queue.
-    class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable {
+    public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E> implements Runnable, Callable<E> {
         private Instrumentation.Cron cron;
 
-        public CallableWrapper(XCallable<?> callable, long delay) {
+        public CallableWrapper(XCallable<E> callable, long delay) {
             super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
             cron = new Instrumentation.Cron();
             cron.start();
@@ -172,7 +174,8 @@ public class CallableQueueService implements Service, Instrumentable {
                     log.trace("executing callable [{0}]", callable.getName());
 
                     try {
-                        callable.call();
+                        // FutureTask.run() will invoke callable.call()
+                        super.run();
                         incrCounter(INSTR_EXECUTED_COUNTER, 1);
                         log.trace("executed callable [{0}]", callable.getName());
                     }
@@ -245,6 +248,13 @@ public class CallableQueueService implements Service, Instrumentable {
                 uniqueCallables.remove(callable.getKey());
             }
         }
+
+        // This will not get called, because newTaskFor of the thread pool will convert it into a FutureTask, which is a Runnable.
+        // The FutureTask will call callable.call() from its run method, so we override run to call super.run().
+        @Override
+        public E call() throws Exception {
+            return null;
+        }
     }
 
     class CompositeCallable implements XCallable<Void> {
@@ -498,6 +508,9 @@ public class CallableQueueService implements Service, Instrumentable {
                 super.beforeExecute(t,r);
                 XLog.Info.get().clear();
             }
+            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+                return (RunnableFuture<T>)callable;
+            }
         };
 
         for (int i = 0; i < threads; i++) {
@@ -770,4 +783,10 @@ public class CallableQueueService implements Service, Instrumentable {
         return list;
     }
 
+    // Refer executor.invokeAll
+    public <T> List<Future<T>> invokeAll(List<CallableWrapper<T>> tasks)
+            throws InterruptedException {
+        return executor.invokeAll(tasks);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
index ae54506..1ce6fae 100644
--- a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
+++ b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
@@ -59,8 +60,8 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
      * <p>
      * This wrapper keeps track of the priority and the age of a queue element.
      */
-    public static class QueueElement<E> implements Delayed {
-        private E element;
+    public static class QueueElement<E> extends FutureTask<E> implements Delayed {
+        private XCallable<E> element;
         private int priority;
         private long baseTime;
         boolean inQueue;
@@ -76,7 +77,8 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
          * @throws IllegalArgumentException if the element is <tt>NULL</tt>, the priority is negative or if the delay is
          * negative.
          */
-        public QueueElement(E element, int priority, long delay, TimeUnit unit) {
+        public QueueElement(XCallable<E> element, int priority, long delay, TimeUnit unit) {
+            super(element);
             if (element == null) {
                 throw new IllegalArgumentException("element cannot be null");
             }
@@ -92,20 +94,11 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
         }
 
         /**
-         * Create an Element wrapper with no delay and minimum priority.
-         *
-         * @param element element.
-         */
-        public QueueElement(E element) {
-            this(element, 0, 0, TimeUnit.MILLISECONDS);
-        }
-
-        /**
          * Return the element from the wrapper.
          *
          * @return the element.
          */
-        public E getElement() {
+        public XCallable<E> getElement() {
             return element;
         }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 4b9a0bc..400569b 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2127,6 +2127,15 @@
 		</description>
 	</property>
 
+    <property>
+        <name>oozie.workflow.parallel.fork.action.start</name>
+        <value>true</value>
+        <description>
+            Determines how Oozie starts forked actions. If true, forked actions are started and their jobs are
+            submitted in parallel, which is best for performance. If false, they are submitted sequentially.
+        </description>
+    </property>
+
 	<property>
 		<name>oozie.coord.action.get.all.attributes</name>
 		<value>false</value>

http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
new file mode 100644
index 0000000..e685621
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.wf;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ForTestingActionExecutor;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.ExtendedCallableQueueService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
+import org.apache.oozie.service.SchemaService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.XConfiguration;
+
+/** Exercises workflows whose forked actions are started with parallel fork job submission enabled. */
+public class TestForkedActionStartXCommand extends XDataTestCase {
+
+    private Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, "wf-ext-schema.xsd");
+        setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT, ForTestingActionExecutor.TEST_ERROR);
+        setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, ExtendedCallableQueueService.class.getName());
+        services = new Services();
+        services.init();
+        services.get(ActionService.class).registerAndInitExecutor(ForTestingActionExecutor.class);
+        ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /** Two forked fs actions both reach the join, so the workflow is expected to end SUCCEEDED. */
+    public void testWfSuccess() throws Exception {
+        Configuration conf = new XConfiguration();
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+        //@formatter:off
+        String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"wf-fork\">"
+                + "<start to=\"fork1\"/>"
+                + "<fork name=\"fork1\">"
+                + "<path start=\"action1\"/>"
+                + "<path start=\"action2\"/>"
+                + "</fork>"
+                + "<action name=\"action1\">"
+                + "<fs></fs>"
+                + "<ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action><action name=\"action2\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<join name=\"join1\" to=\"end\"/>"
+                + "<kill name=\"kill\"><message>killed</message>"
+                + "</kill><"
+                + "end name=\"end\"/>"
+                + "</workflow-app>";
+        //@formatter:on
+
+        writeToFile(appXml, workflowUri);
+        conf.set(OozieClient.APP_PATH, workflowUri);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+
+        SubmitXCommand sc = new SubmitXCommand(conf);
+        final String jobId = sc.call();
+        new StartXCommand(jobId).call();
+        waitFor(20 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId).getStatus()
+                        == WorkflowJob.Status.SUCCEEDED;
+            }
+        });
+        assertEquals(WorkflowJob.Status.SUCCEEDED,
+                WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId).getStatus());
+    }
+
+    /** action1's ok transition leads to the kill node, so the workflow is expected to end KILLED. */
+    public void testWfFailure() throws Exception {
+        Configuration conf = new XConfiguration();
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+        //@formatter:off
+        String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"wf-fork\">"
+                + "<start to=\"fork1\"/>"
+                + "<fork name=\"fork1\">"
+                + "<path start=\"action1\"/>"
+                + "<path start=\"action2\"/>"
+                + "</fork>"
+                + "<action name=\"action1\">"
+                + "<fs></fs>"
+                + "<ok to=\"kill\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action><action name=\"action2\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<join name=\"join1\" to=\"end\"/>"
+                + "<kill name=\"kill\"><message>killed</message>"
+                + "</kill><"
+                + "end name=\"end\"/>"
+                + "</workflow-app>";
+        //@formatter:on
+
+        writeToFile(appXml, workflowUri);
+        conf.set(OozieClient.APP_PATH, workflowUri);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        SubmitXCommand sc = new SubmitXCommand(conf);
+        final String jobId = sc.call();
+        new StartXCommand(jobId).call();
+        waitFor(200 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId).getStatus()
+                        == WorkflowJob.Status.KILLED;
+            }
+        });
+        assertEquals(WorkflowJob.Status.KILLED,
+                WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId).getStatus());
+    }
+
+    /** The forked "test" action (retry-max=2) is configured to report an error; expects two user retries. */
+    public void testUserRetry() throws JPAExecutorException, IOException, CommandException {
+        Configuration conf = new XConfiguration();
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+        //@formatter:off
+        String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.3\" name=\"wf-fork\">"
+                + "<start to=\"fork1\"/>"
+                + "<fork name=\"fork1\">"
+                + "<path start=\"action1\"/>"
+                + "<path start=\"action2\"/>"
+                + "</fork>"
+                +"<action name=\"action1\" retry-max=\"2\" retry-interval=\"0\">"
+                + "<test xmlns=\"uri:test\">"
+                +    "<signal-value>${wf:conf('signal-value')}</signal-value>"
+                +    "<external-status>${wf:conf('external-status')}</external-status> "
+                +    "<error>${wf:conf('error')}</error>"
+                +    "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>"
+                +    "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>"
+                +    "<running-mode>${wf:conf('running-mode')}</running-mode>"
+                + "</test>"
+                + "<ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action2\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<join name=\"join1\" to=\"end\"/>"
+                + "<kill name=\"kill\"><message>killed</message></kill>"
+                + "<end name=\"end\"/>"
+                + "</workflow-app>";
+        //@formatter:on
+        writeToFile(appXml, workflowUri);
+        conf.set(OozieClient.APP_PATH, workflowUri);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        conf.set("error", "start.error");
+        conf.set("external-status",  "error");
+        conf.set("signal-value", "based_on_action_status");
+
+        SubmitXCommand sc = new SubmitXCommand(conf);
+        final String jobId = sc.call();
+        new StartXCommand(jobId).call();
+        final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
+        final JPAService jpaService = Services.get().get(JPAService.class);
+
+        waitFor(20 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+                WorkflowActionBean action = null;
+                for (WorkflowActionBean bean : actions) {
+                    if (bean.getType().equals("test")) {
+                        action = bean;
+                        break;
+                    }
+                }
+                return (action != null && action.getUserRetryCount() == 2);
+            }
+        });
+
+        List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+        WorkflowActionBean action = null;
+        for (WorkflowActionBean bean : actions) {
+            if (bean.getType().equals("test")) {
+                action = bean;
+                break;
+            }
+        }
+        assertNotNull(action);
+        assertEquals(2, action.getUserRetryCount());
+    }
+
+    /** Writes appXml to the local file addressed by the appPath URI. */
+    private void writeToFile(String appXml, String appPath) throws IOException {
+        File wf = new File(URI.create(appPath));
+        PrintWriter out = null;
+        try {
+            out = new PrintWriter(new FileWriter(wf));
+            out.println(appXml);
+        }
+        finally {
+            if (out != null) {
+                out.close();
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
index 02f6166..45cbbc4 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.Reader;
 import java.io.Writer;
 import java.util.List;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.client.CoordinatorAction;
@@ -37,6 +38,7 @@ import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.command.coord.CoordActionStartXCommand;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.SchemaService;
 import org.apache.oozie.service.Services;
@@ -123,7 +125,13 @@ public class TestReRunXCommand extends XDataTestCase {
      *
      * @throws Exception
      */
-    public void testRerunFork() throws Exception {
+    public void testRerunFork() throws Exception {
+        for (boolean parallel : new boolean[] { true, false }) { // run once per fork submission mode
+            ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, parallel);
+            _testRerunFork();
+        }
+    }
+    public void _testRerunFork() throws Exception {
         // We need the shell schema and action for this test
         Services.get().setService(ActionService.class);
         Services.get().getConf().set(SchemaService.WF_CONF_EXT_SCHEMAS, "shell-action-0.3.xsd");

http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
index 4268b30..810bc1e 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
@@ -38,6 +38,7 @@ import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.IOUtils;
@@ -51,8 +52,8 @@ public class TestSignalXCommand extends XDataTestCase {
     protected void setUp() throws Exception {
         super.setUp();
         services = new Services();
-        services.getConf().setBoolean(LiteWorkflowAppParser.VALIDATE_FORK_JOIN, false);
         services.init();
+        ConfigurationService.setBoolean(LiteWorkflowAppParser.VALIDATE_FORK_JOIN, false);
 
     }
 
@@ -61,8 +62,34 @@ public class TestSignalXCommand extends XDataTestCase {
         services.destroy();
         super.tearDown();
     }
+    public void testJoinFail() throws Exception {
+        for (boolean parallel : new boolean[] { true, false }) { // run once per fork submission mode
+            ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, parallel);
+            _testJoinFail();
+        }
+    }
+
+    public void testSuspendPoints() throws Exception{
+        ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true); // pass 1: flag on
+        _testSuspendPoints(); // NOTE(review): helper destroys services first; confirm the flag survives
+        services.destroy(); // fresh Services for pass 2
+        services = new Services();
+        services.init();
+        ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false); // pass 2: flag off
+        _testSuspendPoints();
+    }
+
+    public void testSuspendPointsAll() throws Exception{
+        ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true); // pass 1: flag on
+        _testSuspendPointsAll(); // NOTE(review): helper destroys services first; confirm the flag survives
+        services.destroy(); // fresh Services for pass 2
+        services = new Services();
+        services.init();
+        ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false); // pass 2: flag off
+        _testSuspendPointsAll();
+    }
 
-    public void testJoinFail() throws Exception {
+    public void _testJoinFail() throws Exception {
         Logger logger = Logger.getLogger(SignalXCommand.class);
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         Layout layout = new SimpleLayout();
@@ -94,7 +121,7 @@ public class TestSignalXCommand extends XDataTestCase {
         assertFalse(out.toString().contains("EntityExistsException"));
     }
 
-    public void testSuspendPoints() throws Exception {
+    public void _testSuspendPoints() throws Exception {
         services.destroy();
         LocalOozie.start();
         FileSystem fs = getFileSystem();
@@ -168,7 +195,7 @@ public class TestSignalXCommand extends XDataTestCase {
         LocalOozie.stop();
     }
 
-    public void testSuspendPointsAll() throws Exception {
+    public void _testSuspendPointsAll() throws Exception {
         services.destroy();
         LocalOozie.start();
         FileSystem fs = getFileSystem();

http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/service/ExtendedCallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/ExtendedCallableQueueService.java b/core/src/test/java/org/apache/oozie/service/ExtendedCallableQueueService.java
new file mode 100644
index 0000000..4fdeee5
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/ExtendedCallableQueueService.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.apache.oozie.service.CallableQueueService;
+
+/** Test service that rethrows any Throwable from invokeAll as an Error, keeping the original as its cause. */
+public class ExtendedCallableQueueService extends CallableQueueService {
+    @Override
+    public <T> List<Future<T>> invokeAll(List<CallableWrapper<T>> tasks) throws InterruptedException {
+        try {
+            return super.invokeAll(tasks);
+        }
+        catch (Throwable e) {
+            throw new Error(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java b/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java
index 857f4e3..b48e0cc 100644
--- a/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java
+++ b/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java
@@ -22,7 +22,6 @@ import junit.framework.TestCase;
 
 import java.text.MessageFormat;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -48,47 +47,47 @@ public class TestPriorityDelayQueue extends TestCase {
         Object obj = new Object();
 
         try {
-            new PriorityDelayQueue.QueueElement<Object>(null);
+            new TestQueueElement<Object>(null);
             fail();
         }
         catch (IllegalArgumentException ex) {
         }
 
         try {
-            new PriorityDelayQueue.QueueElement<Object>(null, 0, 0, TimeUnit.MILLISECONDS);
+            new TestQueueElement<Object>(null, 0, 0, TimeUnit.MILLISECONDS);
             fail();
         }
         catch (IllegalArgumentException ex) {
         }
 
         try {
-            new PriorityDelayQueue.QueueElement<Object>(obj, -1, 0, TimeUnit.MILLISECONDS);
+            new TestQueueElement<Object>(obj, -1, 0, TimeUnit.MILLISECONDS);
             fail();
         }
         catch (IllegalArgumentException ex) {
         }
 
         try {
-            new PriorityDelayQueue.QueueElement<Object>(obj, 0, -1, TimeUnit.MILLISECONDS);
+            new TestQueueElement<Object>(obj, 0, -1, TimeUnit.MILLISECONDS);
             fail();
         }
         catch (IllegalArgumentException ex) {
         }
 
-        PriorityDelayQueue.QueueElement<Object> e1 = new PriorityDelayQueue.QueueElement<Object>(obj);
-        assertEquals(obj, e1.getElement());
+        TestQueueElement<Object> e1 = new TestQueueElement<Object>(obj);
+        assertEquals(obj, e1.getElement().call());
         assertEquals(0, e1.getPriority());
         assertTrue(e1.getDelay(TimeUnit.MILLISECONDS) <= 0);
 
-        e1 = new PriorityDelayQueue.QueueElement<Object>(obj, 1, 200, TimeUnit.MILLISECONDS);
-        assertEquals(obj, e1.getElement());
+        e1 = new TestQueueElement<Object>(obj, 1, 200, TimeUnit.MILLISECONDS);
+        assertEquals(obj, e1.getElement().call());
         assertEquals(1, e1.getPriority());
         assertTrue(e1.getDelay(TimeUnit.MILLISECONDS) <= 200);
         assertTrue(e1.getDelay(TimeUnit.MILLISECONDS) >= 100);
         Thread.sleep(300);
         assertTrue(e1.getDelay(TimeUnit.MILLISECONDS) <= 0);
 
-        PriorityDelayQueue.QueueElement<Object> e2 = new PriorityDelayQueue.QueueElement<Object>(obj);
+        TestQueueElement<Object> e2 = new TestQueueElement<Object>(obj);
 
         assertTrue(e1.compareTo(e2) < 0);
     }
@@ -129,23 +128,23 @@ public class TestPriorityDelayQueue extends TestCase {
         assertEquals(-1, q.getMaxSize());
         assertEquals(1000, q.getMaxWait(TimeUnit.MILLISECONDS));
         assertEquals(0, q.size());
-        assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)));
+        assertTrue(q.offer(new TestQueueElement<Integer>(1)));
         assertEquals(1, q.size());
-        assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)));
+        assertTrue(q.offer(new TestQueueElement<Integer>(1)));
         assertEquals(2, q.size());
-        assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)));
+        assertTrue(q.offer(new TestQueueElement<Integer>(1)));
         assertEquals(3, q.size());
 
         q = new PriorityDelayQueue<Integer>(1, 1000, TimeUnit.MILLISECONDS, 1);
         assertEquals(1, q.getMaxSize());
         assertEquals(0, q.size());
-        assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)));
+        assertTrue(q.offer(new TestQueueElement<Integer>(1)));
         assertEquals(1, q.size());
-        assertFalse(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)));
+        assertFalse(q.offer(new TestQueueElement<Integer>(1)));
         assertEquals(1, q.size());
         assertNotNull(q.poll());
         assertEquals(0, q.size());
-        assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)));
+        assertTrue(q.offer(new TestQueueElement<Integer>(1)));
         assertEquals(1, q.size());
     }
 
@@ -154,88 +153,88 @@ public class TestPriorityDelayQueue extends TestCase {
 
         //test immediate offer polling
 
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(1));
-        assertEquals((Integer) 1, q.poll().getElement());
+        q.offer(new TestQueueElement<Integer>(1));
+        assertEquals((Integer) 1, q.poll().getElement().call());
         assertEquals(0, q.size());
 
         //test delayed offer polling
 
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(2, 0, 10, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(2, 0, 10, TimeUnit.MILLISECONDS));
         assertNull(q.poll());
 
         Thread.sleep(11);
 
-        assertEquals((Integer) 2, q.poll().getElement());
+        assertEquals((Integer) 2, q.poll().getElement().call());
         assertEquals(0, q.size());
 
         //test different priorities immediate offer polling
 
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
 
-        assertEquals((Integer) 30, q.poll().getElement());
-        assertEquals((Integer) 20, q.poll().getElement());
-        assertEquals((Integer) 10, q.poll().getElement());
+        assertEquals((Integer) 30, q.poll().getElement().call());
+        assertEquals((Integer) 20, q.poll().getElement().call());
+        assertEquals((Integer) 10, q.poll().getElement().call());
         assertEquals(0, q.size());
 
         //test different priorities equal delayed offer polling
 
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 10, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 10, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 10, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(10, 0, 10, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(30, 2, 10, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(20, 1, 10, TimeUnit.MILLISECONDS));
 
         Thread.sleep(11);
 
-        List<Integer> list = new ArrayList<Integer>();
+        List<XCallable> list = new ArrayList<XCallable>();
         while (list.size() != 3) {
             QueueElement<Integer> e = q.poll();
             if (e != null) {
                 list.add(e.getElement());
             }
         }
-        assertEquals((Integer) 30, list.get(0));
-        assertEquals((Integer) 20, list.get(1));
-        assertEquals((Integer) 10, list.get(2));
+        assertEquals((Integer) 30, list.get(0).call());
+        assertEquals((Integer) 20, list.get(1).call());
+        assertEquals((Integer) 10, list.get(2).call());
         assertEquals(0, q.size());
 
         //test different priorities different delayed offer polling after delay
 
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 10, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 20, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(10, 0, 10, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(30, 2, 20, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
 
         Thread.sleep(21);
 
-        list = new ArrayList<Integer>();
+        list = new ArrayList<XCallable>();
         while (list.size() != 3) {
             QueueElement<Integer> e = q.poll();
             if (e != null) {
                 list.add(e.getElement());
             }
         }
-        assertEquals((Integer) 30, list.get(0));
-        assertEquals((Integer) 20, list.get(1));
-        assertEquals((Integer) 10, list.get(2));
+        assertEquals((Integer) 30, list.get(0).call());
+        assertEquals((Integer) 20, list.get(1).call());
+        assertEquals((Integer) 10, list.get(2).call());
         assertEquals(0, q.size());
 
         //test different priorities different delayed offer polling within delay
 
         long start = System.currentTimeMillis();
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 100, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 200, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(10, 0, 100, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(30, 2, 200, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
 
-        assertEquals((Integer) 20, q.poll().getElement());
+        assertEquals((Integer) 20, q.poll().getElement().call());
         long delay = System.currentTimeMillis() - start;
         Thread.sleep(101 - delay);
-        assertEquals((Integer) 10, q.poll().getElement());
+        assertEquals((Integer) 10, q.poll().getElement().call());
 
         start = System.currentTimeMillis();
         delay = System.currentTimeMillis() - start;
 
         Thread.sleep(101 - delay);
-        assertEquals((Integer) 30, q.poll().getElement());
+        assertEquals((Integer) 30, q.poll().getElement().call());
 
         assertEquals(0, q.size());
     }
@@ -245,46 +244,46 @@ public class TestPriorityDelayQueue extends TestCase {
 
         //test immediate offer peeking
 
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(1));
-        assertEquals((Integer) 1, q.peek().getElement());
+        q.offer(new TestQueueElement<Integer>(1));
+        assertEquals((Integer) 1, q.peek().getElement().call());
         q.poll();
         assertEquals(0, q.size());
 
         //test delay offer peeking
 
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS));
-        assertEquals((Integer) 1, q.peek().getElement());
+        q.offer(new TestQueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS));
+        assertEquals((Integer) 1, q.peek().getElement().call());
         Thread.sleep(11);
         assertNotNull(q.poll());
         assertEquals(0, q.size());
 
         //test different priorities immediate offer peeking
 
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
 
-        assertEquals((Integer) 30, q.peek().getElement());
+        assertEquals((Integer) 30, q.peek().getElement().call());
         assertNotNull(q.poll());
-        assertEquals((Integer) 20, q.peek().getElement());
+        assertEquals((Integer) 20, q.peek().getElement().call());
         assertNotNull(q.poll());
-        assertEquals((Integer) 10, q.peek().getElement());
+        assertEquals((Integer) 10, q.peek().getElement().call());
         assertNotNull(q.poll());
         assertEquals(0, q.size());
 
         //test different priorities delayed offer peeking
 
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 200, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 100, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 150, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(30, 2, 200, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(10, 0, 100, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(20, 1, 150, TimeUnit.MILLISECONDS));
 
-        assertEquals((Integer) 10, q.peek().getElement());
+        assertEquals((Integer) 10, q.peek().getElement().call());
         Thread.sleep(100);
         assertNotNull(q.poll());
-        assertEquals((Integer) 20, q.peek().getElement());
+        assertEquals((Integer) 20, q.peek().getElement().call());
         Thread.sleep(50);
         assertNotNull(q.poll());
-        assertEquals((Integer) 30, q.peek().getElement());
+        assertEquals((Integer) 30, q.peek().getElement().call());
         Thread.sleep(50);
         assertNotNull(q.poll());
         assertEquals(0, q.size());
@@ -292,7 +291,7 @@ public class TestPriorityDelayQueue extends TestCase {
 
     public void testAntiStarvation() throws Exception {
         PriorityDelayQueue<Integer> q = new PriorityDelayQueue<Integer>(3, 500, TimeUnit.MILLISECONDS, -1);
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(1));
+        q.offer(new TestQueueElement<Integer>(1));
         q.peek();
         assertEquals(1, q.sizes()[0]);
         Thread.sleep(600);
@@ -317,7 +316,7 @@ public class TestPriorityDelayQueue extends TestCase {
                     for (int j = 0; j < 10; j++) {
                         String msg = count + " - " + j;
                         try {
-                            queue.offer(new PriorityDelayQueue.QueueElement<String>(msg,
+                            queue.offer(new TestQueueElement<String>(msg,
                                         (int) (Math.random() * priorities),
                                         (int) (Math.random() * 500), TimeUnit.MILLISECONDS));
                             Thread.sleep((int) (Math.random() * 50));
@@ -346,48 +345,92 @@ public class TestPriorityDelayQueue extends TestCase {
     public void testIterator() throws Exception {
         PriorityDelayQueue<Integer> q = new PriorityDelayQueue<Integer>(3, 500, TimeUnit.MILLISECONDS, -1);
 
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
 
-        Iterator<PriorityDelayQueue.QueueElement<Integer>> it = q.iterator();
-        assertTrue(it.hasNext());
-
-        int size = 0;
-        while (it.hasNext()) {
-            it.next();
-            size++;
-        }
-        assertEquals(4, size);
+        assertEquals(4, q.size());
 
         assertNotNull(q.poll());
         assertNotNull(q.poll());
 
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(40, 0, 0, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(50, 2, 0, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(60, 1, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(40, 0, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(50, 2, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(60, 1, 0, TimeUnit.MILLISECONDS));
 
         assertEquals(5, q.size());
         assertNotNull(q.poll());
-
-        it = q.iterator();
-        Thread.sleep(50);
-        size = 0;
-        while (it.hasNext()) {
-            it.next();
-            size++;
-        }
-        assertEquals(4, size);
+        assertEquals(4, q.size());
     }
 
     public void testClear() {
         PriorityDelayQueue<Integer> q = new PriorityDelayQueue<Integer>(3, 500, TimeUnit.MILLISECONDS, -1);
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS));
-        q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS));
+        q.offer(new TestQueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
         assertEquals(2, q.size());
         q.clear();
         assertEquals(0, q.size());
     }
 
+    public static class TestQueueElement<E> extends QueueElement<E> {
+
+        public TestQueueElement(final E element, int priority, long delay, TimeUnit unit) {
+            super(new XCallable<E>() {
+
+                @Override
+                public E call() throws Exception {
+                    return element;
+                }
+
+                @Override
+                public String getName() {
+                    return null;
+                }
+
+                @Override
+                public int getPriority() {
+                    return 0;
+                }
+
+                @Override
+                public String getType() {
+                    return null;
+                }
+
+                @Override
+                public long getCreatedTime() {
+                    return 0;
+                }
+
+                @Override
+                public String getKey() {
+                    return null;
+                }
+
+                @Override
+                public String getEntityKey() {
+                    return null;
+                }
+
+                @Override
+                public void setInterruptMode(boolean mode) {
+                }
+
+                @Override
+                public boolean inInterruptMode() {
+                    return false;
+                }
+            }, priority, delay, unit);
+            ParamChecker.notNull(element, "element can't be null");
+        }
+
+        public TestQueueElement(E element) {
+            this(element, 0, 0, TimeUnit.MILLISECONDS);
+        }
+
+        protected void debug(String template, Object... args) {
+            System.out.println(MessageFormat.format(template, args));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index d533d78..160cd72 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2345 Parallel job submission for forked actions (puru)
 OOZIE-2358 Coord rerun cleanup should reuse hcat connections (rohini)
 OOZIE-2356 Add a way to enable/disable credentials in a workflow (rkanter)
 OOZIE-2355 Hive2 Action doesn't pass along oozie configs to jobconf (rkanter)