You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@syncope.apache.org by il...@apache.org on 2012/10/22 16:29:51 UTC

svn commit: r1400893 - in /incubator/syncope/trunk/core/src/main: java/org/apache/syncope/core/audit/ java/org/apache/syncope/core/notification/ java/org/apache/syncope/core/policy/ java/org/apache/syncope/core/propagation/ java/org/apache/syncope/core...

Author: ilgrosso
Date: Mon Oct 22 14:29:50 2012
New Revision: 1400893

URL: http://svn.apache.org/viewvc?rev=1400893&view=rev
Log:
[SYNCOPE-186] PropagationTaskExecutor (interface + default implementation) separated from PropagationManager

Added:
    incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/AbstractPropagationTaskExecutor.java   (with props)
    incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PriorityPropagationTaskExecutor.java   (with props)
    incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationTaskExecutor.java   (with props)
Modified:
    incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/audit/AuditManager.java
    incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/notification/NotificationManager.java
    incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/policy/AccountPolicyEnforcer.java
    incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationManager.java
    incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/rest/controller/TaskController.java
    incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/rest/controller/UserController.java
    incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/scheduling/SyncJob.java
    incubator/syncope/trunk/core/src/main/resources/syncopeContext.xml

Modified: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/audit/AuditManager.java
URL: http://svn.apache.org/viewvc/incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/audit/AuditManager.java?rev=1400893&r1=1400892&r2=1400893&view=diff
==============================================================================
--- incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/audit/AuditManager.java (original)
+++ incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/audit/AuditManager.java Mon Oct 22 14:29:50 2012
@@ -18,13 +18,13 @@
  */
 package org.apache.syncope.core.audit;
 
+import org.apache.syncope.types.AuditElements.Category;
+import org.apache.syncope.types.AuditElements.Result;
+import org.apache.syncope.types.AuditLoggerName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.core.context.SecurityContext;
 import org.springframework.security.core.context.SecurityContextHolder;
-import org.apache.syncope.types.AuditLoggerName;
-import org.apache.syncope.types.AuditElements.Category;
-import org.apache.syncope.types.AuditElements.Result;
 
 public class AuditManager {
 

Modified: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/notification/NotificationManager.java
URL: http://svn.apache.org/viewvc/incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/notification/NotificationManager.java?rev=1400893&r1=1400892&r2=1400893&view=diff
==============================================================================
--- incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/notification/NotificationManager.java (original)
+++ incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/notification/NotificationManager.java Mon Oct 22 14:29:50 2012
@@ -211,10 +211,6 @@ public class NotificationManager {
         }
     }
 
-    public TaskExec execute(final NotificationTask task) {
-        return notificationJob.executeSingle(task);
-    }
-
     private String getRecipientEmail(
             final IntMappingType recipientAttrType, final String recipientAttrName, final SyncopeUser user) {
 

Modified: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/policy/AccountPolicyEnforcer.java
URL: http://svn.apache.org/viewvc/incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/policy/AccountPolicyEnforcer.java?rev=1400893&r1=1400892&r2=1400893&view=diff
==============================================================================
--- incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/policy/AccountPolicyEnforcer.java (original)
+++ incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/policy/AccountPolicyEnforcer.java Mon Oct 22 14:29:50 2012
@@ -22,17 +22,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 import org.apache.commons.collections.keyvalue.DefaultMapEntry;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
 import org.apache.syncope.client.to.UserTO;
 import org.apache.syncope.core.persistence.beans.PropagationTask;
 import org.apache.syncope.core.persistence.beans.user.SyncopeUser;
 import org.apache.syncope.core.propagation.PropagationManager;
+import org.apache.syncope.core.propagation.PropagationTaskExecutor;
 import org.apache.syncope.core.rest.data.UserDataBinder;
 import org.apache.syncope.core.workflow.UserWorkflowAdapter;
 import org.apache.syncope.core.workflow.WorkflowResult;
 import org.apache.syncope.types.AccountPolicySpec;
 import org.apache.syncope.types.PolicyType;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 @Component
 public class AccountPolicyEnforcer extends PolicyEnforcer<AccountPolicySpec, SyncopeUser> {
@@ -44,6 +45,9 @@ public class AccountPolicyEnforcer exten
     private PropagationManager propagationManager;
 
     @Autowired
+    private PropagationTaskExecutor taskExecutor;
+
+    @Autowired
     private UserDataBinder userDataBinder;
 
     private static final Pattern PATTERN = Pattern.compile("[a-zA-Z0-9-_@. ]+");
@@ -116,11 +120,12 @@ public class AccountPolicyEnforcer exten
 
                 // propagate suspension if and only if it is required by policy
                 if (policy.isPropagateSuspension()) {
-                    final List<PropagationTask> tasks = propagationManager
-                            .getUpdateTaskIds(new WorkflowResult<Map.Entry<Long, Boolean>>(new DefaultMapEntry(updated
-                                    .getResult(), Boolean.FALSE), updated.getPropByRes(), updated.getPerformedTasks()));
+                    final List<PropagationTask> tasks = propagationManager.getUpdateTaskIds(
+                            new WorkflowResult<Map.Entry<Long, Boolean>>(
+                            new DefaultMapEntry(updated.getResult(), Boolean.FALSE),
+                            updated.getPropByRes(), updated.getPerformedTasks()));
 
-                    propagationManager.execute(tasks);
+                    taskExecutor.execute(tasks);
                 }
 
                 if (LOG.isDebugEnabled()) {

Added: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/AbstractPropagationTaskExecutor.java
URL: http://svn.apache.org/viewvc/incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/AbstractPropagationTaskExecutor.java?rev=1400893&view=auto
==============================================================================
--- incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/AbstractPropagationTaskExecutor.java (added)
+++ incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/AbstractPropagationTaskExecutor.java Mon Oct 22 14:29:50 2012
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.syncope.core.propagation;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+import org.apache.syncope.core.init.ConnInstanceLoader;
+import org.apache.syncope.core.persistence.beans.PropagationTask;
+import org.apache.syncope.core.persistence.beans.TaskExec;
+import org.apache.syncope.core.persistence.beans.user.SyncopeUser;
+import org.apache.syncope.core.persistence.dao.TaskDAO;
+import org.apache.syncope.core.persistence.dao.UserDAO;
+import org.apache.syncope.core.util.NotFoundException;
+import org.apache.syncope.types.PropagationMode;
+import org.apache.syncope.types.PropagationTaskExecStatus;
+import org.apache.syncope.types.TraceLevel;
+import org.identityconnectors.framework.common.exceptions.ConnectorException;
+import org.identityconnectors.framework.common.objects.Attribute;
+import org.identityconnectors.framework.common.objects.AttributeBuilder;
+import org.identityconnectors.framework.common.objects.AttributeUtil;
+import org.identityconnectors.framework.common.objects.ConnectorObject;
+import org.identityconnectors.framework.common.objects.Name;
+import org.identityconnectors.framework.common.objects.ObjectClass;
+import org.identityconnectors.framework.common.objects.Uid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Transactional;
+
+@Transactional(rollbackFor = {Throwable.class})
+public abstract class AbstractPropagationTaskExecutor implements PropagationTaskExecutor {
+
+    /**
+     * Logger.
+     */
+    protected static final Logger LOG = LoggerFactory.getLogger(AbstractPropagationTaskExecutor.class);
+
+    /**
+     * Connector instance loader.
+     */
+    @Autowired
+    protected ConnInstanceLoader connLoader;
+
+    /**
+     * User DAO.
+     */
+    @Autowired
+    protected UserDAO userDAO;
+
+    /**
+     * Task DAO.
+     */
+    @Autowired
+    protected TaskDAO taskDAO;
+
+    @Override
+    public TaskExec execute(final PropagationTask task) {
+        return execute(task, null);
+    }
+
+    @Override
+    public TaskExec execute(final PropagationTask task, final PropagationHandler handler) {
+        final Date startDate = new Date();
+
+        TaskExec execution = new TaskExec();
+        execution.setStatus(PropagationTaskExecStatus.CREATED.name());
+
+        String taskExecutionMessage = null;
+
+        // Flag to state whether any propagation has been attempted
+        Set<String> propagationAttempted = new HashSet<String>();
+
+        ConnectorObject before = null;
+        ConnectorObject after = null;
+
+        try {
+            final ConnectorFacadeProxy connector = connLoader.getConnector(task.getResource());
+            if (connector == null) {
+                throw new NoSuchBeanDefinitionException(String.format(
+                        "Connector instance bean for resource %s not found", task.getResource()));
+            }
+
+            // Try to read user BEFORE any actual operation
+            before = getRemoteObject(connector, task, false);
+
+            try {
+                switch (task.getPropagationOperation()) {
+                    case CREATE:
+                    case UPDATE:
+                        // set of attributes to be propagated
+                        final Set<Attribute> attributes = new HashSet<Attribute>(task.getAttributes());
+
+                        if (before == null) {
+                            // 1. get accountId
+                            final String accountId = task.getAccountId();
+
+                            // 2. get name
+                            final Name name = (Name) AttributeUtil.find(Name.NAME, attributes);
+
+                            // 3. check if:
+                            //      * accountId is not blank;
+                            //      * accountId is not equal to Name.
+                            if (StringUtils.isNotBlank(accountId)
+                                    && (name == null || !accountId.equals(name.getNameValue()))) {
+
+                                // 3.a retrieve uid
+                                final Uid uid = (Uid) AttributeUtil.find(Uid.NAME, attributes);
+
+                                // 3.b add Uid if not provided
+                                if (uid == null) {
+                                    attributes.add(AttributeBuilder.build(Uid.NAME, Collections.singleton(accountId)));
+                                }
+                            }
+
+                            // 4. provision entry
+                            connector.create(task.getPropagationMode(), ObjectClass.ACCOUNT, attributes, null,
+                                    propagationAttempted);
+                        } else {
+
+                            // 1. check if rename is really required
+                            final Name newName = (Name) AttributeUtil.find(Name.NAME, attributes);
+
+                            LOG.debug("Rename required with value {}", newName);
+
+                            if (newName != null && newName.equals(before.getName())
+                                    && !before.getUid().getUidValue().equals(newName.getNameValue())) {
+
+                                LOG.debug("Remote object name unchanged");
+                                attributes.remove(newName);
+                            }
+
+                            LOG.debug("Attributes to be replaced {}", attributes);
+
+                            // 2. update with a new "normalized" attribute set
+                            connector.update(task.getPropagationMode(), ObjectClass.ACCOUNT, before.getUid(),
+                                    attributes, null, propagationAttempted);
+                        }
+                        break;
+
+                    case DELETE:
+                        if (before == null) {
+                            LOG.debug("{} not found on external resource: ignoring delete", task.getAccountId());
+                        } else {
+                            /*
+                             * We must choose here whether to
+                             *  a. actually delete the provided user from the external resource
+                             *  b. just update the provided user data onto the external resource
+                             *
+                             * (a) happens when either there is no user associated with the PropagationTask (this takes
+                             * place when the task is generated via UserController.delete()) or the provided updated
+                             * user hasn't the current resource assigned (when the task is generated via
+                             * UserController.update()).
+                             *
+                             * (b) happens when the provided updated user does have the current resource assigned
+                             * (when the task is generated via UserController.update()): this basically means that
+                             * before such update, this user used to have the current resource assigned by more than
+                             * one mean (for example, two different memberships with the same resource).
+                             */
+
+                            SyncopeUser user = null;
+                            if (task.getSyncopeUser() != null) {
+                                try {
+                                    user = getSyncopeUser(task.getSyncopeUser().getId());
+                                } catch (NotFoundException e) {
+                                    LOG.warn("Requesting to delete a non-existing user from {}",
+                                            task.getResource().getName(), e);
+                                }
+                            }
+
+                            if (user == null || !user.getResourceNames().contains(task.getResource().getName())) {
+                                LOG.debug("Perform deprovisioning on {}", task.getResource().getName());
+
+                                connector.delete(
+                                        task.getPropagationMode(),
+                                        ObjectClass.ACCOUNT,
+                                        before.getUid(),
+                                        null,
+                                        propagationAttempted);
+                            } else {
+                                LOG.debug("Update remote object on {}", task.getResource().getName());
+
+                                connector.update(
+                                        task.getPropagationMode(),
+                                        ObjectClass.ACCOUNT,
+                                        before.getUid(),
+                                        task.getAttributes(),
+                                        null,
+                                        propagationAttempted);
+                            }
+                        }
+
+                        break;
+
+                    default:
+                }
+
+                execution.setStatus(task.getPropagationMode() == PropagationMode.ONE_PHASE
+                        ? PropagationTaskExecStatus.SUCCESS.name()
+                        : PropagationTaskExecStatus.SUBMITTED.name());
+
+                LOG.debug("Successfully propagated to {}", task.getResource());
+
+                // Try to read user AFTER any actual operation
+                after = getRemoteObject(connector, task, true);
+            } catch (Exception e) {
+                after = getRemoteObject(connector, task, false);
+                throw e;
+            }
+        } catch (Exception e) {
+            LOG.error("Exception during provision on resource " + task.getResource().getName(), e);
+
+            if (e instanceof ConnectorException && e.getCause() != null) {
+                taskExecutionMessage = e.getCause().getMessage();
+            } else {
+                StringWriter exceptionWriter = new StringWriter();
+                exceptionWriter.write(e.getMessage() + "\n\n");
+                e.printStackTrace(new PrintWriter(exceptionWriter));
+                taskExecutionMessage = exceptionWriter.toString();
+            }
+
+            try {
+                execution.setStatus(task.getPropagationMode() == PropagationMode.ONE_PHASE
+                        ? PropagationTaskExecStatus.FAILURE.name()
+                        : PropagationTaskExecStatus.UNSUBMITTED.name());
+            } catch (Exception wft) {
+                LOG.error("While executing KO action on {}", execution, wft);
+            }
+
+            propagationAttempted.add(task.getPropagationOperation().name().toLowerCase());
+        } finally {
+            LOG.debug("Update execution for {}", task);
+
+            execution.setStartDate(startDate);
+            execution.setMessage(taskExecutionMessage);
+            execution.setEndDate(new Date());
+
+            if (hasToBeregistered(task, execution)) {
+                if (propagationAttempted.isEmpty()) {
+                    LOG.debug("No propagation attempted for {}", execution);
+                } else {
+                    execution.setTask(task);
+                    task.addExec(execution);
+
+                    LOG.debug("Execution finished: {}", execution);
+                }
+
+                taskDAO.save(task);
+
+                // This flush call is needed to generate a value for the execution id
+                // An alternative to this would be the following statement that might cause troubles with
+                // concurrent calls.
+                // taskExecDAO.findLatestStarted(task);
+                taskDAO.flush();
+            }
+        }
+
+        if (handler != null) {
+            handler.handle(
+                    task.getResource().getName(),
+                    PropagationTaskExecStatus.valueOf(execution.getStatus()),
+                    before,
+                    after);
+        }
+
+        return execution;
+    }
+
+    @Override
+    public void execute(final Collection<PropagationTask> tasks) throws PropagationException {
+        execute(tasks, null);
+    }
+
+    @Override
+    public abstract void execute(Collection<PropagationTask> tasks, PropagationHandler handler)
+            throws PropagationException;
+
+    protected SyncopeUser getSyncopeUser(final Long userId)
+            throws NotFoundException {
+
+        SyncopeUser user = userDAO.find(userId);
+        if (user == null) {
+            throw new NotFoundException("User " + userId);
+        }
+
+        return user;
+    }
+
+    /**
+     * Check whether an execution has to be stored, for a given task.
+     *
+     * @param task execution's task
+     * @param execution to be decide whether to store or not
+     * @return true if execution has to be store, false otherwise
+     */
+    protected boolean hasToBeregistered(final PropagationTask task, final TaskExec execution) {
+
+        boolean result;
+
+        final boolean failed = !PropagationTaskExecStatus.valueOf(execution.getStatus()).isSuccessful();
+
+        switch (task.getPropagationOperation()) {
+
+            case CREATE:
+                result = (failed && task.getResource().getCreateTraceLevel().ordinal() >= TraceLevel.FAILURES.ordinal())
+                        || task.getResource().getCreateTraceLevel() == TraceLevel.ALL;
+                break;
+
+            case UPDATE:
+                result = (failed && task.getResource().getUpdateTraceLevel().ordinal() >= TraceLevel.FAILURES.ordinal())
+                        || task.getResource().getUpdateTraceLevel() == TraceLevel.ALL;
+                break;
+
+            case DELETE:
+                result = (failed && task.getResource().getDeleteTraceLevel().ordinal() >= TraceLevel.FAILURES.ordinal())
+                        || task.getResource().getDeleteTraceLevel() == TraceLevel.ALL;
+                break;
+
+            default:
+                result = false;
+        }
+
+        return result;
+    }
+
+    /**
+     * Get remote object.
+     *
+     * @param connector connector facade proxy.
+     * @param task current propagation task.
+     * @param latest 'FALSE' to retrieve object using old accountId if not null.
+     * @return remote connector object.
+     */
+    protected ConnectorObject getRemoteObject(final ConnectorFacadeProxy connector, final PropagationTask task,
+            final boolean latest) {
+        try {
+
+            return connector.getObject(task.getPropagationMode(), task.getPropagationOperation(), ObjectClass.ACCOUNT,
+                    new Uid(latest || task.getOldAccountId() == null
+                    ? task.getAccountId()
+                    : task.getOldAccountId()), connector.getOperationOptions(task.getResource()));
+
+        } catch (RuntimeException ignore) {
+            LOG.debug("Resolving username", ignore);
+            return null;
+        }
+    }
+}

Propchange: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/AbstractPropagationTaskExecutor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/AbstractPropagationTaskExecutor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/AbstractPropagationTaskExecutor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PriorityPropagationTaskExecutor.java
URL: http://svn.apache.org/viewvc/incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PriorityPropagationTaskExecutor.java?rev=1400893&view=auto
==============================================================================
--- incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PriorityPropagationTaskExecutor.java (added)
+++ incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PriorityPropagationTaskExecutor.java Mon Oct 22 14:29:50 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.syncope.core.propagation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.syncope.core.persistence.beans.PropagationTask;
+import org.apache.syncope.core.persistence.beans.TaskExec;
+import org.apache.syncope.types.PropagationTaskExecStatus;
+
+public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExecutor {
+
+    /**
+     * Sort the given collection by looking at related ExternalResource's priority, then execute.
+     */
+    @Override
+    public void execute(final Collection<PropagationTask> tasks, final PropagationHandler handler)
+            throws PropagationException {
+
+        final List<PropagationTask> prioritizedTasks = new ArrayList<PropagationTask>(tasks);
+        Collections.sort(prioritizedTasks, new PriorityComparator());
+
+        for (PropagationTask task : prioritizedTasks) {
+            LOG.debug("Execution started for {}", task);
+
+            TaskExec execution = execute(task, handler);
+
+            LOG.debug("Execution finished for {}, {}", task, execution);
+
+            // Propagation is interrupted as soon as the result of the
+            // communication with a primary resource is in error
+            PropagationTaskExecStatus execStatus;
+            try {
+                execStatus = PropagationTaskExecStatus.valueOf(execution.getStatus());
+            } catch (IllegalArgumentException e) {
+                LOG.error("Unexpected execution status found {}", execution.getStatus());
+                execStatus = PropagationTaskExecStatus.FAILURE;
+            }
+            if (task.getResource().isPropagationPrimary() && !execStatus.isSuccessful()) {
+                throw new PropagationException(task.getResource().getName(), execution.getMessage());
+            }
+        }
+    }
+
+    protected static class PriorityComparator implements Comparator<PropagationTask> {
+
+        @Override
+        public int compare(final PropagationTask task1, final PropagationTask task2) {
+            return task1.getResource().getPropagationPriority() > task2.getResource().getPropagationPriority()
+                    ? -1
+                    : task1.getResource().getPropagationPriority() == task2.getResource().getPropagationPriority()
+                    ? 0
+                    : 1;
+        }
+    }
+}

Propchange: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PriorityPropagationTaskExecutor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PriorityPropagationTaskExecutor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PriorityPropagationTaskExecutor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationManager.java
URL: http://svn.apache.org/viewvc/incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationManager.java?rev=1400893&r1=1400892&r2=1400893&view=diff
==============================================================================
--- incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationManager.java (original)
+++ incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationManager.java Mon Oct 22 14:29:50 2012
@@ -18,11 +18,8 @@
  */
 package org.apache.syncope.core.propagation;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -34,19 +31,16 @@ import org.apache.commons.jexl2.MapConte
 import org.apache.commons.lang.StringUtils;
 import org.apache.syncope.client.mod.AttributeMod;
 import org.apache.syncope.client.to.AttributeTO;
-import org.apache.syncope.core.init.ConnInstanceLoader;
 import org.apache.syncope.core.persistence.beans.AbstractAttrValue;
 import org.apache.syncope.core.persistence.beans.AbstractAttributable;
 import org.apache.syncope.core.persistence.beans.AbstractSchema;
 import org.apache.syncope.core.persistence.beans.ExternalResource;
 import org.apache.syncope.core.persistence.beans.PropagationTask;
 import org.apache.syncope.core.persistence.beans.SchemaMapping;
-import org.apache.syncope.core.persistence.beans.TaskExec;
 import org.apache.syncope.core.persistence.beans.membership.Membership;
 import org.apache.syncope.core.persistence.beans.user.SyncopeUser;
 import org.apache.syncope.core.persistence.dao.ResourceDAO;
 import org.apache.syncope.core.persistence.dao.SchemaDAO;
-import org.apache.syncope.core.persistence.dao.TaskDAO;
 import org.apache.syncope.core.persistence.dao.UserDAO;
 import org.apache.syncope.core.rest.data.UserDataBinder;
 import org.apache.syncope.core.util.AttributableUtil;
@@ -56,23 +50,15 @@ import org.apache.syncope.core.util.Sche
 import org.apache.syncope.core.workflow.WorkflowResult;
 import org.apache.syncope.types.AttributableType;
 import org.apache.syncope.types.IntMappingType;
-import org.apache.syncope.types.PropagationMode;
 import org.apache.syncope.types.PropagationOperation;
-import org.apache.syncope.types.PropagationTaskExecStatus;
 import org.apache.syncope.types.SchemaType;
-import org.apache.syncope.types.TraceLevel;
 import org.identityconnectors.framework.common.FrameworkUtil;
-import org.identityconnectors.framework.common.exceptions.ConnectorException;
 import org.identityconnectors.framework.common.objects.Attribute;
 import org.identityconnectors.framework.common.objects.AttributeBuilder;
 import org.identityconnectors.framework.common.objects.AttributeUtil;
-import org.identityconnectors.framework.common.objects.ConnectorObject;
 import org.identityconnectors.framework.common.objects.Name;
-import org.identityconnectors.framework.common.objects.ObjectClass;
-import org.identityconnectors.framework.common.objects.Uid;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -88,12 +74,6 @@ public class PropagationManager {
     protected static final Logger LOG = LoggerFactory.getLogger(PropagationManager.class);
 
     /**
-     * Connector instance loader.
-     */
-    @Autowired
-    private ConnInstanceLoader connLoader;
-
-    /**
      * User DataBinder.
      */
     @Autowired
@@ -118,18 +98,12 @@ public class PropagationManager {
     private SchemaDAO schemaDAO;
 
     /**
-     * Task DAO.
-     */
-    @Autowired
-    private TaskDAO taskDAO;
-
-    /**
      * JEXL engine for evaluating connector's account link.
      */
     @Autowired
     private JexlUtil jexlUtil;
 
-    private SyncopeUser getSyncopeUser(final Long userId)
+    protected SyncopeUser getSyncopeUser(final Long userId)
             throws NotFoundException {
 
         SyncopeUser user = userDAO.find(userId);
@@ -184,7 +158,7 @@ public class PropagationManager {
             propByRes.get(PropagationOperation.CREATE).removeAll(syncResourceNames);
         }
 
-        return provision(user, password, wfResult.getResult().getValue(), false, propByRes);
+        return createTasks(user, password, wfResult.getResult().getValue(), false, propByRes);
     }
 
     /**
@@ -286,7 +260,7 @@ public class PropagationManager {
             localPropByRes.get(PropagationOperation.DELETE).removeAll(syncResourceNames);
         }
 
-        return provision(user, password, enable, false, localPropByRes);
+        return createTasks(user, password, enable, false, localPropByRes);
     }
 
     /**
@@ -325,7 +299,7 @@ public class PropagationManager {
             propByRes.get(PropagationOperation.DELETE).remove(syncResourceName);
         }
 
-        return provision(user, null, false, true, propByRes);
+        return createTasks(user, null, false, true, propByRes);
     }
 
     /**
@@ -495,7 +469,7 @@ public class PropagationManager {
     }
 
     /**
-     * Implementation of the provisioning feature.
+     * Create propagation tasks.
      *
      * @param user user to be provisioned
      * @param password cleartext password to be provisioned
@@ -504,7 +478,7 @@ public class PropagationManager {
      * @param propByRes operation to be performed per resource
      * @return list of propagation tasks created
      */
-    protected List<PropagationTask> provision(final SyncopeUser user, final String password, final Boolean enable,
+    protected List<PropagationTask> createTasks(final SyncopeUser user, final String password, final Boolean enable,
             final boolean deleteOnResource, final PropagationByResource propByRes) {
 
         LOG.debug("Provisioning with user {}:\n{}", user, propByRes);
@@ -515,10 +489,12 @@ public class PropagationManager {
 
         final List<PropagationTask> tasks = new ArrayList<PropagationTask>();
 
-        final List<ExternalResource> allResByPriority = resourceDAO.findAllByPriority();
         for (PropagationOperation operation : PropagationOperation.values()) {
-            for (ExternalResource resource : allResByPriority) {
-                if (propByRes.get(operation).contains(resource.getName())) {
+            for (String resourceName : propByRes.get(operation)) {
+                final ExternalResource resource = resourceDAO.find(resourceName);
+                if (resource == null) {
+                    LOG.error("Invalid resource name specified: {}, ignoring...", resourceName);
+                } else {
                     PropagationTask task = new PropagationTask();
                     task.setResource(resource);
                     if (!deleteOnResource) {
@@ -542,325 +518,4 @@ public class PropagationManager {
 
         return tasks;
     }
-
-    public void execute(final List<PropagationTask> tasks)
-            throws PropagationException {
-        execute(tasks, null);
-    }
-
-    /**
-     * Execute a list of PropagationTask, in given order.
-     *
-     * @param tasks to be execute, in given order
-     * @param handler propagation handler
-     * @throws PropagationException if propagation goes wrong: propagation is interrupted as soon as the result of the
-     * communication with a primary resource is in error
-     */
-    public void execute(final List<PropagationTask> tasks, final PropagationHandler handler)
-            throws PropagationException {
-
-        for (PropagationTask task : tasks) {
-            LOG.debug("Execution started for {}", task);
-
-            TaskExec execution = execute(task, handler);
-
-            LOG.debug("Execution finished for {}, {}", task, execution);
-
-            // Propagation is interrupted as soon as the result of the
-            // communication with a primary resource is in error
-            PropagationTaskExecStatus execStatus;
-            try {
-                execStatus = PropagationTaskExecStatus.valueOf(execution.getStatus());
-            } catch (IllegalArgumentException e) {
-                LOG.error("Unexpected execution status found {}", execution.getStatus());
-                execStatus = PropagationTaskExecStatus.FAILURE;
-            }
-            if (task.getResource().isPropagationPrimary() && !execStatus.isSuccessful()) {
-                throw new PropagationException(task.getResource().getName(), execution.getMessage());
-            }
-        }
-    }
-
-    /**
-     * Check whether an execution has to be stored, for a given task.
-     *
-     * @param task execution's task
-     * @param execution to be decide whether to store or not
-     * @return true if execution has to be store, false otherwise
-     */
-    private boolean hasToBeregistered(final PropagationTask task, final TaskExec execution) {
-
-        boolean result;
-
-        final boolean failed = !PropagationTaskExecStatus.valueOf(execution.getStatus()).isSuccessful();
-
-        switch (task.getPropagationOperation()) {
-
-            case CREATE:
-                result = (failed && task.getResource().getCreateTraceLevel().ordinal() >= TraceLevel.FAILURES.ordinal())
-                        || task.getResource().getCreateTraceLevel() == TraceLevel.ALL;
-                break;
-
-            case UPDATE:
-                result = (failed && task.getResource().getUpdateTraceLevel().ordinal() >= TraceLevel.FAILURES.ordinal())
-                        || task.getResource().getUpdateTraceLevel() == TraceLevel.ALL;
-                break;
-
-            case DELETE:
-                result = (failed && task.getResource().getDeleteTraceLevel().ordinal() >= TraceLevel.FAILURES.ordinal())
-                        || task.getResource().getDeleteTraceLevel() == TraceLevel.ALL;
-                break;
-
-            default:
-                result = false;
-        }
-
-        return result;
-    }
-
-    /**
-     * Execute a propagation task.
-     *
-     * @param task to execute
-     * @return TaskExecution
-     */
-    public TaskExec execute(final PropagationTask task) {
-        return execute(task, null);
-    }
-
-    /**
-     * Execute a propagation task.
-     *
-     * @param task to execute.
-     * @param handler propagation handler.
-     * @return TaskExecution.
-     */
-    public TaskExec execute(final PropagationTask task, final PropagationHandler handler) {
-        final Date startDate = new Date();
-
-        TaskExec execution = new TaskExec();
-        execution.setStatus(PropagationTaskExecStatus.CREATED.name());
-
-        String taskExecutionMessage = null;
-
-        // Flag to state whether any propagation has been attempted
-        Set<String> propagationAttempted = new HashSet<String>();
-
-        ConnectorObject before = null;
-        ConnectorObject after = null;
-
-        try {
-            final ConnectorFacadeProxy connector = connLoader.getConnector(task.getResource());
-            if (connector == null) {
-                throw new NoSuchBeanDefinitionException(String.format(
-                        "Connector instance bean for resource %s not found", task.getResource()));
-            }
-
-            // Try to read user BEFORE any actual operation
-            before = getRemoteObject(connector, task, false);
-
-            try {
-                switch (task.getPropagationOperation()) {
-                    case CREATE:
-                    case UPDATE:
-                        // set of attributes to be propagated
-                        final Set<Attribute> attributes = new HashSet<Attribute>(task.getAttributes());
-
-                        if (before == null) {
-                            // 1. get accountId
-                            final String accountId = task.getAccountId();
-
-                            // 2. get name
-                            final Name name = (Name) AttributeUtil.find(Name.NAME, attributes);
-
-                            // 3. check if:
-                            //      * accountId is not blank;
-                            //      * accountId is not equal to Name.
-                            if (StringUtils.isNotBlank(accountId)
-                                    && (name == null || !accountId.equals(name.getNameValue()))) {
-
-                                // 3.a retrieve uid
-                                final Uid uid = (Uid) AttributeUtil.find(Uid.NAME, attributes);
-
-                                // 3.b add Uid if not provided
-                                if (uid == null) {
-                                    attributes.add(AttributeBuilder.build(Uid.NAME, Collections.singleton(accountId)));
-                                }
-                            }
-
-                            // 4. provision entry
-                            connector.create(task.getPropagationMode(), ObjectClass.ACCOUNT, attributes, null,
-                                    propagationAttempted);
-                        } else {
-
-                            // 1. check if rename is really required
-                            final Name newName = (Name) AttributeUtil.find(Name.NAME, attributes);
-
-                            LOG.debug("Rename required with value {}", newName);
-
-                            if (newName != null && newName.equals(before.getName())
-                                    && !before.getUid().getUidValue().equals(newName.getNameValue())) {
-
-                                LOG.debug("Remote object name unchanged");
-                                attributes.remove(newName);
-                            }
-
-                            LOG.debug("Attributes to be replaced {}", attributes);
-
-                            // 2. update with a new "normalized" attribute set
-                            connector.update(task.getPropagationMode(), ObjectClass.ACCOUNT, before.getUid(),
-                                    attributes, null, propagationAttempted);
-                        }
-                        break;
-
-                    case DELETE:
-                        if (before == null) {
-                            LOG.debug("{} not found on external resource: ignoring delete", task.getAccountId());
-                        } else {
-                            /*
-                             * We must choose here whether to
-                             *  a. actually delete the provided user from the external resource
-                             *  b. just update the provided user data onto the external resource
-                             *
-                             * (a) happens when either there is no user associated with the PropagationTask (this takes
-                             * place when the task is generated via UserController.delete()) or the provided updated
-                             * user hasn't the current resource assigned (when the task is generated via
-                             * UserController.update()).
-                             *
-                             * (b) happens when the provided updated user does have the current resource assigned
-                             * (when the task is generated via UserController.update()): this basically means that
-                             * before such update, this user used to have the current resource assigned by more than
-                             * one mean (for example, two different memberships with the same resource).
-                             */
-
-                            SyncopeUser user = null;
-                            if (task.getSyncopeUser() != null) {
-                                try {
-                                    user = getSyncopeUser(task.getSyncopeUser().getId());
-                                } catch (NotFoundException e) {
-                                    LOG.warn("Requesting to delete a non-existing user from {}",
-                                            task.getResource().getName(), e);
-                                }
-                            }
-
-                            if (user == null || !user.getResourceNames().contains(task.getResource().getName())) {
-                                LOG.debug("Perform deprovisioning on {}", task.getResource().getName());
-
-                                connector.delete(
-                                        task.getPropagationMode(),
-                                        ObjectClass.ACCOUNT,
-                                        before.getUid(),
-                                        null,
-                                        propagationAttempted);
-                            } else {
-                                LOG.debug("Update remote object on {}", task.getResource().getName());
-
-                                connector.update(
-                                        task.getPropagationMode(),
-                                        ObjectClass.ACCOUNT,
-                                        before.getUid(),
-                                        task.getAttributes(),
-                                        null,
-                                        propagationAttempted);
-                            }
-                        }
-
-                        break;
-
-                    default:
-                }
-
-                execution.setStatus(task.getPropagationMode() == PropagationMode.ONE_PHASE
-                        ? PropagationTaskExecStatus.SUCCESS.name()
-                        : PropagationTaskExecStatus.SUBMITTED.name());
-
-                LOG.debug("Successfully propagated to {}", task.getResource());
-
-                // Try to read user AFTER any actual operation
-                after = getRemoteObject(connector, task, true);
-            } catch (Exception e) {
-                after = getRemoteObject(connector, task, false);
-                throw e;
-            }
-        } catch (Exception e) {
-            LOG.error("Exception during provision on resource " + task.getResource().getName(), e);
-
-            if (e instanceof ConnectorException && e.getCause() != null) {
-                taskExecutionMessage = e.getCause().getMessage();
-            } else {
-                StringWriter exceptionWriter = new StringWriter();
-                exceptionWriter.write(e.getMessage() + "\n\n");
-                e.printStackTrace(new PrintWriter(exceptionWriter));
-                taskExecutionMessage = exceptionWriter.toString();
-            }
-
-            try {
-                execution.setStatus(task.getPropagationMode() == PropagationMode.ONE_PHASE
-                        ? PropagationTaskExecStatus.FAILURE.name()
-                        : PropagationTaskExecStatus.UNSUBMITTED.name());
-            } catch (Exception wft) {
-                LOG.error("While executing KO action on {}", execution, wft);
-            }
-
-            propagationAttempted.add(task.getPropagationOperation().name().toLowerCase());
-        } finally {
-            LOG.debug("Update execution for {}", task);
-
-            execution.setStartDate(startDate);
-            execution.setMessage(taskExecutionMessage);
-            execution.setEndDate(new Date());
-
-            if (hasToBeregistered(task, execution)) {
-                if (propagationAttempted.isEmpty()) {
-                    LOG.debug("No propagation attempted for {}", execution);
-                } else {
-                    execution.setTask(task);
-                    task.addExec(execution);
-
-                    LOG.debug("Execution finished: {}", execution);
-                }
-
-                taskDAO.save(task);
-
-                // This flush call is needed to generate a value for the execution id
-                // An alternative to this would be the following statement that might cause troubles with
-                // concurrent calls.
-                // taskExecDAO.findLatestStarted(task);
-                taskDAO.flush();
-            }
-        }
-
-        if (handler != null) {
-            handler.handle(
-                    task.getResource().getName(),
-                    PropagationTaskExecStatus.valueOf(execution.getStatus()),
-                    before,
-                    after);
-        }
-
-        return execution;
-    }
-
-    /**
-     * Get remote object.
-     *
-     * @param connector connector facade proxy.
-     * @param task current propagation task.
-     * @param latest 'FALSE' to retrieve object using old accountId if not null.
-     * @return remote connector object.
-     */
-    private ConnectorObject getRemoteObject(final ConnectorFacadeProxy connector, final PropagationTask task,
-            final boolean latest) {
-        try {
-
-            return connector.getObject(task.getPropagationMode(), task.getPropagationOperation(), ObjectClass.ACCOUNT,
-                    new Uid(latest || task.getOldAccountId() == null
-                    ? task.getAccountId()
-                    : task.getOldAccountId()), connector.getOperationOptions(task.getResource()));
-
-        } catch (RuntimeException ignore) {
-            LOG.debug("Resolving username", ignore);
-            return null;
-        }
-    }
 }

Added: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationTaskExecutor.java
URL: http://svn.apache.org/viewvc/incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationTaskExecutor.java?rev=1400893&view=auto
==============================================================================
--- incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationTaskExecutor.java (added)
+++ incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationTaskExecutor.java Mon Oct 22 14:29:50 2012
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.syncope.core.propagation;
+
+import java.util.Collection;
+import org.apache.syncope.core.persistence.beans.PropagationTask;
+import org.apache.syncope.core.persistence.beans.TaskExec;
+
+public interface PropagationTaskExecutor {
+
+    /**
+     * Execute the given PropagationTask and returns the generated TaskExec.
+     *
+     * @param task to be executed
+     * @return the generated TaskExec
+     */
+    TaskExec execute(PropagationTask task);
+
+    /**
+     * Execute the given PropagationTask, invoke the given handler and returns the generated TaskExec.
+     *
+     * @param task to be executed
+     * @param handler to be invoked
+     * @return the generated TaskExec
+     */
+    TaskExec execute(PropagationTask task, PropagationHandler handler);
+
+    /**
+     * Execute a collection of PropagationTask objects.
+     *
+     * @param tasks to be executed
+     * @throws PropagationException if propagation goes wrong: propagation is interrupted as soon as the result of the
+     * communication with a primary resource is in error
+     */
+    void execute(Collection<PropagationTask> tasks) throws PropagationException;
+
+    /**
+     * Execute a collection of PropagationTask objects and invoke the given handler on each of these.
+     *
+     * @param tasks to be execute, in given order
+     * @param handler propagation handler
+     * @throws PropagationException if propagation goes wrong: propagation is interrupted as soon as the result of the
+     * communication with a primary resource is in error
+     */
+    void execute(Collection<PropagationTask> tasks, PropagationHandler handler) throws PropagationException;
+}

Propchange: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationTaskExecutor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationTaskExecutor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/propagation/PropagationTaskExecutor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/rest/controller/TaskController.java
URL: http://svn.apache.org/viewvc/incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/rest/controller/TaskController.java?rev=1400893&r1=1400892&r2=1400893&view=diff
==============================================================================
--- incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/rest/controller/TaskController.java (original)
+++ incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/rest/controller/TaskController.java Mon Oct 22 14:29:50 2012
@@ -32,7 +32,6 @@ import org.apache.syncope.client.validat
 import org.apache.syncope.core.audit.AuditManager;
 import org.apache.syncope.core.init.ImplementationClassNamesLoader;
 import org.apache.syncope.core.init.JobInstanceLoader;
-import org.apache.syncope.core.notification.NotificationManager;
 import org.apache.syncope.core.persistence.beans.NotificationTask;
 import org.apache.syncope.core.persistence.beans.PropagationTask;
 import org.apache.syncope.core.persistence.beans.SchedTask;
@@ -40,9 +39,10 @@ import org.apache.syncope.core.persisten
 import org.apache.syncope.core.persistence.beans.TaskExec;
 import org.apache.syncope.core.persistence.dao.TaskDAO;
 import org.apache.syncope.core.persistence.dao.TaskExecDAO;
-import org.apache.syncope.core.propagation.PropagationManager;
+import org.apache.syncope.core.propagation.PropagationTaskExecutor;
 import org.apache.syncope.core.rest.data.TaskDataBinder;
 import org.apache.syncope.core.scheduling.AbstractTaskJob;
+import org.apache.syncope.core.scheduling.NotificationJob;
 import org.apache.syncope.core.util.NotFoundException;
 import org.apache.syncope.core.util.TaskUtil;
 import org.apache.syncope.types.AuditElements.Category;
@@ -83,10 +83,10 @@ public class TaskController extends Abst
     private TaskDataBinder binder;
 
     @Autowired
-    private PropagationManager propagationManager;
+    private PropagationTaskExecutor taskExecutor;
 
     @Autowired
-    private NotificationManager notificationManager;
+    private NotificationJob notificationJob;
 
     @Autowired
     private JobInstanceLoader jobInstanceLoader;
@@ -306,12 +306,12 @@ public class TaskController extends Abst
         LOG.debug("Execution started for {}", task);
         switch (taskUtil) {
             case PROPAGATION:
-                final TaskExec propExec = propagationManager.execute((PropagationTask) task);
+                final TaskExec propExec = taskExecutor.execute((PropagationTask) task);
                 result = binder.getTaskExecTO(propExec);
                 break;
 
             case NOTIFICATION:
-                final TaskExec notExec = notificationManager.execute((NotificationTask) task);
+                final TaskExec notExec = notificationJob.executeSingle((NotificationTask) task);
                 result = binder.getTaskExecTO(notExec);
                 break;
 

Modified: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/rest/controller/UserController.java
URL: http://svn.apache.org/viewvc/incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/rest/controller/UserController.java?rev=1400893&r1=1400892&r2=1400893&view=diff
==============================================================================
--- incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/rest/controller/UserController.java (original)
+++ incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/rest/controller/UserController.java Mon Oct 22 14:29:50 2012
@@ -41,6 +41,7 @@ import org.apache.syncope.core.persisten
 import org.apache.syncope.core.propagation.PropagationException;
 import org.apache.syncope.core.propagation.PropagationHandler;
 import org.apache.syncope.core.propagation.PropagationManager;
+import org.apache.syncope.core.propagation.PropagationTaskExecutor;
 import org.apache.syncope.core.rest.data.UserDataBinder;
 import org.apache.syncope.core.util.ConnObjectUtil;
 import org.apache.syncope.core.util.EntitlementUtil;
@@ -101,6 +102,9 @@ public class UserController {
     private PropagationManager propagationManager;
 
     @Autowired
+    private PropagationTaskExecutor taskExecutor;
+
+    @Autowired
     private NotificationManager notificationManager;
 
     /**
@@ -292,7 +296,7 @@ public class UserController {
 
         final List<PropagationTO> propagations = new ArrayList<PropagationTO>();
 
-        propagationManager.execute(tasks, new PropagationHandler() {
+        taskExecutor.execute(tasks, new PropagationHandler() {
 
             @Override
             public void handle(final String resourceName, final PropagationTaskExecStatus executionStatus,
@@ -342,7 +346,7 @@ public class UserController {
 
         final List<PropagationTO> propagations = new ArrayList<PropagationTO>();
 
-        propagationManager.execute(tasks, new PropagationHandler() {
+        taskExecutor.execute(tasks, new PropagationHandler() {
 
             @Override
             public void handle(final String resourceName, final PropagationTaskExecStatus executionStatus,
@@ -502,7 +506,7 @@ public class UserController {
             throws NotFoundException, WorkflowException, PropagationException, UnauthorizedRoleException {
         LOG.debug("User delete called with {}", userId);
 
-        return deleteByUserId(userId);
+        return doDelete(userId);
     }
 
     @PreAuthorize("hasRole('USER_DELETE')")
@@ -514,7 +518,7 @@ public class UserController {
         UserTO result = userDataBinder.getUserTO(username);
         long userId = result.getId();
 
-        return deleteByUserId(userId);
+        return doDelete(userId);
     }
 
     @PreAuthorize("hasRole('USER_UPDATE')")
@@ -529,7 +533,7 @@ public class UserController {
         List<PropagationTask> tasks = propagationManager.getUpdateTaskIds(new WorkflowResult<Map.Entry<Long, Boolean>>(
                 new DefaultMapEntry(updated.getResult(), null), updated.getPropByRes(), updated.getPerformedTasks()));
 
-        propagationManager.execute(tasks);
+        taskExecutor.execute(tasks);
 
         notificationManager.createTasks(updated.getResult(), updated.getPerformedTasks());
 
@@ -599,7 +603,7 @@ public class UserController {
         List<PropagationTask> tasks = propagationManager.getUpdateTaskIds(new WorkflowResult<Map.Entry<Long, Boolean>>(
                 new DefaultMapEntry(updated.getResult().getKey(), Boolean.TRUE), updated.getPropByRes(), updated.
                 getPerformedTasks()), updated.getResult().getValue(), null, null);
-        propagationManager.execute(tasks);
+        taskExecutor.execute(tasks);
 
         final UserTO savedTO = userDataBinder.getUserTO(updated.getResult().getKey());
 
@@ -643,7 +647,7 @@ public class UserController {
 
         List<PropagationTask> tasks = propagationManager.getUpdateTaskIds(user, status, resources);
 
-        propagationManager.execute(tasks);
+        taskExecutor.execute(tasks);
         notificationManager.createTasks(updated.getResult(), updated.getPerformedTasks());
 
         final UserTO savedTO = userDataBinder.getUserTO(updated.getResult());
@@ -656,7 +660,7 @@ public class UserController {
         return savedTO;
     }
 
-    private UserTO deleteByUserId(final Long userId)
+    protected UserTO doDelete(final Long userId)
             throws NotFoundException, WorkflowException, PropagationException, UnauthorizedRoleException {
         // Note here that we can only notify about "delete", not any other
         // task defined in workflow process definition: this because this
@@ -670,7 +674,7 @@ public class UserController {
         final UserTO userTO = new UserTO();
         userTO.setId(userId);
 
-        propagationManager.execute(tasks, new PropagationHandler() {
+        taskExecutor.execute(tasks, new PropagationHandler() {
 
             @Override
             public void handle(final String resourceName, final PropagationTaskExecStatus executionStatus,

Modified: incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/scheduling/SyncJob.java
URL: http://svn.apache.org/viewvc/incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/scheduling/SyncJob.java?rev=1400893&r1=1400892&r2=1400893&view=diff
==============================================================================
--- incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/scheduling/SyncJob.java (original)
+++ incubator/syncope/trunk/core/src/main/java/org/apache/syncope/core/scheduling/SyncJob.java Mon Oct 22 14:29:50 2012
@@ -47,6 +47,7 @@ import org.apache.syncope.core.persisten
 import org.apache.syncope.core.propagation.ConnectorFacadeProxy;
 import org.apache.syncope.core.propagation.PropagationException;
 import org.apache.syncope.core.propagation.PropagationManager;
+import org.apache.syncope.core.propagation.PropagationTaskExecutor;
 import org.apache.syncope.core.rest.controller.InvalidSearchConditionException;
 import org.apache.syncope.core.rest.controller.UnauthorizedRoleException;
 import org.apache.syncope.core.rest.data.UserDataBinder;
@@ -126,6 +127,12 @@ public class SyncJob extends AbstractTas
     private PropagationManager propagationManager;
 
     /**
+     * PropagationTask executor.
+     */
+    @Autowired
+    private PropagationTaskExecutor taskExecutor;
+
+    /**
      * User data binder.
      */
     @Autowired
@@ -322,7 +329,7 @@ public class SyncJob extends AbstractTas
                 List<PropagationTask> tasks = propagationManager.getCreateTaskIds(created, userTO.getPassword(), userTO
                         .getVirtualAttributes(), Collections.singleton(((SyncTask) this.task).getResource().getName()));
 
-                propagationManager.execute(tasks);
+                taskExecutor.execute(tasks);
 
                 notificationManager.createTasks(created.getResult().getKey(), created.getPerformedTasks());
 
@@ -379,7 +386,7 @@ public class SyncJob extends AbstractTas
                                 .getVirtualAttributesToBeUpdated(), Collections.singleton(((SyncTask) this.task)
                                 .getResource().getName()));
 
-                        propagationManager.execute(tasks);
+                        taskExecutor.execute(tasks);
 
                         notificationManager.createTasks(updated.getResult().getKey(), updated.getPerformedTasks());
 
@@ -432,10 +439,9 @@ public class SyncJob extends AbstractTas
                     try {
                         List<PropagationTask> tasks = propagationManager.getDeleteTaskIds(userId,
                                 ((SyncTask) this.task).getResource().getName());
-                        propagationManager.execute(tasks);
+                        taskExecutor.execute(tasks);
 
                         notificationManager.createTasks(userId, Collections.singleton("delete"));
-
                     } catch (Exception e) {
                         LOG.error("Could not propagate user " + userId, e);
                     }

Modified: incubator/syncope/trunk/core/src/main/resources/syncopeContext.xml
URL: http://svn.apache.org/viewvc/incubator/syncope/trunk/core/src/main/resources/syncopeContext.xml?rev=1400893&r1=1400892&r2=1400893&view=diff
==============================================================================
--- incubator/syncope/trunk/core/src/main/resources/syncopeContext.xml (original)
+++ incubator/syncope/trunk/core/src/main/resources/syncopeContext.xml Mon Oct 22 14:29:50 2012
@@ -16,7 +16,6 @@ software distributed under the License i
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
-
 -->
 <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context"
@@ -67,7 +66,10 @@ under the License.
   </bean>
   
   <bean id="propagationManager" class="org.apache.syncope.core.propagation.PropagationManager"/>
+  <bean id="propagationTaskExecutor" class="org.apache.syncope.core.propagation.PriorityPropagationTaskExecutor"/>
+    
   <bean id="notificationManager" class="org.apache.syncope.core.notification.NotificationManager"/>
+  
   <bean id="auditManager" class="org.apache.syncope.core.audit.AuditManager"/>
           
   <bean id="jexlEngine" class="org.apache.commons.jexl2.JexlEngine">