Posted to dev@oozie.apache.org by "chenhaodan (Jira)" <ji...@apache.org> on 2023/06/05 10:37:00 UTC

[jira] [Commented] (OOZIE-3715) Fix fork out more than one transitions submit , one transition submit fail can't execute KillXCommand

    [ https://issues.apache.org/jira/browse/OOZIE-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729267#comment-17729267 ] 

chenhaodan commented on OOZIE-3715:
-----------------------------------

Hi [~dionusos], do you have any other feedback or remarks regarding this change ({*}OOZIE-3715-005.patch{*})? Thank you!

> Fix fork out more than one transitions submit , one transition submit fail can't execute KillXCommand
> -----------------------------------------------------------------------------------------------------
>
>                 Key: OOZIE-3715
>                 URL: https://issues.apache.org/jira/browse/OOZIE-3715
>             Project: Oozie
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 5.2.1
>            Reporter: chenhaodan
>            Assignee: chenhaodan
>            Priority: Major
>              Labels: patch
>             Fix For: trunk
>
>         Attachments: OOZIE-3715-001.patch, OOZIE-3715-002.patch, OOZIE-3715-003.patch, OOZIE-3715-004.patch, OOZIE-3715-005.patch, forkSubmitFail_issue.txt, status.png
>
>
> When a fork starts two transitions (A and B) and the submission of A fails, B stays RUNNING because KillXCommand is never executed.
> In SignalXCommand.startForkedActions, when the submission of one transition fails, a new ActionStartXCommand is created and its failJob method is invoked. failJob adds WorkflowNotificationXCommand and KillXCommand to the {color:#ff0000}*commandQueue*{color}, which is processed in the XCommand.call method. However, these commands are added to the {color:#ff0000}*commandQueue*{color} of the new ActionStartXCommand, not to that of SignalXCommand, and call() is never invoked on that ActionStartXCommand, so KillXCommand is never executed.
> The relevant code is as follows:
>  
> {code:java}
>     public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException {
>         ......
>             for (Future<ActionExecutorContext> result : futures) {
>              ......
>                 if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.FAILED)) {
>                     // failJob() is invoked directly; call() is never run on this ActionStartXCommand
>                     new ActionStartXCommand(context.getAction().getId(), null).failJob(context);
>              ......
>         }
>        ......
>     }
> {code}
>  
> {code:java}
> public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
>         WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
>         if (!handleUserRetry(context, action)) {
>             incrActionErrorCounter(action.getType(), "failed", 1);
>             LOG.warn("Failing Job due to failed action [{0}]", action.getName());
>             try {
>                 workflow.getWorkflowInstance().fail(action.getName());
>                 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
>                 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
>                 workflow.setWorkflowInstance(wfInstance);
>                 workflow.setStatus(WorkflowJob.Status.FAILED);
>                 action.setStatus(WorkflowAction.Status.FAILED);
>                 action.resetPending();
>                 // queue() adds to this command's own commandQueue, which is processed by its call()
>                 queue(new WorkflowNotificationXCommand(workflow, action));
>                 queue(new KillXCommand(workflow.getId()));
>                 InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, getInstrumentation());
>             }
>             catch (WorkflowException ex) {
>                 throw new CommandException(ex);
>             }
>         }
>     }
> {code}
>  
> {code:java}
> public final T call() throws CommandException {
>     ......
>     // flush the commands this instance queued via queue()
>     if (commandQueue != null) {
>         for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
>             LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
>             if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
>                 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
>                     .size(), entry.getKey());
>             }
>         }
>     }
>     ......
> }
> {code}
>  
>  
>  
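The lost-kill mechanism described in the issue can be reproduced outside Oozie with a small model. This is a hypothetical sketch, not Oozie code: MiniCommand, MiniActionStart, MiniSignal, transferQueueTo and ForkKillDemo are illustrative names, the per-instance queue is simplified (a single delay bucket), and a plain list stands in for CallableQueueService. It shows that commands queued on an instance whose call() never runs are never dispatched. The "hand over the queue" branch is only one possible fix direction, assumed for illustration; it is not taken from the attached patches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, heavily simplified model of an XCommand (not the real Oozie API):
// each instance owns a commandQueue that is flushed only when its call() runs.
class MiniCommand {
    final String name;
    final List<String> dispatcher; // stands in for CallableQueueService (assumption)
    // delay (ms) -> commands queued with that delay; this model only uses delay 0
    private final Map<Long, List<MiniCommand>> commandQueue = new TreeMap<>();

    MiniCommand(String name, List<String> dispatcher) {
        this.name = name;
        this.dispatcher = dispatcher;
    }

    // Defer a command until this instance's call() finishes.
    void queue(MiniCommand cmd) {
        commandQueue.computeIfAbsent(0L, delay -> new ArrayList<>()).add(cmd);
    }

    // Hypothetical helper: move all pending commands into another command's queue.
    void transferQueueTo(MiniCommand target) {
        commandQueue.values().forEach(cmds -> cmds.forEach(target::queue));
        commandQueue.clear();
    }

    protected void execute() { }

    // Run the command, then flush only THIS instance's queue to the dispatcher.
    final void call() {
        execute();
        for (List<MiniCommand> cmds : commandQueue.values()) {
            cmds.forEach(c -> dispatcher.add(c.name));
        }
        commandQueue.clear();
    }
}

// Models ActionStartXCommand.failJob(): queues notification + kill on this instance.
class MiniActionStart extends MiniCommand {
    MiniActionStart(List<String> dispatcher) {
        super("ActionStartXCommand", dispatcher);
    }

    void failJob() {
        queue(new MiniCommand("WorkflowNotificationXCommand", dispatcher));
        queue(new MiniCommand("KillXCommand", dispatcher));
    }
}

// Models SignalXCommand.startForkedActions() when one forked action fails to submit.
class MiniSignal extends MiniCommand {
    private final boolean handOverQueue;

    MiniSignal(List<String> dispatcher, boolean handOverQueue) {
        super("SignalXCommand", dispatcher);
        this.handOverQueue = handOverQueue;
    }

    @Override
    protected void execute() {
        MiniActionStart helper = new MiniActionStart(dispatcher);
        helper.failJob(); // helper.call() is never invoked, as in the reported code
        if (handOverQueue) {
            // Assumed fix direction: adopt the helper's pending commands so they
            // are flushed when this command's call() completes.
            helper.transferQueueTo(this);
        }
    }
}

class ForkKillDemo {
    // Returns the names of the commands that actually reached the dispatcher.
    static List<String> run(boolean handOverQueue) {
        List<String> dispatched = new ArrayList<>();
        new MiniSignal(dispatched, handOverQueue).call();
        return dispatched;
    }

    public static void main(String[] args) {
        System.out.println("as reported: " + run(false));      // prints: as reported: []
        System.out.println("queue handed over: " + run(true)); // prints: queue handed over: [WorkflowNotificationXCommand, KillXCommand]
    }
}
```

In the "as reported" run nothing is dispatched, which matches the symptom in the issue: the kill is queued, but on an object that is discarded without ever being called.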



--
This message was sent by Atlassian Jira
(v8.20.10#820010)