You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/09/17 22:44:44 UTC
oozie git commit: OOZIE-2345 Parallel job submission for forked
actions
Repository: oozie
Updated Branches:
refs/heads/master e0ada0ab7 -> d8425480e
OOZIE-2345 Parallel job submission for forked actions
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d8425480
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d8425480
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d8425480
Branch: refs/heads/master
Commit: d8425480ea917fb39a2fb74b59d17da7cb85ca07
Parents: e0ada0a
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Thu Sep 17 13:46:50 2015 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Thu Sep 17 13:46:50 2015 -0700
----------------------------------------------------------------------
.../oozie/command/wf/ActionStartXCommand.java | 60 ++---
.../apache/oozie/command/wf/ActionXCommand.java | 24 +-
.../command/wf/ForkedActionStartXCommand.java | 99 ++++++++
.../apache/oozie/command/wf/SignalXCommand.java | 118 ++++++++-
.../oozie/service/CallableQueueService.java | 27 +-
.../apache/oozie/util/PriorityDelayQueue.java | 19 +-
core/src/main/resources/oozie-default.xml | 9 +
.../wf/TestForkedActionStartXCommand.java | 244 +++++++++++++++++++
.../oozie/command/wf/TestReRunXCommand.java | 10 +-
.../oozie/command/wf/TestSignalXCommand.java | 35 ++-
.../service/ExtendedCallableQueueService.java | 37 +++
.../oozie/util/TestPriorityDelayQueue.java | 225 ++++++++++-------
release-log.txt | 1 +
13 files changed, 761 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index e06649c..85a6cd7 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -21,7 +21,6 @@ package org.apache.oozie.command.wf;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-
import javax.servlet.jsp.el.ELException;
import org.apache.hadoop.conf.Configuration;
@@ -62,7 +61,7 @@ import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.db.SLADbXOperations;
@SuppressWarnings("deprecation")
-public class ActionStartXCommand extends ActionXCommand<Void> {
+public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext> {
public static final String EL_ERROR = "EL_ERROR";
public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
public static final String COULD_NOT_START = "COULD_NOT_START";
@@ -71,13 +70,14 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
private String jobId = null;
- private String actionId = null;
+ protected String actionId = null;
private WorkflowJobBean wfJob = null;
- private WorkflowActionBean wfAction = null;
+ protected WorkflowActionBean wfAction = null;
private JPAService jpaService = null;
private ActionExecutor executor = null;
private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
private List<JsonBean> insertList = new ArrayList<JsonBean>();
+ protected ActionExecutorContext context = null;
public ActionStartXCommand(String actionId, String type) {
super("action.start", type, 0);
@@ -157,8 +157,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
}
@Override
- protected Void execute() throws CommandException {
-
+ protected ActionExecutorContext execute() throws CommandException {
LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
Configuration conf = wfJob.getWorkflowInstance().getConf();
@@ -174,7 +173,6 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
executor.setMaxRetries(maxRetries);
executor.setRetryInterval(retryInterval);
- ActionExecutorContext context = null;
try {
boolean isRetry = false;
if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
@@ -284,8 +282,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
LOG.info(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
- wfJob.setLastModifiedTime(new Date());
- updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+ updateJobLastModified();
// Add SLA status event (STARTED) for WF_ACTION
SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
SlaAppType.WORKFLOW_ACTION);
@@ -318,18 +315,12 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
case FAILED:
try {
failJob(context);
- updateParentIfNecessary(wfJob, 3);
- new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
+ endWF();
SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
SlaAppType.WORKFLOW_ACTION);
if(slaEvent1 != null) {
insertList.add(slaEvent1);
}
- SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
- SlaAppType.WORKFLOW_JOB);
- if(slaEvent2 != null) {
- insertList.add(slaEvent2);
- }
}
catch (XException x) {
LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage());
@@ -337,8 +328,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
break;
}
updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
- wfJob.setLastModifiedTime(new Date());
- updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+ updateJobLastModified();
}
finally {
try {
@@ -349,7 +339,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
if (execSynchronous) {
// Changing to synchronous call from asynchronous queuing to prevent
// undue delay from ::start:: to action due to queuing
- new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
+ callActionEnd();
}
}
catch (JPAExecutorException e) {
@@ -362,24 +352,36 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
return null;
}
- private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
+ protected void callActionEnd() throws CommandException {
+ new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
+ }
+
+ protected void updateJobLastModified(){
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+ }
+
+ protected void endWF() throws CommandException{
+ updateParentIfNecessary(wfJob, 3);
+ new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
+ SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
+ SlaAppType.WORKFLOW_JOB);
+ if(slaEvent2 != null) {
+ insertList.add(slaEvent2);
+ }
+ }
+
+ protected void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
throws CommandException {
failJob(context);
updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
- wfJob.setLastModifiedTime(new Date());
- updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+ updateJobLastModified();
SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
Status.FAILED, SlaAppType.WORKFLOW_ACTION);
if(slaEvent1 != null) {
insertList.add(slaEvent1);
}
- SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(),
- Status.FAILED, SlaAppType.WORKFLOW_JOB);
- if(slaEvent2 != null) {
- insertList.add(slaEvent2);
- }
-
- new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
+ endWF();
return;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
index 2616d32..b024bd0 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
@@ -34,6 +34,7 @@ import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.client.Job;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
@@ -56,7 +57,7 @@ import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
* Base class for Action execution commands. Provides common functionality to handle different types of errors while
* attempting to start or end an action.
*/
-public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
+public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
private static final String INSTRUMENTATION_GROUP = "action.executors";
protected static final String RECOVERY_ID_SEPARATOR = "@";
@@ -281,8 +282,10 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
private boolean started;
private boolean ended;
private boolean executed;
+ private boolean shouldEndWF;
+ private Job.Status jobStatus;
- /**
+ /**
* Constructing the ActionExecutorContext, setting the private members
* and constructing the proto configuration
*/
@@ -498,6 +501,23 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
public void setErrorInfo(String str, String exMsg) {
action.setErrorInfo(str, exMsg);
}
+
+ public boolean isShouldEndWF() {
+ return shouldEndWF;
+ }
+
+ public void setShouldEndWF(boolean shouldEndWF) {
+ this.shouldEndWF = shouldEndWF;
+ }
+
+ public Job.Status getJobStatus() {
+ return jobStatus;
+ }
+
+ public void setJobStatus(Job.Status jobStatus) {
+ this.jobStatus = jobStatus;
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
new file mode 100644
index 0000000..47dca75
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.wf;
+
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.XCommand;
+
+public class ForkedActionStartXCommand extends ActionStartXCommand {
+
+ public ForkedActionStartXCommand(String actionId, String type) {
+ super(actionId, type);
+ }
+
+ public ForkedActionStartXCommand(WorkflowJobBean wfJob, String id, String type) {
+ super(wfJob, id, type);
+ }
+
+ protected ActionExecutorContext execute() throws CommandException {
+ super.execute();
+ return context;
+ }
+
+ @Override
+ public String getEntityKey() {
+ return actionId;
+ }
+
+ // In case of requeue follow the old approach.
+ @Override
+ protected void queue(XCommand<?> command, long msDelay) {
+
+ if (command instanceof ForkedActionStartXCommand) {
+ LOG.debug("Queueing ActionStartXCommand command");
+ super.queue(new ActionStartXCommand(wfAction.getId(), wfAction.getType()), msDelay);
+ }
+ else {
+ LOG.debug("Queueing " + command);
+ super.queue(command, msDelay);
+ }
+ }
+
+ // SignalXCommand will fail the job, because ForkedActionStartXCommand does not hold the lock on jobId.
+ @Override
+ public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
+ this.context.setJobStatus(Job.Status.FAILED);
+ }
+
+ @Override
+ protected void updateParentIfNecessary(WorkflowJobBean wfjob, int maxRetries) throws CommandException {
+ }
+
+ @Override
+ protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
+ WorkflowAction.Status status) throws CommandException {
+ this.context.setJobStatus(Job.Status.SUSPENDED);
+ }
+
+ @Override
+ protected void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
+ throws CommandException {
+ this.context.setJobStatus(Job.Status.FAILED);
+ }
+
+ @Override
+ protected void updateJobLastModified() {
+ }
+
+ // Does not end the job here; only sets a flag so that SignalXCommand can end the workflow.
+ @Override
+ protected void endWF() {
+ context.setShouldEndWF(true);
+ }
+
+ @Override
+ protected void callActionEnd() {
+ queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index d1fcd1a..6f64647 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -25,6 +25,7 @@ import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.control.ForkActionExecutor;
import org.apache.oozie.action.control.StartActionExecutor;
import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
+import org.apache.oozie.client.Job;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
@@ -46,6 +47,9 @@ import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQ
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.CallableQueueService;
+import org.apache.oozie.service.CallableQueueService.CallableWrapper;
+import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
@@ -70,6 +74,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Future;
import org.apache.oozie.client.OozieClient;
@@ -86,6 +91,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
private boolean generateEvent = false;
private String wfJobErrorCode;
private String wfJobErrorMsg;
+ public final static String FORK_PARALLEL_JOBSUBMISSION = "oozie.workflow.parallel.fork.action.start";
public SignalXCommand(String name, int priority, String jobId) {
super(name, name, priority);
@@ -164,6 +170,8 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
WorkflowJob.Status prevStatus = wfJob.getStatus();
boolean completed = false, skipAction = false;
WorkflowActionBean syncAction = null;
+ List<WorkflowActionBean> workflowActionBeanListForForked = new ArrayList<WorkflowActionBean>();
+
if (wfAction == null) {
if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
@@ -387,15 +395,31 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
ActionExecutor current = as.getExecutor(wfAction.getType());
LOG.trace("Current Action Type:" + current.getClass());
if (!suspendNewAction) {
- if (!(current instanceof ForkActionExecutor) && !(current instanceof StartActionExecutor)) {
+ if (current instanceof StartActionExecutor) {
// Excluding :start: here from executing first action synchronously since it
// blocks the consumer thread till the action is submitted to Hadoop,
// in turn reducing the number of new submissions the threads can accept.
// Would also be susceptible to longer delays in case Hadoop cluster is busy.
- syncAction = newAction;
+ queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
+ }
+ else if (current instanceof ForkActionExecutor) {
+ if (ConfigurationService.getBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION)) {
+ workflowActionBeanListForForked.add(newAction);
+ }
+ else {
+ queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
+
+ }
}
else {
- queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
+ syncAction = newAction;
+ }
+ }
+ else {
+ // The suspend check happens later: if any one of the actions is suspended, all forked actions
+ // will be ignored.
+ if (ConfigurationService.getBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION)) {
+ workflowActionBeanListForForked.add(newAction);
}
}
}
@@ -434,10 +458,87 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
else if (syncAction != null) {
new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call(getEntityKey());
}
+ else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)){
+ startForkedActions(workflowActionBeanListForForked);
+ }
LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
return null;
}
+ public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException {
+
+ List<CallableWrapper<ActionExecutorContext>> tasks =
+ new ArrayList<CallableWrapper<ActionExecutorContext>>();
+ boolean updateLastModified = true;
+ for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) {
+ LOG.debug("Starting forked actions parallely : " + workflowActionBean.getId());
+ tasks.add(Services.get().get(CallableQueueService.class).new CallableWrapper<ActionExecutorContext>(
+ new ForkedActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType()), 0));
+ }
+
+ try {
+ List<Future<ActionExecutorContext>> futures = Services.get().get(CallableQueueService.class).invokeAll(tasks);
+ for (Future<ActionExecutorContext> result : futures) {
+ ActionExecutorContext context = result.get();
+ if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.FAILED)) {
+ LOG.warn("Action has failed, failing job" + context.getAction().getId());
+ new ActionStartXCommand(context.getAction().getId(), null).failJob(context);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
+ (WorkflowActionBean) context.getAction()));
+ if (context.isShouldEndWF()) {
+ endWF();
+ updateLastModified = false;
+ break;
+ }
+ }
+ if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.SUSPENDED)) {
+ LOG.warn("Action has failed, failing job" + context.getAction().getId());
+ new ActionStartXCommand(context.getAction().getId(), null).handleNonTransient(context, null,
+ WorkflowAction.Status.START_MANUAL);
+ updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
+ (WorkflowActionBean) context.getAction()));
+ if (context.isShouldEndWF()) {
+ endWF();
+ updateLastModified = false;
+ break;
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ LOG.error("Error running forked jobs parallely", e);
+ startForkedActionsByQueuing(workflowActionBeanListForForked);
+ }
+ if (updateLastModified) {
+ updateJobLastModified();
+ }
+ LOG.debug("forked actions submitted parallely");
+ }
+
+ public void startForkedActionsByQueuing(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException {
+ // Queue all actions; any action that was already submitted will fail in its precondition check.
+ for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) {
+ LOG.debug("Queuing fork action " + workflowActionBean.getId());
+ queue(new ActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType()));
+ }
+ }
+
+ protected void updateJobLastModified() {
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
+ wfJob));
+ }
+
+ protected void endWF() throws CommandException {
+ updateParentIfNecessary(wfJob, 3);
+ new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
+ SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
+ SlaAppType.WORKFLOW_JOB);
+ if (slaEvent2 != null) {
+ insertList.add(slaEvent2);
+ }
+ }
+
public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
for (Map.Entry<String, String> entry : conf) {
@@ -538,4 +639,15 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
return suspendNewAction;
}
+
+
+private boolean checkForSuspendNode(List<WorkflowActionBean> workflowActionBeanListForForked) {
+ for(WorkflowActionBean bean :workflowActionBeanListForForked)
+ if(checkForSuspendNode(bean)){
+ return true;
+ }
+ return false;
+}
+
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
index 830a58e..3333c77 100644
--- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
+++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
@@ -29,14 +29,16 @@ import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
-import org.apache.oozie.command.XCommand;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.PollablePriorityDelayQueue;
@@ -144,10 +146,10 @@ public class CallableQueueService implements Service, Instrumentable {
// and instrumentation.
// The wrapper implements Runnable and Comparable to be able to work with an
// executor and a priority queue.
- class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable {
+ public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E> implements Runnable, Callable<E> {
private Instrumentation.Cron cron;
- public CallableWrapper(XCallable<?> callable, long delay) {
+ public CallableWrapper(XCallable<E> callable, long delay) {
super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
cron = new Instrumentation.Cron();
cron.start();
@@ -172,7 +174,8 @@ public class CallableQueueService implements Service, Instrumentable {
log.trace("executing callable [{0}]", callable.getName());
try {
- callable.call();
+ // FutureTask.run() will invoke callable.call()
+ super.run();
incrCounter(INSTR_EXECUTED_COUNTER, 1);
log.trace("executed callable [{0}]", callable.getName());
}
@@ -245,6 +248,13 @@ public class CallableQueueService implements Service, Instrumentable {
uniqueCallables.remove(callable.getKey());
}
}
+
+ // Not invoked by the executor: the thread pool's newTaskFor returns this wrapper itself, which is already a FutureTask (a Runnable).
+ // FutureTask.run() invokes the wrapped callable's call(), so run() is overridden to delegate to super.run().
+ @Override
+ public E call() throws Exception {
+ return null;
+ }
}
class CompositeCallable implements XCallable<Void> {
@@ -498,6 +508,9 @@ public class CallableQueueService implements Service, Instrumentable {
super.beforeExecute(t,r);
XLog.Info.get().clear();
}
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ return (RunnableFuture<T>)callable;
+ }
};
for (int i = 0; i < threads; i++) {
@@ -770,4 +783,10 @@ public class CallableQueueService implements Service, Instrumentable {
return list;
}
+ // Delegates to executor.invokeAll; see that method for the blocking and completion semantics.
+ public <T> List<Future<T>> invokeAll(List<CallableWrapper<T>> tasks)
+ throws InterruptedException {
+ return executor.invokeAll(tasks);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
index ae54506..1ce6fae 100644
--- a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
+++ b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -59,8 +60,8 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
* <p>
* This wrapper keeps track of the priority and the age of a queue element.
*/
- public static class QueueElement<E> implements Delayed {
- private E element;
+ public static class QueueElement<E> extends FutureTask<E> implements Delayed {
+ private XCallable<E> element;
private int priority;
private long baseTime;
boolean inQueue;
@@ -76,7 +77,8 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
* @throws IllegalArgumentException if the element is <tt>NULL</tt>, the priority is negative or if the delay is
* negative.
*/
- public QueueElement(E element, int priority, long delay, TimeUnit unit) {
+ public QueueElement(XCallable<E> element, int priority, long delay, TimeUnit unit) {
+ super(element);
if (element == null) {
throw new IllegalArgumentException("element cannot be null");
}
@@ -92,20 +94,11 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu
}
/**
- * Create an Element wrapper with no delay and minimum priority.
- *
- * @param element element.
- */
- public QueueElement(E element) {
- this(element, 0, 0, TimeUnit.MILLISECONDS);
- }
-
- /**
* Return the element from the wrapper.
*
* @return the element.
*/
- public E getElement() {
+ public XCallable<E> getElement() {
return element;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 4b9a0bc..400569b 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2127,6 +2127,15 @@
</description>
</property>
+ <property>
+ <name>oozie.workflow.parallel.fork.action.start</name>
+ <value>true</value>
+ <description>
+ Determines how Oozie processes starting of forked actions. If true, forked actions and their job submissions
+ are done in parallel which is best for performance. If false, they are submitted sequentially.
+ </description>
+ </property>
+
<property>
<name>oozie.coord.action.get.all.attributes</name>
<value>false</value>
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
new file mode 100644
index 0000000..e685621
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestForkedActionStartXCommand.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.wf;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ForTestingActionExecutor;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.ExtendedCallableQueueService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
+import org.apache.oozie.service.SchemaService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.XConfiguration;
+
+public class TestForkedActionStartXCommand extends XDataTestCase {
+
+ private Services services;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ // System properties are set first; Services is created and initialized only afterwards.
+ setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, "wf-ext-schema.xsd");
+ setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT, ForTestingActionExecutor.TEST_ERROR);
+ setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, ExtendedCallableQueueService.class.getName());
+ services = new Services();
+ services.init();
+ services.get(ActionService.class).registerAndInitExecutor(ForTestingActionExecutor.class);
+ ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true); // every test here runs with parallel fork submission on
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy(); // destroy the Services instance created in setUp() before the base-class teardown
+ super.tearDown();
+ }
+
+ public void testWfSuccess() throws Exception {
+ // Fork with two fs actions that both transition to the join; the job is expected to end SUCCEEDED.
+ Configuration conf = new XConfiguration();
+ String workflowUri = getTestCaseFileUri("workflow.xml");
+ //@formatter:off
+ String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"wf-fork\">"
+ + "<start to=\"fork1\"/>"
+ + "<fork name=\"fork1\">"
+ + "<path start=\"action1\"/>"
+ + "<path start=\"action2\"/>"
+ + "</fork>"
+ + "<action name=\"action1\">"
+ + "<fs></fs>"
+ + "<ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action><action name=\"action2\">"
+ + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<join name=\"join1\" to=\"end\"/>"
+ + "<kill name=\"kill\"><message>killed</message>"
+ + "</kill><"
+ + "end name=\"end\"/>"
+ + "</workflow-app>";
+ //@formatter:on
+ writeToFile(appXml, workflowUri);
+ conf.set(OozieClient.APP_PATH, workflowUri);
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ // Submit and start the job, then wait for its stored status to become SUCCEEDED.
+ SubmitXCommand sc = new SubmitXCommand(conf);
+ final String jobId = sc.call();
+ new StartXCommand(jobId).call();
+ waitFor(20 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId).getStatus()
+ == WorkflowJob.Status.SUCCEEDED;
+ }
+ });
+ assertEquals(WorkflowJob.Status.SUCCEEDED,
+ WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId).getStatus());
+ }
+
+ public void testWfFailure() throws Exception {
+ // action1's ok transition points at the kill node, so the job is expected to end KILLED.
+ Configuration conf = new XConfiguration();
+ String workflowUri = getTestCaseFileUri("workflow.xml");
+ //@formatter:off
+ String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"wf-fork\">"
+ + "<start to=\"fork1\"/>"
+ + "<fork name=\"fork1\">"
+ + "<path start=\"action1\"/>"
+ + "<path start=\"action2\"/>"
+ + "</fork>"
+ + "<action name=\"action1\">"
+ + "<fs></fs>"
+ + "<ok to=\"kill\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action><action name=\"action2\">"
+ + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<join name=\"join1\" to=\"end\"/>"
+ + "<kill name=\"kill\"><message>killed</message>"
+ + "</kill><"
+ + "end name=\"end\"/>"
+ + "</workflow-app>";
+ //@formatter:on
+ writeToFile(appXml, workflowUri);
+ conf.set(OozieClient.APP_PATH, workflowUri);
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ SubmitXCommand sc = new SubmitXCommand(conf);
+ final String jobId = sc.call();
+ new StartXCommand(jobId).call();
+ waitFor(200 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId).getStatus()
+ == WorkflowJob.Status.KILLED;
+ }
+ });
+ assertEquals(WorkflowJob.Status.KILLED,
+ WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId).getStatus());
+ }
+
+ public void testUserRetry() throws JPAExecutorException, IOException, CommandException{
+ Configuration conf = new XConfiguration();
+ String workflowUri = getTestCaseFileUri("workflow.xml");
+ // Forked workflow whose action1 is a "test" action declared with retry-max=2 and retry-interval=0.
+ //@formatter:off
+ String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.3\" name=\"wf-fork\">"
+ + "<start to=\"fork1\"/>"
+ + "<fork name=\"fork1\">"
+ + "<path start=\"action1\"/>"
+ + "<path start=\"action2\"/>"
+ + "</fork>"
+ +"<action name=\"action1\" retry-max=\"2\" retry-interval=\"0\">"
+ + "<test xmlns=\"uri:test\">"
+ + "<signal-value>${wf:conf('signal-value')}</signal-value>"
+ + "<external-status>${wf:conf('external-status')}</external-status> "
+ + "<error>${wf:conf('error')}</error>"
+ + "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>"
+ + "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>"
+ + "<running-mode>${wf:conf('running-mode')}</running-mode>"
+ + "</test>"
+ + "<ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<action name=\"action2\">"
+ + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<join name=\"join1\" to=\"end\"/>"
+ + "<kill name=\"kill\"><message>killed</message></kill>"
+ + "<end name=\"end\"/>"
+ + "</workflow-app>";
+ //@formatter:on
+ writeToFile(appXml, workflowUri);
+ conf.set(OozieClient.APP_PATH, workflowUri);
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ conf.set("error", "start.error");
+ conf.set("external-status", "error");
+ conf.set("signal-value", "based_on_action_status");
+ // Submit and start the job; the values above are read by the workflow through wf:conf(...).
+ SubmitXCommand sc = new SubmitXCommand(conf);
+ final String jobId = sc.call();
+ new StartXCommand(jobId).call();
+ final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
+ final JPAService jpaService = Services.get().get(JPAService.class);
+ // Wait until the action of type "test" reports a user retry count of 2.
+ waitFor(20 * 1000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+ WorkflowActionBean action = null;
+ for (WorkflowActionBean bean : actions) {
+ if (bean.getType().equals("test")) {
+ action = bean;
+ break;
+ }
+ }
+ return (action != null && action.getUserRetryCount() == 2);
+ }
+ });
+ // Re-read the actions so the assertions run against the stored state, not the predicate's local copy.
+ List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+ WorkflowActionBean action = null;
+ for (WorkflowActionBean bean : actions) {
+ if (bean.getType().equals("test")) {
+ action = bean;
+ break;
+ }
+ }
+ assertNotNull(action);
+ assertEquals(2, action.getUserRetryCount());
+ }
+
+ private void writeToFile(String appXml, String appPath) throws IOException {
+ File wf = new File(URI.create(appPath)); // appPath is a URI string; resolve it to a local file
+ PrintWriter out = null;
+ try {
+ out = new PrintWriter(new FileWriter(wf));
+ out.println(appXml);
+ if (out.checkError()) { // PrintWriter suppresses write errors; flush and surface them here
+ throw new IOException("Failed to write workflow definition to " + wf);
+ }
+ }
+ finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
index 02f6166..45cbbc4 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.util.List;
+
import org.apache.hadoop.fs.Path;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.client.CoordinatorAction;
@@ -37,6 +38,7 @@ import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
@@ -123,7 +125,13 @@ public class TestReRunXCommand extends XDataTestCase {
*
* @throws Exception
*/
- public void testRerunFork() throws Exception {
+ public void testRerunFork() throws Exception{ // runs the same rerun scenario twice, once per flag value
+ ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
+ _testRerunFork();
+ ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false);
+ _testRerunFork();
+ }
+ public void _testRerunFork() throws Exception {
// We need the shell schema and action for this test
Services.get().setService(ActionService.class);
Services.get().getConf().set(SchemaService.WF_CONF_EXT_SCHEMAS, "shell-action-0.3.xsd");
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
index 4268b30..810bc1e 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
@@ -38,6 +38,7 @@ import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.IOUtils;
@@ -51,8 +52,8 @@ public class TestSignalXCommand extends XDataTestCase {
protected void setUp() throws Exception {
super.setUp();
services = new Services();
- services.getConf().setBoolean(LiteWorkflowAppParser.VALIDATE_FORK_JOIN, false);
services.init();
+ ConfigurationService.setBoolean(LiteWorkflowAppParser.VALIDATE_FORK_JOIN, false);
}
@@ -61,8 +62,34 @@ public class TestSignalXCommand extends XDataTestCase {
services.destroy();
super.tearDown();
}
+ public void testJoinFail() throws Exception{ // runs _testJoinFail() with the flag set to true, then to false
+ ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
+ _testJoinFail();
+ ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false);
+ _testJoinFail();
+ }
+
+ public void testSuspendPoints() throws Exception{ // runs _testSuspendPoints() twice, recreating Services in between
+ ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
+ _testSuspendPoints();
+ services.destroy();
+ services = new Services();
+ services.init();
+ ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false); // NOTE(review): _testSuspendPoints() starts with services.destroy() and LocalOozie.start(), so this value may not reach the run - confirm
+ _testSuspendPoints();
+ }
+
+ public void testSuspendPointsAll() throws Exception{ // runs _testSuspendPointsAll() twice, recreating Services in between
+ ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
+ _testSuspendPointsAll();
+ services.destroy();
+ services = new Services();
+ services.init();
+ ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false); // NOTE(review): _testSuspendPointsAll() starts with services.destroy() and LocalOozie.start(), so this value may not reach the run - confirm
+ _testSuspendPointsAll();
+ }
- public void testJoinFail() throws Exception {
+ public void _testJoinFail() throws Exception {
Logger logger = Logger.getLogger(SignalXCommand.class);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Layout layout = new SimpleLayout();
@@ -94,7 +121,7 @@ public class TestSignalXCommand extends XDataTestCase {
assertFalse(out.toString().contains("EntityExistsException"));
}
- public void testSuspendPoints() throws Exception {
+ public void _testSuspendPoints() throws Exception {
services.destroy();
LocalOozie.start();
FileSystem fs = getFileSystem();
@@ -168,7 +195,7 @@ public class TestSignalXCommand extends XDataTestCase {
LocalOozie.stop();
}
- public void testSuspendPointsAll() throws Exception {
+ public void _testSuspendPointsAll() throws Exception {
services.destroy();
LocalOozie.start();
FileSystem fs = getFileSystem();
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/service/ExtendedCallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/ExtendedCallableQueueService.java b/core/src/test/java/org/apache/oozie/service/ExtendedCallableQueueService.java
new file mode 100644
index 0000000..4fdeee5
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/ExtendedCallableQueueService.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.apache.oozie.service.CallableQueueService;
+
+public class ExtendedCallableQueueService extends CallableQueueService {
+ @Override
+ public <T> List<Future<T>> invokeAll(List<CallableWrapper<T>> tasks) throws InterruptedException {
+ try {
+ return super.invokeAll(tasks);
+ }
+ catch (Throwable e) {
+ throw new Error();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java b/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java
index 857f4e3..b48e0cc 100644
--- a/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java
+++ b/core/src/test/java/org/apache/oozie/util/TestPriorityDelayQueue.java
@@ -22,7 +22,6 @@ import junit.framework.TestCase;
import java.text.MessageFormat;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,47 +47,47 @@ public class TestPriorityDelayQueue extends TestCase {
Object obj = new Object();
try {
- new PriorityDelayQueue.QueueElement<Object>(null);
+ new TestQueueElement<Object>(null);
fail();
}
catch (IllegalArgumentException ex) {
}
try {
- new PriorityDelayQueue.QueueElement<Object>(null, 0, 0, TimeUnit.MILLISECONDS);
+ new TestQueueElement<Object>(null, 0, 0, TimeUnit.MILLISECONDS);
fail();
}
catch (IllegalArgumentException ex) {
}
try {
- new PriorityDelayQueue.QueueElement<Object>(obj, -1, 0, TimeUnit.MILLISECONDS);
+ new TestQueueElement<Object>(obj, -1, 0, TimeUnit.MILLISECONDS);
fail();
}
catch (IllegalArgumentException ex) {
}
try {
- new PriorityDelayQueue.QueueElement<Object>(obj, 0, -1, TimeUnit.MILLISECONDS);
+ new TestQueueElement<Object>(obj, 0, -1, TimeUnit.MILLISECONDS);
fail();
}
catch (IllegalArgumentException ex) {
}
- PriorityDelayQueue.QueueElement<Object> e1 = new PriorityDelayQueue.QueueElement<Object>(obj);
- assertEquals(obj, e1.getElement());
+ TestQueueElement<Object> e1 = new TestQueueElement<Object>(obj);
+ assertEquals(obj, e1.getElement().call());
assertEquals(0, e1.getPriority());
assertTrue(e1.getDelay(TimeUnit.MILLISECONDS) <= 0);
- e1 = new PriorityDelayQueue.QueueElement<Object>(obj, 1, 200, TimeUnit.MILLISECONDS);
- assertEquals(obj, e1.getElement());
+ e1 = new TestQueueElement<Object>(obj, 1, 200, TimeUnit.MILLISECONDS);
+ assertEquals(obj, e1.getElement().call());
assertEquals(1, e1.getPriority());
assertTrue(e1.getDelay(TimeUnit.MILLISECONDS) <= 200);
assertTrue(e1.getDelay(TimeUnit.MILLISECONDS) >= 100);
Thread.sleep(300);
assertTrue(e1.getDelay(TimeUnit.MILLISECONDS) <= 0);
- PriorityDelayQueue.QueueElement<Object> e2 = new PriorityDelayQueue.QueueElement<Object>(obj);
+ TestQueueElement<Object> e2 = new TestQueueElement<Object>(obj);
assertTrue(e1.compareTo(e2) < 0);
}
@@ -129,23 +128,23 @@ public class TestPriorityDelayQueue extends TestCase {
assertEquals(-1, q.getMaxSize());
assertEquals(1000, q.getMaxWait(TimeUnit.MILLISECONDS));
assertEquals(0, q.size());
- assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)));
+ assertTrue(q.offer(new TestQueueElement<Integer>(1)));
assertEquals(1, q.size());
- assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)));
+ assertTrue(q.offer(new TestQueueElement<Integer>(1)));
assertEquals(2, q.size());
- assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)));
+ assertTrue(q.offer(new TestQueueElement<Integer>(1)));
assertEquals(3, q.size());
q = new PriorityDelayQueue<Integer>(1, 1000, TimeUnit.MILLISECONDS, 1);
assertEquals(1, q.getMaxSize());
assertEquals(0, q.size());
- assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)));
+ assertTrue(q.offer(new TestQueueElement<Integer>(1)));
assertEquals(1, q.size());
- assertFalse(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)));
+ assertFalse(q.offer(new TestQueueElement<Integer>(1)));
assertEquals(1, q.size());
assertNotNull(q.poll());
assertEquals(0, q.size());
- assertTrue(q.offer(new PriorityDelayQueue.QueueElement<Integer>(1)));
+ assertTrue(q.offer(new TestQueueElement<Integer>(1)));
assertEquals(1, q.size());
}
@@ -154,88 +153,88 @@ public class TestPriorityDelayQueue extends TestCase {
//test immediate offer polling
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(1));
- assertEquals((Integer) 1, q.poll().getElement());
+ q.offer(new TestQueueElement<Integer>(1));
+ assertEquals((Integer) 1, q.poll().getElement().call());
assertEquals(0, q.size());
//test delayed offer polling
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(2, 0, 10, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(2, 0, 10, TimeUnit.MILLISECONDS));
assertNull(q.poll());
Thread.sleep(11);
- assertEquals((Integer) 2, q.poll().getElement());
+ assertEquals((Integer) 2, q.poll().getElement().call());
assertEquals(0, q.size());
//test different priorities immediate offer polling
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
- assertEquals((Integer) 30, q.poll().getElement());
- assertEquals((Integer) 20, q.poll().getElement());
- assertEquals((Integer) 10, q.poll().getElement());
+ assertEquals((Integer) 30, q.poll().getElement().call());
+ assertEquals((Integer) 20, q.poll().getElement().call());
+ assertEquals((Integer) 10, q.poll().getElement().call());
assertEquals(0, q.size());
//test different priorities equal delayed offer polling
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 10, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 10, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 10, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(10, 0, 10, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(30, 2, 10, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(20, 1, 10, TimeUnit.MILLISECONDS));
Thread.sleep(11);
- List<Integer> list = new ArrayList<Integer>();
+ List<XCallable> list = new ArrayList<XCallable>();
while (list.size() != 3) {
QueueElement<Integer> e = q.poll();
if (e != null) {
list.add(e.getElement());
}
}
- assertEquals((Integer) 30, list.get(0));
- assertEquals((Integer) 20, list.get(1));
- assertEquals((Integer) 10, list.get(2));
+ assertEquals((Integer) 30, list.get(0).call());
+ assertEquals((Integer) 20, list.get(1).call());
+ assertEquals((Integer) 10, list.get(2).call());
assertEquals(0, q.size());
//test different priorities different delayed offer polling after delay
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 10, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 20, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(10, 0, 10, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(30, 2, 20, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
Thread.sleep(21);
- list = new ArrayList<Integer>();
+ list = new ArrayList<XCallable>();
while (list.size() != 3) {
QueueElement<Integer> e = q.poll();
if (e != null) {
list.add(e.getElement());
}
}
- assertEquals((Integer) 30, list.get(0));
- assertEquals((Integer) 20, list.get(1));
- assertEquals((Integer) 10, list.get(2));
+ assertEquals((Integer) 30, list.get(0).call());
+ assertEquals((Integer) 20, list.get(1).call());
+ assertEquals((Integer) 10, list.get(2).call());
assertEquals(0, q.size());
//test different priorities different delayed offer polling within delay
long start = System.currentTimeMillis();
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 100, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 200, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(10, 0, 100, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(30, 2, 200, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
- assertEquals((Integer) 20, q.poll().getElement());
+ assertEquals((Integer) 20, q.poll().getElement().call());
long delay = System.currentTimeMillis() - start;
Thread.sleep(101 - delay);
- assertEquals((Integer) 10, q.poll().getElement());
+ assertEquals((Integer) 10, q.poll().getElement().call());
start = System.currentTimeMillis();
delay = System.currentTimeMillis() - start;
Thread.sleep(101 - delay);
- assertEquals((Integer) 30, q.poll().getElement());
+ assertEquals((Integer) 30, q.poll().getElement().call());
assertEquals(0, q.size());
}
@@ -245,46 +244,46 @@ public class TestPriorityDelayQueue extends TestCase {
//test immediate offer peeking
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(1));
- assertEquals((Integer) 1, q.peek().getElement());
+ q.offer(new TestQueueElement<Integer>(1));
+ assertEquals((Integer) 1, q.peek().getElement().call());
q.poll();
assertEquals(0, q.size());
//test delay offer peeking
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS));
- assertEquals((Integer) 1, q.peek().getElement());
+ q.offer(new TestQueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS));
+ assertEquals((Integer) 1, q.peek().getElement().call());
Thread.sleep(11);
assertNotNull(q.poll());
assertEquals(0, q.size());
//test different priorities immediate offer peeking
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
- assertEquals((Integer) 30, q.peek().getElement());
+ assertEquals((Integer) 30, q.peek().getElement().call());
assertNotNull(q.poll());
- assertEquals((Integer) 20, q.peek().getElement());
+ assertEquals((Integer) 20, q.peek().getElement().call());
assertNotNull(q.poll());
- assertEquals((Integer) 10, q.peek().getElement());
+ assertEquals((Integer) 10, q.peek().getElement().call());
assertNotNull(q.poll());
assertEquals(0, q.size());
//test different priorities delayed offer peeking
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 200, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 100, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 150, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(30, 2, 200, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(10, 0, 100, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(20, 1, 150, TimeUnit.MILLISECONDS));
- assertEquals((Integer) 10, q.peek().getElement());
+ assertEquals((Integer) 10, q.peek().getElement().call());
Thread.sleep(100);
assertNotNull(q.poll());
- assertEquals((Integer) 20, q.peek().getElement());
+ assertEquals((Integer) 20, q.peek().getElement().call());
Thread.sleep(50);
assertNotNull(q.poll());
- assertEquals((Integer) 30, q.peek().getElement());
+ assertEquals((Integer) 30, q.peek().getElement().call());
Thread.sleep(50);
assertNotNull(q.poll());
assertEquals(0, q.size());
@@ -292,7 +291,7 @@ public class TestPriorityDelayQueue extends TestCase {
public void testAntiStarvation() throws Exception {
PriorityDelayQueue<Integer> q = new PriorityDelayQueue<Integer>(3, 500, TimeUnit.MILLISECONDS, -1);
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(1));
+ q.offer(new TestQueueElement<Integer>(1));
q.peek();
assertEquals(1, q.sizes()[0]);
Thread.sleep(600);
@@ -317,7 +316,7 @@ public class TestPriorityDelayQueue extends TestCase {
for (int j = 0; j < 10; j++) {
String msg = count + " - " + j;
try {
- queue.offer(new PriorityDelayQueue.QueueElement<String>(msg,
+ queue.offer(new TestQueueElement<String>(msg,
(int) (Math.random() * priorities),
(int) (Math.random() * 500), TimeUnit.MILLISECONDS));
Thread.sleep((int) (Math.random() * 50));
@@ -346,48 +345,92 @@ public class TestPriorityDelayQueue extends TestCase {
public void testIterator() throws Exception {
PriorityDelayQueue<Integer> q = new PriorityDelayQueue<Integer>(3, 500, TimeUnit.MILLISECONDS, -1);
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(30, 2, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(20, 1, 0, TimeUnit.MILLISECONDS));
- Iterator<PriorityDelayQueue.QueueElement<Integer>> it = q.iterator();
- assertTrue(it.hasNext());
-
- int size = 0;
- while (it.hasNext()) {
- it.next();
- size++;
- }
- assertEquals(4, size);
+ assertEquals(4, q.size());
assertNotNull(q.poll());
assertNotNull(q.poll());
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(40, 0, 0, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(50, 2, 0, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(60, 1, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(40, 0, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(50, 2, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(60, 1, 0, TimeUnit.MILLISECONDS));
assertEquals(5, q.size());
assertNotNull(q.poll());
-
- it = q.iterator();
- Thread.sleep(50);
- size = 0;
- while (it.hasNext()) {
- it.next();
- size++;
- }
- assertEquals(4, size);
+ assertEquals(4, q.size());
}
public void testClear() {
PriorityDelayQueue<Integer> q = new PriorityDelayQueue<Integer>(3, 500, TimeUnit.MILLISECONDS, -1);
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS));
- q.offer(new PriorityDelayQueue.QueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(1, 1, 10, TimeUnit.MILLISECONDS));
+ q.offer(new TestQueueElement<Integer>(10, 0, 0, TimeUnit.MILLISECONDS));
assertEquals(2, q.size());
q.clear();
assertEquals(0, q.size());
}
+ public static class TestQueueElement<E> extends QueueElement<E> {
+ // Test helper: wraps a plain value in a stub XCallable so it can be offered to the queue.
+ public TestQueueElement(final E element, int priority, long delay, TimeUnit unit) {
+ super(new XCallable<E>() {
+ // Only call() carries data (it returns the wrapped value); the other methods return fixed placeholders.
+ @Override
+ public E call() throws Exception {
+ return element;
+ }
+
+ @Override
+ public String getName() {
+ return null;
+ }
+
+ @Override
+ public int getPriority() {
+ return 0;
+ }
+
+ @Override
+ public String getType() {
+ return null;
+ }
+
+ @Override
+ public long getCreatedTime() {
+ return 0;
+ }
+
+ @Override
+ public String getKey() {
+ return null;
+ }
+
+ @Override
+ public String getEntityKey() {
+ return null;
+ }
+
+ @Override
+ public void setInterruptMode(boolean mode) {
+ }
+
+ @Override
+ public boolean inInterruptMode() {
+ return false;
+ }
+ }, priority, delay, unit);
+ ParamChecker.notNull(element, "element can't be null"); // checked here because the wrapper passed to super(...) is never null
+ }
+
+ public TestQueueElement(E element) {
+ this(element, 0, 0, TimeUnit.MILLISECONDS); // priority 0, no delay
+ }
+
+ protected void debug(String template, Object... args) {
+ System.out.println(MessageFormat.format(template, args));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d8425480/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index d533d78..160cd72 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-2345 Parallel job submission for forked actions (puru)
OOZIE-2358 Coord rerun cleanup should reuse hcat connections (rohini)
OOZIE-2356 Add a way to enable/disable credentials in a workflow (rkanter)
OOZIE-2355 Hive2 Action doesn't pass along oozie configs to jobconf (rkanter)