You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@oozie.apache.org by GitBox <gi...@apache.org> on 2022/01/12 16:28:31 UTC

[GitHub] [oozie] zuston commented on a change in pull request #64: [OOZIE-3646] Possible dead-lock in SignalXCommand

zuston commented on a change in pull request #64:
URL: https://github.com/apache/oozie/pull/64#discussion_r783241241



##########
File path: core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
##########
@@ -468,21 +468,54 @@ else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(work
 
     public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException {
 
-        List<CallableWrapper<ActionExecutorContext>> tasks = new ArrayList<CallableWrapper<ActionExecutorContext>>();
         List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
         List<JsonBean> insertList = new ArrayList<JsonBean>();
 
         boolean endWorkflow = false;
         boolean submitJobByQueuing = false;
-        for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) {
-            LOG.debug("Starting forked actions parallely : " + workflowActionBean.getId());
-            tasks.add(Services.get().get(CallableQueueService.class).new CallableWrapper<ActionExecutorContext>(
-                    new ForkedActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType()), 0));
-        }
 
         try {
-            List<Future<ActionExecutorContext>> futures = Services.get().get(CallableQueueService.class)
-                    .invokeAll(tasks);
+            /**
+             * The limited thread execution mechanism aims to prevent the dead-lock that occurs when all active
+             * threads are executing the SignalXCommand's invokeAll method.
+             *
+             * Solution
+             * 1. Avoid calling invokeAll directly when the number of remaining threads is less than the number of tasks.
+             * 2. To obtain the correct number of active threads in the callableQueue, the SignalXCommand.class lock is needed.
+             *
+             */
+            CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
+            List<Future<ActionExecutorContext>> futures = new ArrayList<>();
+
+            synchronized (SignalXCommand.class) {
+                long limitedRestThreadNum = callableQueueService.getQueueThreadsNumber() - callableQueueService.getThreadActiveCount();
+                if (limitedRestThreadNum < workflowActionBeanListForForked.size()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Limited callable queue rest threads number: " + limitedRestThreadNum + ", needed forked task size: "
+                                + workflowActionBeanListForForked.size() + ", tasks will be submitted to queue by async mode.");
+                    }
+                    submitJobByQueuing = true;
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Starting forked actions parallely: " + workflowActionBeanListForForked);
+                    }
+                    for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) {
+                        futures.add(
+                                callableQueueService.submit(callableQueueService.new CallableWrapper<ActionExecutorContext>(
+                                        new ForkedActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType()), 0))
+                        );
+                    }
+                }
+            }
+
+            if (!futures.isEmpty()) {
+                long startTime = System.currentTimeMillis();
+                callableQueueService.blockingWait(futures);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Execution time: " + (System.currentTimeMillis() - startTime) / 1000 + " sec");
+                }

Review comment:
       got it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@oozie.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org