You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@oozie.apache.org by "chenhaodan (Jira)" <ji...@apache.org> on 2023/06/05 10:37:00 UTC
[jira] [Commented] (OOZIE-3715) Fix fork out more than one transitions submit , one transition submit fail can't execute KillXCommand
[ https://issues.apache.org/jira/browse/OOZIE-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729267#comment-17729267 ]
chenhaodan commented on OOZIE-3715:
-----------------------------------
Hi, [~dionusos] ,do you have any other feedback or remarks regarding this change({*}OOZIE-3715-005.patch{*} )? Thank you!
> Fix fork out more than one transitions submit , one transition submit fail can't execute KillXCommand
> -----------------------------------------------------------------------------------------------------
>
> Key: OOZIE-3715
> URL: https://issues.apache.org/jira/browse/OOZIE-3715
> Project: Oozie
> Issue Type: Bug
> Components: core
> Affects Versions: 5.2.1
> Reporter: chenhaodan
> Assignee: chenhaodan
> Priority: Major
> Labels: patch
> Fix For: trunk
>
> Attachments: OOZIE-3715-001.patch, OOZIE-3715-002.patch, OOZIE-3715-003.patch, OOZIE-3715-004.patch, OOZIE-3715-005.patch, forkSubmitFail_issue.txt, status.png
>
>
> When I fork 2 transitions( A and B) to submit , when A transition failed , B transition still Running , because can't execute KillXCommand.
> SignalXCommand.startForkedActions, when one transition submit fail will create a new ActionStartXCommand and invoke failJob, failJob will add WorkflowNotificationXCommand and KillXCommand to {color:#ff0000}*commandQueue*{color} , and callback at XCommand.call method , but we add WorkflowNotificationXCommand and KillXCommand to ActionStartXCommand‘s {color:#ff0000}*commandQueue*{color} , but not SignalXCommand , so can't execute KillXCommand.
> The code is as follows :
>
> {code:java}
> public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException {
> ......
> for (Future<ActionExecutorContext> result : futures) {
> ......
> if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.FAILED)) {
> new ActionStartXCommand(context.getAction().getId(), null).failJob(context);
> ......
> }
> ......
> }
> {code}
>
> {code:java}
> public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
> WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
> if (!handleUserRetry(context, action)) {
> incrActionErrorCounter(action.getType(), "failed", 1);
> LOG.warn("Failing Job due to failed action [{0}]", action.getName());
> try {
> workflow.getWorkflowInstance().fail(action.getName());
> WorkflowInstance wfInstance = workflow.getWorkflowInstance();
> ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
> workflow.setWorkflowInstance(wfInstance);
> workflow.setStatus(WorkflowJob.Status.FAILED);
> action.setStatus(WorkflowAction.Status.FAILED);
> action.resetPending();
> queue(new WorkflowNotificationXCommand(workflow, action));
> queue(new KillXCommand(workflow.getId())); InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, getInstrumentation());
> }
> catch (WorkflowException ex) {
> throw new CommandException(ex);
> }
> }
> }
> {code}
>
> {code:java}
> public final T call() throws CommandException {
> if (commandQueue != null) {
> for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
> LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
> if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
> LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
> .size(), entry.getKey());
> }
> }
> }
> }
> {code}
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)