You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@syncope.apache.org by il...@apache.org on 2020/03/06 13:03:52 UTC
[syncope] branch 2_1_X updated: [SYNCOPE-1546] Ensuring rejected
propagation tasks are stored if configured + adding report metrics for
asyncConnectorExecutor and propagationTaskExecutor
This is an automated email from the ASF dual-hosted git repository.
ilgrosso pushed a commit to branch 2_1_X
in repository https://gitbox.apache.org/repos/asf/syncope.git
The following commit(s) were added to refs/heads/2_1_X by this push:
new 9aba783 [SYNCOPE-1546] Ensuring rejected propagation tasks are stored if configured + adding report metrics for asyncConnectorExecutor and propagationTaskExecutor
9aba783 is described below
commit 9aba7838c85bf16163d1d43841793112c6b346ba
Author: Francesco Chicchiriccò <il...@apache.org>
AuthorDate: Fri Mar 6 14:03:25 2020 +0100
[SYNCOPE-1546] Ensuring rejected propagation tasks are stored if configured + adding report metrics for asyncConnectorExecutor and propagationTaskExecutor
---
.../syncope/common/lib/info/NumbersInfo.java | 91 ++++++++++++++++++++++
.../apache/syncope/core/logic/SyncopeLogic.java | 48 ++++++++++++
.../org/apache/syncope/core/logic/TaskLogic.java | 3 +-
.../core/provisioning/api/jexl/JexlUtils.java | 2 +-
.../api/propagation/PropagationTaskExecutor.java | 12 +--
.../provisioning/java/ConnectorFacadeProxy.java | 10 +--
.../AbstractPropagationTaskExecutor.java | 64 +++++++++++----
.../PriorityPropagationTaskExecutor.java | 87 ++++++++-------------
.../src/main/resources/provisioning.properties | 4 +-
.../src/main/resources/provisioningContext.xml | 6 +-
.../main/resources/provisioning.properties.mariadb | 4 +-
.../main/resources/provisioning.properties.mssql | 4 +-
.../main/resources/provisioning.properties.myjson | 4 +-
.../main/resources/provisioning.properties.mysql | 4 +-
.../main/resources/provisioning.properties.pgjsonb | 4 +-
.../resources/provisioning.properties.postgresql | 4 +-
.../src/main/resources/provisioning.properties | 4 +-
.../src/main/resources/all/provisioning.properties | 4 +-
.../main/resources/mariadb/provisioning.properties | 4 +-
.../main/resources/myjson/provisioning.properties | 4 +-
.../main/resources/mysql/provisioning.properties | 4 +-
.../main/resources/oracle/provisioning.properties | 4 +-
.../main/resources/pgjsonb/provisioning.properties | 4 +-
.../resources/postgres/provisioning.properties | 4 +-
.../src/main/resources/provisioning.properties | 4 +-
.../resources/sqlserver/provisioning.properties | 4 +-
26 files changed, 265 insertions(+), 126 deletions(-)
diff --git a/common/lib/src/main/java/org/apache/syncope/common/lib/info/NumbersInfo.java b/common/lib/src/main/java/org/apache/syncope/common/lib/info/NumbersInfo.java
index b7cb814..4ed088f 100644
--- a/common/lib/src/main/java/org/apache/syncope/common/lib/info/NumbersInfo.java
+++ b/common/lib/src/main/java/org/apache/syncope/common/lib/info/NumbersInfo.java
@@ -67,6 +67,81 @@ public class NumbersInfo implements Serializable {
}
}
+ @XmlRootElement(name = "taskExecutorInfo")
+ @XmlType
+ public class TaskExecutorInfo {
+
+ private int size;
+
+ private int active;
+
+ private int queued;
+
+ private int completed;
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(final int size) {
+ this.size = size;
+ }
+
+ public int getActive() {
+ return active;
+ }
+
+ public void setActive(final int active) {
+ this.active = active;
+ }
+
+ public int getQueued() {
+ return queued;
+ }
+
+ public void setQueued(final int queued) {
+ this.queued = queued;
+ }
+
+ public int getCompleted() {
+ return completed;
+ }
+
+ public void setCompleted(final int completed) {
+ this.completed = completed;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().
+ append(size).
+ append(active).
+ append(queued).
+ append(completed).
+ build();
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final TaskExecutorInfo other = (TaskExecutorInfo) obj;
+ return new EqualsBuilder().
+ append(size, other.size).
+ append(active, other.active).
+ append(queued, other.queued).
+ append(completed, other.completed).
+ build();
+ }
+ }
+
private int totalUsers;
@XmlJavaTypeAdapter(XmlGenericMapAdapter.class)
@@ -101,6 +176,10 @@ public class NumbersInfo implements Serializable {
@XmlJavaTypeAdapter(XmlGenericMapAdapter.class)
private final Map<String, Boolean> confCompleteness = new HashMap<>();
+ private final TaskExecutorInfo asyncConnectorExecutor = new TaskExecutorInfo();
+
+ private final TaskExecutorInfo propagationTaskExecutor = new TaskExecutorInfo();
+
public int getTotalUsers() {
return totalUsers;
}
@@ -195,6 +274,14 @@ public class NumbersInfo implements Serializable {
return confCompleteness;
}
+ public TaskExecutorInfo getAsyncConnectorExecutor() {
+ return asyncConnectorExecutor;
+ }
+
+ public TaskExecutorInfo getPropagationTaskExecutor() {
+ return propagationTaskExecutor;
+ }
+
@Override
public int hashCode() {
return new HashCodeBuilder().
@@ -212,6 +299,8 @@ public class NumbersInfo implements Serializable {
append(totalResources).
append(totalRoles).
append(confCompleteness).
+ append(asyncConnectorExecutor).
+ append(propagationTaskExecutor).
build();
}
@@ -242,6 +331,8 @@ public class NumbersInfo implements Serializable {
append(totalAny2, other.totalAny2).
append(any2ByRealm, other.any2ByRealm).
append(confCompleteness, other.confCompleteness).
+ append(asyncConnectorExecutor, other.asyncConnectorExecutor).
+ append(propagationTaskExecutor, other.propagationTaskExecutor).
build();
}
}
diff --git a/core/logic/src/main/java/org/apache/syncope/core/logic/SyncopeLogic.java b/core/logic/src/main/java/org/apache/syncope/core/logic/SyncopeLogic.java
index 5bc2e16..68769f2 100644
--- a/core/logic/src/main/java/org/apache/syncope/core/logic/SyncopeLogic.java
+++ b/core/logic/src/main/java/org/apache/syncope/core/logic/SyncopeLogic.java
@@ -31,6 +31,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
@@ -96,6 +98,7 @@ import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@@ -104,6 +107,12 @@ import org.springframework.transaction.annotation.Transactional;
@Component
public class SyncopeLogic extends AbstractLogic<EntityTO> {
+ private static final Pattern THREADPOOLTASKEXECUTOR_PATTERN = Pattern.compile(
+ ".*, pool size = ([0-9]+), "
+ + "active threads = ([0-9]+), "
+ + "queued tasks = ([0-9]+), "
+ + "completed tasks = ([0-9]+).*");
+
private static final Object MONITOR = new Object();
private static PlatformInfo PLATFORM_INFO;
@@ -215,6 +224,12 @@ public class SyncopeLogic extends AbstractLogic<EntityTO> {
@Autowired
private ImplementationLookup implLookup;
+ @Resource(name = "asyncConnectorFacadeExecutor")
+ private ThreadPoolTaskExecutor asyncConnectorFacadeExecutor;
+
+ @Resource(name = "propagationTaskExecutorAsyncExecutor")
+ private ThreadPoolTaskExecutor propagationTaskExecutorAsyncExecutor;
+
public boolean isSelfRegAllowed() {
return confDAO.find("selfRegistration.allowed", false);
}
@@ -364,6 +379,32 @@ public class SyncopeLogic extends AbstractLogic<EntityTO> {
return SYSTEM_INFO;
}
+ private void setTaskExecutorInfo(final String toString, final NumbersInfo.TaskExecutorInfo info) {
+ Matcher matcher = THREADPOOLTASKEXECUTOR_PATTERN.matcher(toString);
+ if (matcher.matches() && matcher.groupCount() == 4) {
+ try {
+ info.setSize(Integer.valueOf(matcher.group(1)));
+ } catch (NumberFormatException e) {
+ LOG.error("While parsing thread pool size", e);
+ }
+ try {
+ info.setActive(Integer.valueOf(matcher.group(2)));
+ } catch (NumberFormatException e) {
+ LOG.error("While parsing active threads #", e);
+ }
+ try {
+ info.setQueued(Integer.valueOf(matcher.group(3)));
+ } catch (NumberFormatException e) {
+ LOG.error("While parsing queued threads #", e);
+ }
+ try {
+ info.setCompleted(Integer.valueOf(matcher.group(4)));
+ } catch (NumberFormatException e) {
+ LOG.error("While parsing completed threads #", e);
+ }
+ }
+ }
+
@PreAuthorize("isAuthenticated()")
public NumbersInfo numbers() {
NumbersInfo numbersInfo = new NumbersInfo();
@@ -415,6 +456,13 @@ public class SyncopeLogic extends AbstractLogic<EntityTO> {
numbersInfo.getConfCompleteness().put(
NumbersInfo.ConfItem.ROLE.name(), numbersInfo.getTotalRoles() > 0);
+ setTaskExecutorInfo(
+ asyncConnectorFacadeExecutor.getThreadPoolExecutor().toString(),
+ numbersInfo.getAsyncConnectorExecutor());
+ setTaskExecutorInfo(
+ propagationTaskExecutorAsyncExecutor.getThreadPoolExecutor().toString(),
+ numbersInfo.getPropagationTaskExecutor());
+
return numbersInfo;
}
diff --git a/core/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java b/core/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java
index 8726897..a06242a 100644
--- a/core/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java
+++ b/core/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java
@@ -64,6 +64,7 @@ import org.apache.syncope.core.provisioning.api.notification.NotificationJobDele
import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskInfo;
import org.apache.syncope.core.provisioning.api.utils.ExceptionUtils2;
import org.apache.syncope.core.provisioning.java.job.TaskJob;
+import org.apache.syncope.core.provisioning.java.propagation.DefaultPropagationReporter;
import org.quartz.JobDataMap;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
@@ -247,7 +248,7 @@ public class TaskLogic extends AbstractExecutableLogic<TaskTO> {
taskInfo.setAnyType(taskTO.getAnyType());
taskInfo.setEntityKey(taskTO.getEntityKey());
- TaskExec propExec = taskExecutor.execute(taskInfo);
+ TaskExec propExec = taskExecutor.execute(taskInfo, new DefaultPropagationReporter());
result = binder.getExecTO(propExec);
break;
diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/jexl/JexlUtils.java b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/jexl/JexlUtils.java
index 04cc723..1c2dc2e 100644
--- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/jexl/JexlUtils.java
+++ b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/jexl/JexlUtils.java
@@ -127,7 +127,7 @@ public final class JexlUtils {
public static void addFieldsToContext(final Object object, final JexlContext jexlContext) {
Set<Pair<PropertyDescriptor, Field>> cached = FIELD_CACHE.get(object.getClass());
if (cached == null) {
- FIELD_CACHE.put(object.getClass(), new HashSet<>());
+ FIELD_CACHE.put(object.getClass(), Collections.synchronizedSet(new HashSet<>()));
List<Class<?>> classes = ClassUtils.getAllSuperclasses(object.getClass());
classes.add(object.getClass());
diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/propagation/PropagationTaskExecutor.java b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/propagation/PropagationTaskExecutor.java
index 9499538..878df57 100644
--- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/propagation/PropagationTaskExecutor.java
+++ b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/propagation/PropagationTaskExecutor.java
@@ -43,15 +43,7 @@ public interface PropagationTaskExecutor {
String MANDATORY_NULL_OR_EMPTY_ATTR_NAME = "__MANDATORY_NULL_OR_EMPTY__";
/**
- * Execute the given PropagationTask and returns the generated {@link TaskExec}.
- *
- * @param taskInfo to be executed
- * @return the generated TaskExec
- */
- TaskExec execute(PropagationTaskInfo taskInfo);
-
- /**
- * Execute the given PropagationTask and returns the generated {@link TaskExec}.
+ * Execute the given task and returns the generated {@link TaskExec}.
*
* @param taskInfo to be executed
* @param reporter to report propagation execution status
@@ -60,7 +52,7 @@ public interface PropagationTaskExecutor {
TaskExec execute(PropagationTaskInfo taskInfo, PropagationReporter reporter);
/**
- * Execute a collection of PropagationTask objects.
+ * Execute the given collection of tasks.
* The process is interrupted as soon as the result of the communication with a resource with non-null priority is
* in error.
*
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ConnectorFacadeProxy.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ConnectorFacadeProxy.java
index d2635e8..a3bebe5 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ConnectorFacadeProxy.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ConnectorFacadeProxy.java
@@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ClassUtils;
+import org.springframework.util.CollectionUtils;
public class ConnectorFacadeProxy implements Connector {
@@ -104,11 +105,10 @@ public class ConnectorFacadeProxy implements Connector {
// set connector configuration according to conninstance's
ConfigurationProperties properties = apiConfig.getConfigurationProperties();
connInstance.getConf().stream().
- filter(property -> (property.getValues() != null && !property.getValues().isEmpty())).
- forEachOrdered(property -> {
- properties.setPropertyValue(property.getSchema().getName(),
- getPropertyValue(property.getSchema().getType(), property.getValues()));
- });
+ filter(property -> !CollectionUtils.isEmpty(property.getValues())).
+ forEach(property -> properties.setPropertyValue(
+ property.getSchema().getName(),
+ getPropertyValue(property.getSchema().getType(), property.getValues())));
// set pooling configuration (if supported) according to conninstance's
if (connInstance.getPoolConf() != null) {
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/AbstractPropagationTaskExecutor.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/AbstractPropagationTaskExecutor.java
index fd4e98a..e76a7de 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/AbstractPropagationTaskExecutor.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/AbstractPropagationTaskExecutor.java
@@ -338,13 +338,7 @@ public abstract class AbstractPropagationTaskExecutor implements PropagationTask
return result;
}
- @Override
- public TaskExec execute(final PropagationTaskInfo taskInfo) {
- return execute(taskInfo, null);
- }
-
- @Override
- public TaskExec execute(final PropagationTaskInfo taskInfo, final PropagationReporter reporter) {
+ protected PropagationTask buildTask(final PropagationTaskInfo taskInfo) {
PropagationTask task;
if (taskInfo.getKey() == null) {
// double-checks that provided External Resource is valid, for further actions
@@ -371,6 +365,13 @@ public abstract class AbstractPropagationTaskExecutor implements PropagationTask
}
task.setAttributes(attributes);
+ return task;
+ }
+
+ @Override
+ public TaskExec execute(final PropagationTaskInfo taskInfo, final PropagationReporter reporter) {
+ PropagationTask task = buildTask(taskInfo);
+
Connector connector = taskInfo.getConnector() == null
? connFactory.getConnector(task.getResource())
: taskInfo.getConnector();
@@ -461,9 +462,7 @@ public abstract class AbstractPropagationTaskExecutor implements PropagationTask
propagationAttempted.set(true);
- actions.forEach(action -> {
- action.onError(task, execution, e);
- });
+ actions.forEach(action -> action.onError(task, execution, e));
} finally {
// Try to read remote object AFTER any actual operation
if (connector != null) {
@@ -505,13 +504,11 @@ public abstract class AbstractPropagationTaskExecutor implements PropagationTask
taskDAO.save(task);
}
- if (reporter != null) {
- reporter.onSuccessOrNonPriorityResourceFailures(taskInfo,
- ExecStatus.valueOf(execution.getStatus()),
- failureReason,
- beforeObj,
- afterObj);
- }
+ reporter.onSuccessOrNonPriorityResourceFailures(taskInfo,
+ ExecStatus.valueOf(execution.getStatus()),
+ failureReason,
+ beforeObj,
+ afterObj);
}
for (PropagationActions action : actions) {
@@ -560,6 +557,39 @@ public abstract class AbstractPropagationTaskExecutor implements PropagationTask
protected abstract void doExecute(
Collection<PropagationTaskInfo> taskInfos, PropagationReporter reporter, boolean nullPriorityAsync);
+ protected TaskExec rejected(
+ final PropagationTaskInfo taskInfo,
+ final String rejectReason,
+ final PropagationReporter reporter) {
+
+ PropagationTask task = buildTask(taskInfo);
+
+ TaskExec execution = entityFactory.newEntity(TaskExec.class);
+ execution.setStatus(ExecStatus.NOT_ATTEMPTED.name());
+
+ execution.setStart(new Date());
+ execution.setMessage(rejectReason);
+ execution.setEnd(execution.getStart());
+
+ if (hasToBeregistered(task, execution)) {
+ LOG.debug("Execution to be stored: {}", execution);
+
+ execution.setTask(task);
+ task.add(execution);
+
+ taskDAO.save(task);
+ }
+
+ reporter.onSuccessOrNonPriorityResourceFailures(
+ taskInfo,
+ ExecStatus.valueOf(execution.getStatus()),
+ rejectReason,
+ null,
+ null);
+
+ return execution;
+ }
+
@Override
public PropagationReporter execute(
final Collection<PropagationTaskInfo> taskInfos,
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
index 283e7dd..ecd4764 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
@@ -21,15 +21,10 @@ package org.apache.syncope.core.provisioning.java.propagation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.syncope.common.lib.types.ExecStatus;
@@ -80,26 +75,15 @@ public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExec
final PropagationReporter reporter,
final boolean nullPriorityAsync) {
- List<PropagationTaskInfo> prioritizedTasks = new ArrayList<>();
-
- int[] connRequestTimeout = { 60 };
-
- taskInfos.stream().filter(task -> task.getExternalResource().getPropagationPriority() != null).forEach(task -> {
- prioritizedTasks.add(task);
-
- if (task.getExternalResource().getConnector().getConnRequestTimeout() != null
- && connRequestTimeout[0] < task.getExternalResource().getConnector().getConnRequestTimeout()) {
-
- connRequestTimeout[0] = task.getExternalResource().getConnector().getConnRequestTimeout();
- LOG.debug("Upgrade request connection timeout to {}", connRequestTimeout);
- }
- });
-
- prioritizedTasks.sort(Comparator.comparing(task -> task.getExternalResource().getPropagationPriority()));
+ List<PropagationTaskInfo> prioritizedTasks = taskInfos.stream().
+ filter(task -> task.getExternalResource().getPropagationPriority() != null).
+ sorted(Comparator.comparing(task -> task.getExternalResource().getPropagationPriority())).
+ collect(Collectors.toList());
LOG.debug("Propagation tasks sorted by priority, for serial execution: {}", prioritizedTasks);
- Set<PropagationTaskInfo> concurrentTasks = taskInfos.stream().
- filter(task -> !prioritizedTasks.contains(task)).collect(Collectors.toSet());
+ List<PropagationTaskInfo> concurrentTasks = taskInfos.stream().
+ filter(task -> !prioritizedTasks.contains(task)).
+ collect(Collectors.toList());
LOG.debug("Propagation tasks for concurrent execution: {}", concurrentTasks);
// first process priority resources sequentially and fail as soon as any propagation failure is reported
@@ -123,42 +107,33 @@ public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExec
});
// then process non-priority resources concurrently...
- CompletionService<TaskExec> completionService = new ExecutorCompletionService<>(executor);
- Map<PropagationTaskInfo, Future<TaskExec>> nullPriority = new HashMap<>(concurrentTasks.size());
- concurrentTasks.forEach(taskInfo -> {
- try {
- nullPriority.put(
- taskInfo,
- completionService.submit(newPropagationTaskCallable(taskInfo, reporter)));
- } catch (Exception e) {
- LOG.error("Unexpected exception", e);
- }
- });
- // ...waiting for all callables to complete, if async processing was not required
- if (!nullPriority.isEmpty()) {
- if (nullPriorityAsync) {
- nullPriority.forEach((task, exec) -> {
- reporter.onSuccessOrNonPriorityResourceFailures(task, ExecStatus.CREATED, null, null, null);
- });
- } else {
- final Set<Future<TaskExec>> nullPriorityFutures = new HashSet<>(nullPriority.values());
+ if (!concurrentTasks.isEmpty()) {
+ CompletionService<TaskExec> completionService = new ExecutorCompletionService<>(executor);
+ List<Future<TaskExec>> futures = new ArrayList<>();
+
+ concurrentTasks.forEach(taskInfo -> {
try {
- executor.submit(() -> {
- while (!nullPriorityFutures.isEmpty()) {
- try {
- nullPriorityFutures.remove(completionService.take());
- } catch (Exception e) {
- LOG.error("Unexpected exception", e);
- }
- }
- }).get(connRequestTimeout[0], TimeUnit.SECONDS);
+ futures.add(completionService.submit(newPropagationTaskCallable(taskInfo, reporter)));
+
+ if (nullPriorityAsync) {
+ reporter.onSuccessOrNonPriorityResourceFailures(
+ taskInfo, ExecStatus.CREATED, null, null, null);
+ }
} catch (Exception e) {
- LOG.error("Unexpected exception", e);
- } finally {
- nullPriorityFutures.forEach(future -> future.cancel(true));
- nullPriorityFutures.clear();
- nullPriority.clear();
+ LOG.error("While submitting task {} for async execution", taskInfo, e); // {} so the task is actually logged
+ rejected(taskInfo, e.getMessage(), reporter); // report the failed submission as NOT_ATTEMPTED
}
+ });
+
+ // ...waiting for all callables to complete, if async processing was not required
+ if (!nullPriorityAsync) {
+ futures.forEach(future -> {
+ try {
+ future.get();
+ } catch (Exception e) {
+ LOG.error("Unexpected exception", e);
+ }
+ });
}
}
}
diff --git a/core/provisioning-java/src/main/resources/provisioning.properties b/core/provisioning-java/src/main/resources/provisioning.properties
index 220105e..24f1ea5 100644
--- a/core/provisioning-java/src/main/resources/provisioning.properties
+++ b/core/provisioning-java/src/main/resources/provisioning.properties
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/core/provisioning-java/src/main/resources/provisioningContext.xml b/core/provisioning-java/src/main/resources/provisioningContext.xml
index bcf9a35..4aeca3c 100644
--- a/core/provisioning-java/src/main/resources/provisioningContext.xml
+++ b/core/provisioning-java/src/main/resources/provisioningContext.xml
@@ -33,7 +33,9 @@ under the License.
<!-- Used by AsyncConnectorFacade -->
<task:annotation-driven executor="asyncConnectorFacadeExecutor"/>
<task:executor id="asyncConnectorFacadeExecutor"
- pool-size="${asyncConnectorFacadeExecutor.poolSize}"/>
+ pool-size="${asyncConnectorFacadeExecutor.poolSize}"
+ queue-capacity="${asyncConnectorFacadeExecutor.queueCapacity}"
+ rejection-policy="ABORT"/>
<!-- Used by PriorityPropagationTaskExecutor -->
<task:executor id="propagationTaskExecutorAsyncExecutor"
@@ -41,7 +43,7 @@ under the License.
queue-capacity="${propagationTaskExecutorAsyncExecutor.queueCapacity}"
rejection-policy="ABORT"/>
<bean class="${propagationTaskExecutor}"/>
-
+
<bean class="${userProvisioningManager}"/>
<bean class="${groupProvisioningManager}"/>
<bean class="${anyObjectProvisioningManager}"/>
diff --git a/docker/core/src/main/resources/provisioning.properties.mariadb b/docker/core/src/main/resources/provisioning.properties.mariadb
index 8f17c5a..693243e 100644
--- a/docker/core/src/main/resources/provisioning.properties.mariadb
+++ b/docker/core/src/main/resources/provisioning.properties.mariadb
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/docker/core/src/main/resources/provisioning.properties.mssql b/docker/core/src/main/resources/provisioning.properties.mssql
index 3529791..4e47090 100644
--- a/docker/core/src/main/resources/provisioning.properties.mssql
+++ b/docker/core/src/main/resources/provisioning.properties.mssql
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/docker/core/src/main/resources/provisioning.properties.myjson b/docker/core/src/main/resources/provisioning.properties.myjson
index 191693c..c865f0c 100644
--- a/docker/core/src/main/resources/provisioning.properties.myjson
+++ b/docker/core/src/main/resources/provisioning.properties.myjson
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/docker/core/src/main/resources/provisioning.properties.mysql b/docker/core/src/main/resources/provisioning.properties.mysql
index 191693c..c865f0c 100644
--- a/docker/core/src/main/resources/provisioning.properties.mysql
+++ b/docker/core/src/main/resources/provisioning.properties.mysql
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/docker/core/src/main/resources/provisioning.properties.pgjsonb b/docker/core/src/main/resources/provisioning.properties.pgjsonb
index 220105e..24f1ea5 100644
--- a/docker/core/src/main/resources/provisioning.properties.pgjsonb
+++ b/docker/core/src/main/resources/provisioning.properties.pgjsonb
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/docker/core/src/main/resources/provisioning.properties.postgresql b/docker/core/src/main/resources/provisioning.properties.postgresql
index 220105e..24f1ea5 100644
--- a/docker/core/src/main/resources/provisioning.properties.postgresql
+++ b/docker/core/src/main/resources/provisioning.properties.postgresql
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/ext/camel/provisioning-camel/src/main/resources/provisioning.properties b/ext/camel/provisioning-camel/src/main/resources/provisioning.properties
index fd39626..1bef1b2 100644
--- a/ext/camel/provisioning-camel/src/main/resources/provisioning.properties
+++ b/ext/camel/provisioning-camel/src/main/resources/provisioning.properties
@@ -16,9 +16,9 @@
# under the License.
camel.directory=${conf.directory}
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/all/provisioning.properties b/fit/core-reference/src/main/resources/all/provisioning.properties
index 7d7f79c..fa10e90 100644
--- a/fit/core-reference/src/main/resources/all/provisioning.properties
+++ b/fit/core-reference/src/main/resources/all/provisioning.properties
@@ -16,9 +16,9 @@
# under the License.
camel.directory=${conf.directory}
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/mariadb/provisioning.properties b/fit/core-reference/src/main/resources/mariadb/provisioning.properties
index 8f17c5a..693243e 100644
--- a/fit/core-reference/src/main/resources/mariadb/provisioning.properties
+++ b/fit/core-reference/src/main/resources/mariadb/provisioning.properties
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/myjson/provisioning.properties b/fit/core-reference/src/main/resources/myjson/provisioning.properties
index 191693c..c865f0c 100644
--- a/fit/core-reference/src/main/resources/myjson/provisioning.properties
+++ b/fit/core-reference/src/main/resources/myjson/provisioning.properties
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/mysql/provisioning.properties b/fit/core-reference/src/main/resources/mysql/provisioning.properties
index 191693c..c865f0c 100644
--- a/fit/core-reference/src/main/resources/mysql/provisioning.properties
+++ b/fit/core-reference/src/main/resources/mysql/provisioning.properties
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/oracle/provisioning.properties b/fit/core-reference/src/main/resources/oracle/provisioning.properties
index a009d81..e62bd8a 100644
--- a/fit/core-reference/src/main/resources/oracle/provisioning.properties
+++ b/fit/core-reference/src/main/resources/oracle/provisioning.properties
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/pgjsonb/provisioning.properties b/fit/core-reference/src/main/resources/pgjsonb/provisioning.properties
index 9347dd7..8d3c7ab 100644
--- a/fit/core-reference/src/main/resources/pgjsonb/provisioning.properties
+++ b/fit/core-reference/src/main/resources/pgjsonb/provisioning.properties
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/postgres/provisioning.properties b/fit/core-reference/src/main/resources/postgres/provisioning.properties
index 9347dd7..8d3c7ab 100644
--- a/fit/core-reference/src/main/resources/postgres/provisioning.properties
+++ b/fit/core-reference/src/main/resources/postgres/provisioning.properties
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/provisioning.properties b/fit/core-reference/src/main/resources/provisioning.properties
index e75de15..25812f0 100644
--- a/fit/core-reference/src/main/resources/provisioning.properties
+++ b/fit/core-reference/src/main/resources/provisioning.properties
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor
diff --git a/fit/core-reference/src/main/resources/sqlserver/provisioning.properties b/fit/core-reference/src/main/resources/sqlserver/provisioning.properties
index 3529791..4e47090 100644
--- a/fit/core-reference/src/main/resources/sqlserver/provisioning.properties
+++ b/fit/core-reference/src/main/resources/sqlserver/provisioning.properties
@@ -14,9 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-asyncConnectorFacadeExecutor.poolSize=10
+asyncConnectorFacadeExecutor.poolSize=5-25
+asyncConnectorFacadeExecutor.queueCapacity=100
-# see http://docs.spring.io/spring/docs/current/spring-framework-reference/html/scheduling.html#scheduling-task-namespace-executor
propagationTaskExecutorAsyncExecutor.poolSize=5-25
propagationTaskExecutorAsyncExecutor.queueCapacity=100
propagationTaskExecutor=org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor