You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@syncope.apache.org by il...@apache.org on 2020/03/06 13:03:52 UTC

[syncope] branch 2_1_X updated: [SYNCOPE-1546] Ensuring rejected propagation tasks are stored if configured + adding report metrics for asyncConnectorExecutor and propagationTaskExecutor

This is an automated email from the ASF dual-hosted git repository.

ilgrosso pushed a commit to branch 2_1_X
in repository https://gitbox.apache.org/repos/asf/syncope.git


The following commit(s) were added to refs/heads/2_1_X by this push:
     new 9aba783  [SYNCOPE-1546] Ensuring rejected propagation tasks are stored if configured + adding report metrics for asyncConnectorExecutor and propagationTaskExecutor
9aba783 is described below

commit 9aba7838c85bf16163d1d43841793112c6b346ba
Author: Francesco Chicchiriccò <il...@apache.org>
AuthorDate: Fri Mar 6 14:03:25 2020 +0100

    [SYNCOPE-1546] Ensuring rejected propagation tasks are stored if configured + adding report metrics for asyncConnectorExecutor and propagationTaskExecutor
---
 .../syncope/common/lib/info/NumbersInfo.java       | 91 ++++++++++++++++++++++
 .../apache/syncope/core/logic/SyncopeLogic.java    | 48 ++++++++++++
 .../org/apache/syncope/core/logic/TaskLogic.java   |  3 +-
 .../core/provisioning/api/jexl/JexlUtils.java      |  2 +-
 .../api/propagation/PropagationTaskExecutor.java   | 12 +--
 .../provisioning/java/ConnectorFacadeProxy.java    | 10 +--
 .../AbstractPropagationTaskExecutor.java           | 64 +++++++++++----
 .../PriorityPropagationTaskExecutor.java           | 87 ++++++++-------------
 .../src/main/resources/provisioning.properties     |  4 +-
 .../src/main/resources/provisioningContext.xml     |  6 +-
 .../main/resources/provisioning.properties.mariadb |  4 +-
 .../main/resources/provisioning.properties.mssql   |  4 +-
 .../main/resources/provisioning.properties.myjson  |  4 +-
 .../main/resources/provisioning.properties.mysql   |  4 +-
 .../main/resources/provisioning.properties.pgjsonb |  4 +-
 .../resources/provisioning.properties.postgresql   |  4 +-
 .../src/main/resources/provisioning.properties     |  4 +-
 .../src/main/resources/all/provisioning.properties |  4 +-
 .../main/resources/mariadb/provisioning.properties |  4 +-
 .../main/resources/myjson/provisioning.properties  |  4 +-
 .../main/resources/mysql/provisioning.properties   |  4 +-
 .../main/resources/oracle/provisioning.properties  |  4 +-
 .../main/resources/pgjsonb/provisioning.properties |  4 +-
 .../resources/postgres/provisioning.properties     |  4 +-
 .../src/main/resources/provisioning.properties     |  4 +-
 .../resources/sqlserver/provisioning.properties    |  4 +-
 26 files changed, 265 insertions(+), 126 deletions(-)

diff --git a/common/lib/src/main/java/org/apache/syncope/common/lib/info/NumbersInfo.java b/common/lib/src/main/java/org/apache/syncope/common/lib/info/NumbersInfo.java
index b7cb814..4ed088f 100644
--- a/common/lib/src/main/java/org/apache/syncope/common/lib/info/NumbersInfo.java
+++ b/common/lib/src/main/java/org/apache/syncope/common/lib/info/NumbersInfo.java
@@ -67,6 +67,81 @@ public class NumbersInfo implements Serializable {
         }
     }
 
+    @XmlRootElement(name = "taskExecutorInfo")
+    @XmlType
+    public class TaskExecutorInfo {
+
+        private int size;
+
+        private int active;
+
+        private int queued;
+
+        private int completed;
+
+        public int getSize() {
+            return size;
+        }
+
+        public void setSize(final int size) {
+            this.size = size;
+        }
+
+        public int getActive() {
+            return active;
+        }
+
+        public void setActive(final int active) {
+            this.active = active;
+        }
+
+        public int getQueued() {
+            return queued;
+        }
+
+        public void setQueued(final int queued) {
+            this.queued = queued;
+        }
+
+        public int getCompleted() {
+            return completed;
+        }
+
+        public void setCompleted(final int completed) {
+            this.completed = completed;
+        }
+
+        @Override
+        public int hashCode() {
+            return new HashCodeBuilder().
+                    append(size).
+                    append(active).
+                    append(queued).
+                    append(completed).
+                    build();
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            final TaskExecutorInfo other = (TaskExecutorInfo) obj;
+            return new EqualsBuilder().
+                    append(size, other.size).
+                    append(active, other.active).
+                    append(queued, other.queued).
+                    append(completed, other.completed).
+                    build();
+        }
+    }
+
     private int totalUsers;
 
     @XmlJavaTypeAdapter(XmlGenericMapAdapter.class)
@@ -101,6 +176,10 @@ public class NumbersInfo implements Serializable {
     @XmlJavaTypeAdapter(XmlGenericMapAdapter.class)
     private final Map<String, Boolean> confCompleteness = new HashMap<>();
 
+    private final TaskExecutorInfo asyncConnectorExecutor = new TaskExecutorInfo();
+
+    private final TaskExecutorInfo propagationTaskExecutor = new TaskExecutorInfo();
+
     public int getTotalUsers() {
         return totalUsers;
     }
@@ -195,6 +274,14 @@ public class NumbersInfo implements Serializable {
         return confCompleteness;
     }
 
+    public TaskExecutorInfo getAsyncConnectorExecutor() {
+        return asyncConnectorExecutor;
+    }
+
+    public TaskExecutorInfo getPropagationTaskExecutor() {
+        return propagationTaskExecutor;
+    }
+
     @Override
     public int hashCode() {
         return new HashCodeBuilder().
@@ -212,6 +299,8 @@ public class NumbersInfo implements Serializable {
                 append(totalResources).
                 append(totalRoles).
                 append(confCompleteness).
+                append(asyncConnectorExecutor).
+                append(propagationTaskExecutor).
                 build();
     }
 
@@ -242,6 +331,8 @@ public class NumbersInfo implements Serializable {
                 append(totalAny2, other.totalAny2).
                 append(any2ByRealm, other.any2ByRealm).
                 append(confCompleteness, other.confCompleteness).
+                append(asyncConnectorExecutor, other.asyncConnectorExecutor).
+                append(propagationTaskExecutor, other.propagationTaskExecutor).
                 build();
     }
 }
diff --git a/core/logic/src/main/java/org/apache/syncope/core/logic/SyncopeLogic.java b/core/logic/src/main/java/org/apache/syncope/core/logic/SyncopeLogic.java
index 5bc2e16..68769f2 100644
--- a/core/logic/src/main/java/org/apache/syncope/core/logic/SyncopeLogic.java
+++ b/core/logic/src/main/java/org/apache/syncope/core/logic/SyncopeLogic.java
@@ -31,6 +31,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.annotation.Resource;
 import org.apache.commons.lang3.StringUtils;
@@ -96,6 +98,7 @@ import org.springframework.aop.support.AopUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.PayloadApplicationEvent;
 import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
@@ -104,6 +107,12 @@ import org.springframework.transaction.annotation.Transactional;
 @Component
 public class SyncopeLogic extends AbstractLogic<EntityTO> {
 
+    private static final Pattern THREADPOOLTASKEXECUTOR_PATTERN = Pattern.compile(
+            ".*, pool size = ([0-9]+), "
+            + "active threads = ([0-9]+), "
+            + "queued tasks = ([0-9]+), "
+            + "completed tasks = ([0-9]+).*");
+
     private static final Object MONITOR = new Object();
 
     private static PlatformInfo PLATFORM_INFO;
@@ -215,6 +224,12 @@ public class SyncopeLogic extends AbstractLogic<EntityTO> {
     @Autowired
     private ImplementationLookup implLookup;
 
+    @Resource(name = "asyncConnectorFacadeExecutor")
+    private ThreadPoolTaskExecutor asyncConnectorFacadeExecutor;
+
+    @Resource(name = "propagationTaskExecutorAsyncExecutor")
+    private ThreadPoolTaskExecutor propagationTaskExecutorAsyncExecutor;
+
     public boolean isSelfRegAllowed() {
         return confDAO.find("selfRegistration.allowed", false);
     }
@@ -364,6 +379,32 @@ public class SyncopeLogic extends AbstractLogic<EntityTO> {
         return SYSTEM_INFO;
     }
 
+    private void setTaskExecutorInfo(final String toString, final NumbersInfo.TaskExecutorInfo info) {
+        Matcher matcher = THREADPOOLTASKEXECUTOR_PATTERN.matcher(toString);
+        if (matcher.matches() && matcher.groupCount() == 4) {
+            try {
+                info.setSize(Integer.valueOf(matcher.group(1)));
+            } catch (NumberFormatException e) {
+                LOG.error("While parsing thread pool size", e);
+            }
+            try {
+                info.setActive(Integer.valueOf(matcher.group(2)));
+            } catch (NumberFormatException e) {
+                LOG.error("While parsing active threads #", e);
+            }
+            try {
+                info.setQueued(Integer.valueOf(matcher.group(3)));
+            } catch (NumberFormatException e) {
+                LOG.error("While parsing queued threads #", e);
+            }
+            try {
+                info.setCompleted(Integer.valueOf(matcher.group(4)));
+            } catch (NumberFormatException e) {
+                LOG.error("While parsing completed threads #", e);
+            }
+        }
+    }
+
     @PreAuthorize("isAuthenticated()")
     public NumbersInfo numbers() {
         NumbersInfo numbersInfo = new NumbersInfo();
@@ -415,6 +456,13 @@ public class SyncopeLogic extends AbstractLogic<EntityTO> {
         numbersInfo.getConfCompleteness().put(
                 NumbersInfo.ConfItem.ROLE.name(), numbersInfo.getTotalRoles() > 0);
 
+        setTaskExecutorInfo(
+                asyncConnectorFacadeExecutor.getThreadPoolExecutor().toString(),
+                numbersInfo.getAsyncConnectorExecutor());
+        setTaskExecutorInfo(
+                propagationTaskExecutorAsyncExecutor.getThreadPoolExecutor().toString(),
+                numbersInfo.getPropagationTaskExecutor());
+
         return numbersInfo;
     }
 
diff --git a/core/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java b/core/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java
index 8726897..a06242a 100644
--- a/core/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java
+++ b/core/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java
@@ -64,6 +64,7 @@ import org.apache.syncope.core.provisioning.api.notification.NotificationJobDele
 import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskInfo;
 import org.apache.syncope.core.provisioning.api.utils.ExceptionUtils2;
 import org.apache.syncope.core.provisioning.java.job.TaskJob;
+import org.apache.syncope.core.provisioning.java.propagation.DefaultPropagationReporter;
 import org.quartz.JobDataMap;
 import org.quartz.JobKey;
 import org.quartz.SchedulerException;
@@ -247,7 +248,7 @@ public class TaskLogic extends AbstractExecutableLogic<TaskTO> {
                 taskInfo.setAnyType(taskTO.getAnyType());
                 taskInfo.setEntityKey(taskTO.getEntityKey());
 
-                TaskExec propExec = taskExecutor.execute(taskInfo);
+                TaskExec propExec = taskExecutor.execute(taskInfo, new DefaultPropagationReporter());
                 result = binder.getExecTO(propExec);
                 break;
 
diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/jexl/JexlUtils.java b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/jexl/JexlUtils.java
index 04cc723..1c2dc2e 100644
--- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/jexl/JexlUtils.java
+++ b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/jexl/JexlUtils.java
@@ -127,7 +127,7 @@ public final class JexlUtils {
     public static void addFieldsToContext(final Object object, final JexlContext jexlContext) {
         Set<Pair<PropertyDescriptor, Field>> cached = FIELD_CACHE.get(object.getClass());
         if (cached == null) {
-            FIELD_CACHE.put(object.getClass(), new HashSet<>());
+            FIELD_CACHE.put(object.getClass(), Collections.synchronizedSet(new HashSet<>()));
 
             List<Class<?>> classes = ClassUtils.getAllSuperclasses(object.getClass());
             classes.add(object.getClass());
diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/propagation/PropagationTaskExecutor.java b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/propagation/PropagationTaskExecutor.java
index 9499538..878df57 100644
--- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/propagation/PropagationTaskExecutor.java
+++ b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/propagation/PropagationTaskExecutor.java
@@ -43,15 +43,7 @@ public interface PropagationTaskExecutor {
     String MANDATORY_NULL_OR_EMPTY_ATTR_NAME = "__MANDATORY_NULL_OR_EMPTY__";
 
     /**
-     * Execute the given PropagationTask and returns the generated {@link TaskExec}.
-     *
-     * @param taskInfo to be executed
-     * @return the generated TaskExec
-     */
-    TaskExec execute(PropagationTaskInfo taskInfo);
-
-    /**
-     * Execute the given PropagationTask and returns the generated {@link TaskExec}.
+     * Execute the given task and returns the generated {@link TaskExec}.
      *
      * @param taskInfo to be executed
      * @param reporter to report propagation execution status
@@ -60,7 +52,7 @@ public interface PropagationTaskExecutor {
     TaskExec execute(PropagationTaskInfo taskInfo, PropagationReporter reporter);
 
     /**
-     * Execute a collection of PropagationTask objects.
+     * Execute the given collection of tasks.
      * The process is interrupted as soon as the result of the communication with a resource with non-null priority is
      * in error.
      *
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ConnectorFacadeProxy.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ConnectorFacadeProxy.java
index d2635e8..a3bebe5 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ConnectorFacadeProxy.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ConnectorFacadeProxy.java
@@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.ClassUtils;
+import org.springframework.util.CollectionUtils;
 
 public class ConnectorFacadeProxy implements Connector {
 
@@ -104,11 +105,10 @@ public class ConnectorFacadeProxy implements Connector {
         // set connector configuration according to conninstance's
         ConfigurationProperties properties = apiConfig.getConfigurationProperties();
         connInstance.getConf().stream().
-                filter(property -> (property.getValues() != null && !property.getValues().isEmpty())).
-                forEachOrdered(property -> {
-                    properties.setPropertyValue(property.getSchema().getName(),
-                            getPropertyValue(property.getSchema().getType(), property.getValues()));
-                });
+                filter(property -> !CollectionUtils.isEmpty(property.getValues())).
+                forEach(property -> properties.setPropertyValue(
+                property.getSchema().getName(),
+                getPropertyValue(property.getSchema().getType(), property.getValues())));
 
         // set pooling configuration (if supported) according to conninstance's
         if (connInstance.getPoolConf() != null) {
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/AbstractPropagationTaskExecutor.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/AbstractPropagationTaskExecutor.java
index fd4e98a..e76a7de 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/AbstractPropagationTaskExecutor.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/AbstractPropagationTaskExecutor.java
@@ -338,13 +338,7 @@ public abstract class AbstractPropagationTaskExecutor implements PropagationTask
         return result;
     }
 
-    @Override
-    public TaskExec execute(final PropagationTaskInfo taskInfo) {
-        return execute(taskInfo, null);
-    }
-
-    @Override
-    public TaskExec execute(final PropagationTaskInfo taskInfo, final PropagationReporter reporter) {
+    protected PropagationTask buildTask(final PropagationTaskInfo taskInfo) {
         PropagationTask task;
         if (taskInfo.getKey() == null) {
             // double-checks that provided External Resource is valid, for further actions
@@ -371,6 +365,13 @@ public abstract class AbstractPropagationTaskExecutor implements PropagationTask
         }
         task.setAttributes(attributes);
 
+        return task;
+    }
+
+    @Override
+    public TaskExec execute(final PropagationTaskInfo taskInfo, final PropagationReporter reporter) {
+        PropagationTask task = buildTask(taskInfo);
+
         Connector connector = taskInfo.getConnector() == null
                 ? connFactory.getConnector(task.getResource())
                 : taskInfo.getConnector();
@@ -461,9 +462,7 @@ public abstract class AbstractPropagationTaskExecutor implements PropagationTask
 
             propagationAttempted.set(true);
 
-            actions.forEach(action -> {
-                action.onError(task, execution, e);
-            });
+            actions.forEach(action -> action.onError(task, execution, e));
         } finally {
             // Try to read remote object AFTER any actual operation
             if (connector != null) {
@@ -505,13 +504,11 @@ public abstract class AbstractPropagationTaskExecutor implements PropagationTask
                 taskDAO.save(task);
             }
 
-            if (reporter != null) {
-                reporter.onSuccessOrNonPriorityResourceFailures(taskInfo,
-                        ExecStatus.valueOf(execution.getStatus()),
-                        failureReason,
-                        beforeObj,
-                        afterObj);
-            }
+            reporter.onSuccessOrNonPriorityResourceFailures(taskInfo,
+                    ExecStatus.valueOf(execution.getStatus()),
+                    failureReason,
+                    beforeObj,
+                    afterObj);
         }
 
         for (PropagationActions action : actions) {
@@ -560,6 +557,39 @@ public abstract class AbstractPropagationTaskExecutor implements PropagationTask
     protected abstract void doExecute(
             Collection<PropagationTaskInfo> taskInfos, PropagationReporter reporter, boolean nullPriorityAsync);
 
+    protected TaskExec rejected(
+            final PropagationTaskInfo taskInfo,
+            final String rejectReason,
+            final PropagationReporter reporter) {
+
+        PropagationTask task = buildTask(taskInfo);
+
+        TaskExec execution = entityFactory.newEntity(TaskExec.class);
+        execution.setStatus(ExecStatus.NOT_ATTEMPTED.name());
+
+        execution.setStart(new Date());
+        execution.setMessage(rejectReason);
+        execution.setEnd(execution.getStart());
+
+        if (hasToBeregistered(task, execution)) {
+            LOG.debug("Execution to be stored: {}", execution);
+
+            execution.setTask(task);
+            task.add(execution);
+
+            taskDAO.save(task);
+        }
+
+        reporter.onSuccessOrNonPriorityResourceFailures(
+                taskInfo,
+                ExecStatus.valueOf(execution.getStatus()),
+                rejectReason,
+                null,
+                null);
+
+        return execution;
+    }
+
     @Override
     public PropagationReporter execute(
             final Collection<PropagationTaskInfo> taskInfos,
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
index 283e7dd..ecd4764 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
@@ -21,15 +21,10 @@ package org.apache.syncope.core.provisioning.java.propagation;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.Resource;
 import org.apache.syncope.common.lib.types.ExecStatus;
@@ -80,26 +75,15 @@ public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExec
             final PropagationReporter reporter,
             final boolean nullPriorityAsync) {
 
-        List<PropagationTaskInfo> prioritizedTasks = new ArrayList<>();
-
-        int[] connRequestTimeout = { 60 };
-
-        taskInfos.stream().filter(task -> task.getExternalResource().getPropagationPriority() != null).forEach(task -> {
-            prioritizedTasks.add(task);
-
-            if (task.getExternalResource().getConnector().getConnRequestTimeout() != null
-                    && connRequestTimeout[0] < task.getExternalResource().getConnector().getConnRequestTimeout()) {
-
-                connRequestTimeout[0] = task.getExternalResource().getConnector().getConnRequestTimeout();
-                LOG.debug("Upgrade request connection timeout to {}", connRequestTimeout);
-            }
-        });
-
-        prioritizedTasks.sort(Comparator.comparing(task -> task.getExternalResource().getPropagationPriority()));
+        List<PropagationTaskInfo> prioritizedTasks = taskInfos.stream().
+                filter(task -> task.getExternalResource().getPropagationPriority() != null).
+                sorted(Comparator.comparing(task -> task.getExternalResource().getPropagationPriority())).
+                collect(Collectors.toList());
         LOG.debug("Propagation tasks sorted by priority, for serial execution: {}", prioritizedTasks);
 
-        Set<PropagationTaskInfo> concurrentTasks = taskInfos.stream().
-                filter(task -> !prioritizedTasks.contains(task)).collect(Collectors.toSet());
+        List<PropagationTaskInfo> concurrentTasks = taskInfos.stream().
+                filter(task -> !prioritizedTasks.contains(task)).
+                collect(Collectors.toList());
         LOG.debug("Propagation tasks for concurrent execution: {}", concurrentTasks);
 
         // first process priority resources sequentially and fail as soon as any propagation failure is reported
@@ -123,42 +107,33 @@ public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExec
         });
 
         // then process non-priority resources concurrently...
-        CompletionService<TaskExec> completionService = new ExecutorCompletionService<>(executor);
-        Map<PropagationTaskInfo, Future<TaskExec>> nullPriority = new HashMap<>(concurrentTasks.size());
-        concurrentTasks.forEach(taskInfo -> {
-            try {
-                nullPriority.put(
-                        taskInfo,
-                        completionService.submit(newPropagationTaskCallable(taskInfo, reporter)));
-            } catch (Exception e) {
-                LOG.error("Unexpected exception", e);
-            }
-        });
-        // ...waiting for all callables to complete, if async processing was not required
-        if (!nullPriority.isEmpty()) {
-            if (nullPriorityAsync) {
-                nullPriority.forEach((task, exec) -> {
-                    reporter.onSuccessOrNonPriorityResourceFailures(task, ExecStatus.CREATED, null, null, null);
-                });
-            } else {
-                final Set<Future<TaskExec>> nullPriorityFutures = new HashSet<>(nullPriority.values());
+        if (!concurrentTasks.isEmpty()) {
+            CompletionService<TaskExec> completionService = new ExecutorCompletionService<>(executor);
+            List<Future<TaskExec>> futures = new ArrayList<>();
+
+            concurrentTasks.forEach(taskInfo -> {
                 try {
-                    executor.submit(() -> {
-                        while (!nullPriorityFutures.isEmpty()) {
-                            try {
-                                nullPriorityFutures.remove(completionService.take());
-                            } catch (Exception e) {
-                                LOG.error("Unexpected exception", e);
-                            }
-                        }
-                    }).get(connRequestTimeout[0], TimeUnit.SECONDS);
+                    futures.add(completionService.submit(newPropagationTaskCallable(taskInfo, reporter)));
+
+                    if (nullPriorityAsync) {
+                        reporter.onSuccessOrNonPriorityResourceFailures(
+                                taskInfo, ExecStatus.CREATED, null, null, null);
+                    }
                 } catch (Exception e) {
-                    LOG.error("Unexpected exception", e);
-                } finally {
-                    nullPriorityFutures.forEach(future -> future.cancel(true));
-                    nullPriorityFutures.clear();
-                    nullPriority.clear();
+                    LOG.error("While submitting task for async execution", taskInfo, e);
+                    rejected(taskInfo, e.getMessage(), reporter);
                 }
+            });
+
+            // ...waiting for all callables to complete, if async processing was not required
+            if (!nullPriorityAsync) {
+                futures.forEach(future -> {
+                    try {
+                        future.get();
+                    } catch (Exception e) {
+                        LOG.error("Unexpected exception", e);
+                    }
+                });
             }
         }
     }
diff --git a/core/provisioning-java/src/main/resources/provisioning.properties b/core/provisioning-java/src/main/resources/provisioning.properties
index 220105e..24f1ea5 100644
--- a/core/provisioning-java/src/main/resources/provisioning.properties
+++ b/core/provisioning-java/src/main/resources/provisioning.properties
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/core/provisioning-java/src/main/resources/provisioningContext.xml b/core/provisioning-java/src/main/resources/provisioningContext.xml
index bcf9a35..4aeca3c 100644
--- a/core/provisioning-java/src/main/resources/provisioningContext.xml
+++ b/core/provisioning-java/src/main/resources/provisioningContext.xml
@@ -33,7 +33,9 @@ under the License.
   <!-- Used by AsyncConnectorFacade -->
   <task:annotation-driven executor="asyncConnectorFacadeExecutor"/>
   <task:executor id="asyncConnectorFacadeExecutor"
-                 pool-size="${asyncConnectorFacadeExecutor.poolSize}"/>
+                 pool-size="${asyncConnectorFacadeExecutor.poolSize}"
+                 queue-capacity="${asyncConnectorFacadeExecutor.queueCapacity}"
+                 rejection-policy="ABORT"/>
 
   <!-- Used by PriorityPropagationTaskExecutor -->
   <task:executor id="propagationTaskExecutorAsyncExecutor"
@@ -41,7 +43,7 @@ under the License.
                  queue-capacity="${propagationTaskExecutorAsyncExecutor.queueCapacity}"
                  rejection-policy="ABORT"/>
   <bean class="${propagationTaskExecutor}"/>
-  
+
   <bean class="${userProvisioningManager}"/>
   <bean class="${groupProvisioningManager}"/>
   <bean class="${anyObjectProvisioningManager}"/>
diff --git a/docker/core/src/main/resources/provisioning.properties.mariadb b/docker/core/src/main/resources/provisioning.properties.mariadb
index 8f17c5a..693243e 100644
--- a/docker/core/src/main/resources/provisioning.properties.mariadb
+++ b/docker/core/src/main/resources/provisioning.properties.mariadb
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/docker/core/src/main/resources/provisioning.properties.mssql b/docker/core/src/main/resources/provisioning.properties.mssql
index 3529791..4e47090 100644
--- a/docker/core/src/main/resources/provisioning.properties.mssql
+++ b/docker/core/src/main/resources/provisioning.properties.mssql
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/docker/core/src/main/resources/provisioning.properties.myjson b/docker/core/src/main/resources/provisioning.properties.myjson
index 191693c..c865f0c 100644
--- a/docker/core/src/main/resources/provisioning.properties.myjson
+++ b/docker/core/src/main/resources/provisioning.properties.myjson
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/docker/core/src/main/resources/provisioning.properties.mysql b/docker/core/src/main/resources/provisioning.properties.mysql
index 191693c..c865f0c 100644
--- a/docker/core/src/main/resources/provisioning.properties.mysql
+++ b/docker/core/src/main/resources/provisioning.properties.mysql
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/docker/core/src/main/resources/provisioning.properties.pgjsonb b/docker/core/src/main/resources/provisioning.properties.pgjsonb
index 220105e..24f1ea5 100644
--- a/docker/core/src/main/resources/provisioning.properties.pgjsonb
+++ b/docker/core/src/main/resources/provisioning.properties.pgjsonb
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/docker/core/src/main/resources/provisioning.properties.postgresql b/docker/core/src/main/resources/provisioning.properties.postgresql
index 220105e..24f1ea5 100644
--- a/docker/core/src/main/resources/provisioning.properties.postgresql
+++ b/docker/core/src/main/resources/provisioning.properties.postgresql
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/ext/camel/provisioning-camel/src/main/resources/provisioning.properties b/ext/camel/provisioning-camel/src/main/resources/provisioning.properties
index fd39626..1bef1b2 100644
--- a/ext/camel/provisioning-camel/src/main/resources/provisioning.properties
+++ b/ext/camel/provisioning-camel/src/main/resources/provisioning.properties
@@ -16,9 +16,9 @@
 # under the License.
 camel.directory=${conf.directory}
 
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/all/provisioning.properties b/fit/core-reference/src/main/resources/all/provisioning.properties
index 7d7f79c..fa10e90 100644
--- a/fit/core-reference/src/main/resources/all/provisioning.properties
+++ b/fit/core-reference/src/main/resources/all/provisioning.properties
@@ -16,9 +16,9 @@
 # under the License.
 camel.directory=${conf.directory}
 
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/mariadb/provisioning.properties b/fit/core-reference/src/main/resources/mariadb/provisioning.properties
index 8f17c5a..693243e 100644
--- a/fit/core-reference/src/main/resources/mariadb/provisioning.properties
+++ b/fit/core-reference/src/main/resources/mariadb/provisioning.properties
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/myjson/provisioning.properties b/fit/core-reference/src/main/resources/myjson/provisioning.properties
index 191693c..c865f0c 100644
--- a/fit/core-reference/src/main/resources/myjson/provisioning.properties
+++ b/fit/core-reference/src/main/resources/myjson/provisioning.properties
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/mysql/provisioning.properties b/fit/core-reference/src/main/resources/mysql/provisioning.properties
index 191693c..c865f0c 100644
--- a/fit/core-reference/src/main/resources/mysql/provisioning.properties
+++ b/fit/core-reference/src/main/resources/mysql/provisioning.properties
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/oracle/provisioning.properties b/fit/core-reference/src/main/resources/oracle/provisioning.properties
index a009d81..e62bd8a 100644
--- a/fit/core-reference/src/main/resources/oracle/provisioning.properties
+++ b/fit/core-reference/src/main/resources/oracle/provisioning.properties
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/pgjsonb/provisioning.properties b/fit/core-reference/src/main/resources/pgjsonb/provisioning.properties
index 9347dd7..8d3c7ab 100644
--- a/fit/core-reference/src/main/resources/pgjsonb/provisioning.properties
+++ b/fit/core-reference/src/main/resources/pgjsonb/provisioning.properties
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/postgres/provisioning.properties b/fit/core-reference/src/main/resources/postgres/provisioning.properties
index 9347dd7..8d3c7ab 100644
--- a/fit/core-reference/src/main/resources/postgres/provisioning.properties
+++ b/fit/core-reference/src/main/resources/postgres/provisioning.properties
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/provisioning.properties b/fit/core-reference/src/main/resources/provisioning.properties
index e75de15..25812f0 100644
--- a/fit/core-reference/src/main/resources/provisioning.properties
+++ b/fit/core-reference/src/main/resources/provisioning.properties
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/sqlserver/provisioning.properties b/fit/core-reference/src/main/resources/sqlserver/provisioning.properties
index 3529791..4e47090 100644
--- a/fit/core-reference/src/main/resources/sqlserver/provisioning.properties
+++ b/fit/core-reference/src/main/resources/sqlserver/provisioning.properties
@@ -14,9 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
 
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
 propagationTaskExecutorAsyncExecutor.poolSize=5-25
 propagationTaskExecutorAsyncExecutor.queueCapacity=100
 propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor