You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2015/08/25 13:41:37 UTC

svn commit: r1697652 - in /sling/trunk/contrib/extensions/distribution/core/src: main/java/org/apache/sling/distribution/agent/impl/ main/java/org/apache/sling/distribution/queue/impl/ main/java/org/apache/sling/distribution/queue/impl/jobhandling/ test/java/org/apache/sling/distribution/agent/impl/ test/java/org/apache/sling/distribution/queue/impl/

Author: mpetria
Date: Tue Aug 25 11:41:36 2015
New Revision: 1697652

URL: http://svn.apache.org/r1697652
Log:
SLING-3966: error queue strategy should be executed after import fails

Added:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/ErrorQueueDispatchingStrategy.java
Removed:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/ErrorAwareQueueDispatchingStrategy.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
Modified:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java Tue Aug 25 11:41:36 2015
@@ -41,6 +41,7 @@ import org.apache.sling.distribution.pac
 import org.apache.sling.distribution.packaging.impl.importer.RemoteDistributionPackageImporter;
 import org.apache.sling.distribution.queue.DistributionQueueProvider;
 import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.ErrorQueueDispatchingStrategy;
 import org.apache.sling.distribution.queue.impl.MultipleQueueDispatchingStrategy;
 import org.apache.sling.distribution.queue.impl.SingleQueueDispatchingStrategy;
 import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
@@ -54,8 +55,10 @@ import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -126,6 +129,16 @@ public class ForwardDistributionAgentFac
             "These queues will gather all the packages until they are removed explicitly.")
     public static final String PASSIVE_QUEUES = "passiveQueues";
 
+    @Property(options = {
+            @PropertyOption(name = "none", value = "none"), @PropertyOption(name = "errorQueue", value = "errorQueue")},
+            value = "none",
+            label = "Retry Strategy", description = "The strategy to apply after a certain number of failed retries."
+    )
+    public static final String RETRY_STRATEGY = "retry.strategy";
+
+    @Property(intValue = 100, label = "Retry attempts", description = "The number of times to retry until the retry strategy is applied.")
+    public static final String RETRY_ATTEMPTS = "retry.attempts";
+
 
     @Property(name = "requestAuthorizationStrategy.target", label = "Request Authorization Strategy", description = "The target reference for the DistributionRequestAuthorizationStrategy used to authorize the access to distribution process," +
             "e.g. use target=(name=...) to bind to services by name.")
@@ -195,34 +208,50 @@ public class ForwardDistributionAgentFac
         passiveQueues = SettingsUtils.removeEmptyEntries(passiveQueues);
 
 
-
         DistributionPackageExporter packageExporter = new LocalDistributionPackageExporter(packageBuilder);
         DistributionQueueProvider queueProvider =  new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
 
-        DistributionQueueDispatchingStrategy dispatchingStrategy = null;
+        DistributionQueueDispatchingStrategy exportQueueStrategy = null;
+        DistributionQueueDispatchingStrategy importQueueStrategy = null;
+
         DistributionPackageImporter packageImporter = null;
         Map<String, String> importerEndpointsMap = SettingsUtils.toUriMap(config.get(IMPORTER_ENDPOINTS));
         boolean useMultipleQueues = PropertiesUtil.toBoolean(config.get(USE_MULTIPLE_QUEUES), false);
+        Set<String> processingQueues = new HashSet<String>();
+
 
         if (useMultipleQueues) {
             Set<String> queuesMap = new TreeSet<String>();
             queuesMap.addAll(importerEndpointsMap.keySet());
             queuesMap.addAll(Arrays.asList(passiveQueues));
 
+            processingQueues.addAll(importerEndpointsMap.keySet());
+            processingQueues.removeAll(Arrays.asList(passiveQueues));
+
             String[] queueNames = queuesMap.toArray(new String[0]);
-            dispatchingStrategy = new MultipleQueueDispatchingStrategy(queueNames);
+            exportQueueStrategy = new MultipleQueueDispatchingStrategy(queueNames);
             packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.One);
         } else {
-            dispatchingStrategy = new SingleQueueDispatchingStrategy();
+            exportQueueStrategy = new SingleQueueDispatchingStrategy();
+            processingQueues.addAll(exportQueueStrategy.getQueueNames());
             packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.All);
         }
 
         DistributionRequestType[] allowedRequests = new DistributionRequestType[] { DistributionRequestType.ADD, DistributionRequestType.DELETE };
 
 
-        return new SimpleDistributionAgent(agentName, queueProcessingEnabled, passiveQueues,
+        String retryStrategy = SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(RETRY_STRATEGY), null));
+        int retryAttepts = PropertiesUtil.toInteger(config.get(RETRY_ATTEMPTS), 100);
+
+
+        if ("errorQueue".equals(retryStrategy)) {
+            importQueueStrategy = new ErrorQueueDispatchingStrategy(processingQueues.toArray(new String[0]));
+        }
+
+
+        return new SimpleDistributionAgent(agentName, queueProcessingEnabled, processingQueues,
                 serviceName, packageImporter, packageExporter, requestAuthorizationStrategy,
-                queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, allowedRoots);
+                queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, allowedRoots, retryAttepts);
 
 
     }

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java Tue Aug 25 11:41:36 2015
@@ -49,7 +49,10 @@ import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * An OSGi service factory for {@link org.apache.sling.distribution.agent.DistributionAgent}s which references already existing OSGi services.
@@ -155,13 +158,16 @@ public class QueueDistributionAgentFacto
         allowedRoots = SettingsUtils.removeEmptyEntries(allowedRoots);
 
         DistributionQueueProvider queueProvider =  new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
-        DistributionQueueDispatchingStrategy dispatchingStrategy = new SingleQueueDispatchingStrategy();
+        DistributionQueueDispatchingStrategy exportQueueStrategy = new SingleQueueDispatchingStrategy();
+        DistributionQueueDispatchingStrategy importQueueStrategy = null;
+
         DistributionPackageExporter packageExporter = new LocalDistributionPackageExporter(packageBuilder);
         DistributionRequestType[] allowedRequests = new DistributionRequestType[] { DistributionRequestType.ADD, DistributionRequestType.DELETE };
+        Set<String> processingQueues = new HashSet<String>();
+        processingQueues.addAll(exportQueueStrategy.getQueueNames());
 
-
-        return new SimpleDistributionAgent(agentName, false, null,
+        return new SimpleDistributionAgent(agentName, false, processingQueues,
                 serviceName, null, packageExporter, requestAuthorizationStrategy,
-                queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, allowedRoots);
+                queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, allowedRoots, 0);
     }
 }

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java Tue Aug 25 11:41:36 2015
@@ -53,7 +53,11 @@ import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * An OSGi service factory for {@link org.apache.sling.distribution.agent.DistributionAgent}s which references already existing OSGi services.
@@ -100,10 +104,6 @@ public class ReverseDistributionAgentFac
     @Property(boolValue = true, label = "Queue Processing Enabled", description = "Whether or not the distribution agent should process packages in the queues.")
     public static final String QUEUE_PROCESSING_ENABLED = "queue.processing.enabled";
 
-    @Property(cardinality = 100, label = "Passive queues", description = "List of queues that should be disabled." +
-            "These queues will gather all the packages until they are removed explicitly.")
-    public static final String PASSIVE_QUEUES = "passiveQueues";
-
     /**
      * endpoints property
      */
@@ -178,9 +178,6 @@ public class ReverseDistributionAgentFac
         String serviceName = PropertiesUtil.toString(config.get(SERVICE_NAME), null);
         boolean queueProcessingEnabled = PropertiesUtil.toBoolean(config.get(QUEUE_PROCESSING_ENABLED), true);
 
-        String[] passiveQueues = PropertiesUtil.toStringArray(config.get(PASSIVE_QUEUES), new String[0]);
-        passiveQueues = SettingsUtils.removeEmptyEntries(passiveQueues);
-
 
         String[] exporterEndpoints = PropertiesUtil.toStringArray(config.get(EXPORTER_ENDPOINTS), new String[0]);
         exporterEndpoints = SettingsUtils.removeEmptyEntries(exporterEndpoints);
@@ -194,13 +191,18 @@ public class ReverseDistributionAgentFac
         DistributionPackageImporter packageImporter = new LocalDistributionPackageImporter(packageBuilder);
         DistributionQueueProvider queueProvider =  new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
 
-        DistributionQueueDispatchingStrategy dispatchingStrategy = new SingleQueueDispatchingStrategy();
+        DistributionQueueDispatchingStrategy exportQueueStrategy = new SingleQueueDispatchingStrategy();
+        DistributionQueueDispatchingStrategy importQueueStrategy = null;
+
         DistributionRequestType[] allowedRequests = new DistributionRequestType[] { DistributionRequestType.PULL };
+        Set<String> processingQueues = new HashSet<String>();
+        processingQueues.addAll(exportQueueStrategy.getQueueNames());
+
 
 
-        return new SimpleDistributionAgent(agentName, queueProcessingEnabled, passiveQueues,
+        return new SimpleDistributionAgent(agentName, queueProcessingEnabled, processingQueues,
                 serviceName, packageImporter, packageExporter, requestAuthorizationStrategy,
-                queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, null);
+                queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, null, 0);
 
 
     }

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java Tue Aug 25 11:41:36 2015
@@ -22,11 +22,11 @@ import javax.annotation.Nonnull;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
@@ -80,7 +80,9 @@ public class SimpleDistributionAgent imp
     private final DistributionPackageImporter distributionPackageImporter;
     private final DistributionPackageExporter distributionPackageExporter;
 
-    private final DistributionQueueDispatchingStrategy queueDistributionStrategy;
+    private final DistributionQueueDispatchingStrategy exportQueueStrategy;
+    private final DistributionQueueDispatchingStrategy importQueueStrategy;
+
 
     private final DistributionEventFactory distributionEventFactory;
 
@@ -91,29 +93,33 @@ public class SimpleDistributionAgent imp
     private final String subServiceName;
     private AgentBasedRequestHandler agentBasedRequestHandler;
     private boolean active = false;
+    private final Set<String> processingQueues;
+    private final int retryAttempts;
     private final DefaultDistributionLog log;
     private final DistributionRequestType[] allowedRequests;
     private final String[] allowedRoots;
-    private final String[] passiveQueues;
 
     public SimpleDistributionAgent(String name,
                                    boolean queueProcessingEnabled,
-                                   String[] passiveQueues,
+                                   Set<String> processingQueues,
                                    String subServiceName,
                                    DistributionPackageImporter distributionPackageImporter,
                                    DistributionPackageExporter distributionPackageExporter,
                                    DistributionRequestAuthorizationStrategy distributionRequestAuthorizationStrategy,
                                    DistributionQueueProvider queueProvider,
-                                   DistributionQueueDispatchingStrategy queueDistributionStrategy,
+                                   DistributionQueueDispatchingStrategy exportQueueStrategy,
+                                   DistributionQueueDispatchingStrategy importQueueStrategy,
                                    DistributionEventFactory distributionEventFactory,
                                    ResourceResolverFactory resourceResolverFactory,
                                    DefaultDistributionLog log,
                                    DistributionRequestType[] allowedRequests,
-                                   String[] allowedRoots) {
+                                   String[] allowedRoots,
+                                   int retryAttempts) {
         this.log = log;
         this.allowedRequests = allowedRequests;
         this.allowedRoots = allowedRoots;
-        this.passiveQueues = passiveQueues;
+        this.processingQueues = processingQueues;
+        this.retryAttempts = retryAttempts;
 
         // check configuration is valid
         if (name == null
@@ -122,7 +128,7 @@ public class SimpleDistributionAgent imp
                 || subServiceName == null
                 || distributionRequestAuthorizationStrategy == null
                 || queueProvider == null
-                || queueDistributionStrategy == null
+                || exportQueueStrategy == null
                 || distributionEventFactory == null
                 || resourceResolverFactory == null) {
 
@@ -132,7 +138,7 @@ public class SimpleDistributionAgent imp
                     subServiceName,
                     distributionRequestAuthorizationStrategy,
                     queueProvider,
-                    queueDistributionStrategy,
+                    exportQueueStrategy,
                     distributionEventFactory,
                     resourceResolverFactory});
             throw new IllegalArgumentException("all arguments are required: " + errorMessage);
@@ -146,7 +152,8 @@ public class SimpleDistributionAgent imp
         this.distributionPackageImporter = distributionPackageImporter;
         this.distributionPackageExporter = distributionPackageExporter;
         this.queueProvider = queueProvider;
-        this.queueDistributionStrategy = queueDistributionStrategy;
+        this.exportQueueStrategy = exportQueueStrategy;
+        this.importQueueStrategy = importQueueStrategy;
         this.distributionEventFactory = distributionEventFactory;
     }
 
@@ -227,7 +234,7 @@ public class SimpleDistributionAgent imp
 
         // dispatch the distribution package to the queue distribution handler
         try {
-            Iterable<DistributionQueueItemStatus> states = queueDistributionStrategy.add(distributionPackage, queueProvider);
+            Iterable<DistributionQueueItemStatus> states = exportQueueStrategy.add(distributionPackage, queueProvider);
             for (DistributionQueueItemStatus state : states) {
                 DistributionRequestState requestState = getRequestStateFromQueueState(state.getItemState());
                 distributionResponses.add(new SimpleDistributionResponse(requestState, state.getItemState().toString()));
@@ -249,7 +256,12 @@ public class SimpleDistributionAgent imp
 
     @Nonnull
     public Iterable<String> getQueueNames() {
-        return queueDistributionStrategy.getQueueNames();
+        Set<String> queueNames = new TreeSet<String>();
+        queueNames.addAll(exportQueueStrategy.getQueueNames());
+        if (importQueueStrategy != null) {
+            queueNames.addAll(importQueueStrategy.getQueueNames());
+        }
+        return queueNames;
     }
 
     public DistributionQueue getQueue(@Nonnull String queueName) throws DistributionAgentException {
@@ -313,15 +325,7 @@ public class SimpleDistributionAgent imp
 
         if (!isPassive()) {
             try {
-
-                Set<String> allQueues = new HashSet<String>(queueDistributionStrategy.getQueueNames());
-
-                if (passiveQueues != null) {
-                    Set<String> passiveQueues = new HashSet<String>(Arrays.asList(this.passiveQueues));
-                    allQueues.removeAll(passiveQueues);
-                }
-
-                queueProvider.enableQueueProcessing(new PackageQueueProcessor(), allQueues.toArray(new String[0]));
+                queueProvider.enableQueueProcessing(new PackageQueueProcessor(), processingQueues.toArray(new String[0]));
             } catch (DistributionQueueException e) {
                 log.error("cannot enable queue processing", e);
             }
@@ -373,10 +377,12 @@ public class SimpleDistributionAgent imp
 
     }
 
-    private boolean processQueueItem(String queueName, DistributionQueueItem queueItem) {
+    private boolean processQueueItem(String queueName, DistributionQueueEntry queueEntry) {
         boolean success = false;
         ResourceResolver agentResourceResolver = null;
         DistributionPackage distributionPackage = null;
+        DistributionQueueItem queueItem = queueEntry.getItem();
+        DistributionQueueItemStatus queueItemStatus = queueEntry.getStatus();
         try {
 
             agentResourceResolver = getAgentResourceResolver();
@@ -387,21 +393,20 @@ public class SimpleDistributionAgent imp
                 distributionPackage.getInfo().putAll(queueItem);
                 distributionPackage.getInfo().put(DistributionPackageInfo.PROPERTY_ORIGIN_QUEUE, queueName);
 
-
-                distributionPackageImporter.importPackage(agentResourceResolver, distributionPackage);
-
-                DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
-
-                generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED, distributionPackage);
-                success = true;
-                log.info("distribution package {} was delivered", queueItem.getId());
+                if (processPackage(agentResourceResolver, distributionPackage)) {
+                    success = true;
+                    DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
+                    generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED, distributionPackage);
+                } else if (importQueueStrategy != null && queueItemStatus.getAttempts() > retryAttempts) {
+                    success = true;
+                    reEnqueuePackage(agentResourceResolver, distributionPackage);
+                    DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
+                }
             } else {
                 success = true; // return success if package does not exist in order to clear the queue.
                 log.error("distribution package with id {} does not exist. the package will be skipped.", queueItem.getId());
             }
 
-        } catch (DistributionPackageImportException e) {
-            log.error("could not deliver package {}", queueItem.getId(), e);
         } catch (LoginException e) {
             log.info("cannot obtain resource resolver", e);
         } finally {
@@ -413,6 +418,35 @@ public class SimpleDistributionAgent imp
         return success;
     }
 
+    private boolean processPackage(ResourceResolver resourceResolver, DistributionPackage distributionPackage) {
+        try {
+            distributionPackageImporter.importPackage(resourceResolver, distributionPackage);
+        } catch (DistributionPackageImportException e) {
+            log.error("could not deliver package {}", distributionPackage.getId(), e);
+            return false;
+        }
+
+        log.info("distribution package {} was delivered", distributionPackage.getId());
+        return true;
+    }
+
+    private boolean reEnqueuePackage(ResourceResolver resourceResolver, DistributionPackage distributionPackage) {
+
+        if (importQueueStrategy == null) {
+            return false;
+        }
+
+        try {
+            importQueueStrategy.add(distributionPackage, queueProvider);
+        } catch (DistributionQueueException e) {
+            log.error("could not reenqueue package {}", distributionPackage.getId(), e);
+            return false;
+        }
+
+        log.info("distribution package {} was reenqueued", distributionPackage.getId());
+        return true;
+    }
+
     private ResourceResolver getAgentResourceResolver() throws LoginException {
         ResourceResolver resourceResolver;
 
@@ -495,7 +529,7 @@ public class SimpleDistributionAgent imp
             try {
                 log.debug("queue {} processing item {}", queueName, queueItem);
 
-                boolean success = processQueueItem(queueName, queueItem);
+                boolean success = processQueueItem(queueName, queueEntry);
 
                 log.debug("queue {} processing item {} ended with status {}", queueName, queueItem, success);
 

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java Tue Aug 25 11:41:36 2015
@@ -18,7 +18,11 @@
  */
 package org.apache.sling.distribution.agent.impl;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -94,10 +98,6 @@ public class SimpleDistributionAgentFact
     @Property(boolValue = true, label = "Queue Processing Enabled", description = "Whether or not the distribution agent should process packages in the queues.")
     public static final String QUEUE_PROCESSING_ENABLED = "queue.processing.enabled";
 
-    @Property(cardinality = 100, label = "Passive queues", description = "List of queues that should be disabled." +
-            "These queues will gather all the packages until they are removed explicitly.")
-    public static final String PASSIVE_QUEUES = "passiveQueues";
-
 
     @Property(name = "packageExporter.target", label = "Exporter", description = "The target reference for the DistributionPackageExporter used to receive (export) the distribution packages," +
             "e.g. use target=(name=...) to bind to services by name.")
@@ -160,14 +160,18 @@ public class SimpleDistributionAgentFact
         String serviceName = PropertiesUtil.toString(config.get(SERVICE_NAME), null);
 
         boolean queueProcessingEnabled = PropertiesUtil.toBoolean(config.get(QUEUE_PROCESSING_ENABLED), true);
-        String[] passiveQueues = PropertiesUtil.toStringArray(config.get(PASSIVE_QUEUES), new String[0]);
-        passiveQueues = SettingsUtils.removeEmptyEntries(passiveQueues);
 
         DistributionQueueProvider queueProvider =  new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
-        DistributionQueueDispatchingStrategy dispatchingStrategy = new SingleQueueDispatchingStrategy();
-        return new SimpleDistributionAgent(agentName, queueProcessingEnabled, passiveQueues,
+        DistributionQueueDispatchingStrategy exportQueueStrategy = new SingleQueueDispatchingStrategy();
+        DistributionQueueDispatchingStrategy importQueueStrategy = null;
+
+        Set<String> processingQueues = new HashSet<String>();
+        processingQueues.addAll(exportQueueStrategy.getQueueNames());
+
+
+        return new SimpleDistributionAgent(agentName, queueProcessingEnabled, processingQueues,
                 serviceName, packageImporter, packageExporter, requestAuthorizationStrategy,
-                queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, null, null);
+                queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, null, null, 0);
 
     }
 }

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java Tue Aug 25 11:41:36 2015
@@ -41,6 +41,7 @@ import org.apache.sling.distribution.pac
 import org.apache.sling.distribution.packaging.impl.importer.RemoteDistributionPackageImporter;
 import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
 import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.ErrorQueueDispatchingStrategy;
 import org.apache.sling.distribution.queue.impl.MultipleQueueDispatchingStrategy;
 import org.apache.sling.distribution.queue.impl.SingleQueueDispatchingStrategy;
 import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
@@ -54,8 +55,10 @@ import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -129,6 +132,17 @@ public class SyncDistributionAgentFactor
     public static final String USE_MULTIPLE_QUEUES = "useMultipleQueues";
 
 
+    @Property(options = {
+            @PropertyOption(name = "none", value = "none"), @PropertyOption(name = "errorQueue", value = "errorQueue")},
+            value = "none",
+            label = "Retry Strategy", description = "The strategy to apply after a certain number of failed retries."
+    )
+    public static final String RETRY_STRATEGY = "retry.strategy";
+
+    @Property(intValue = 100, label = "Retry attempts", description = "The number of times to retry until the retry strategy is applied.")
+    public static final String RETRY_ATTEMPTS = "retry.attempts";
+
+
     /**
      * no. of items to poll property
      */
@@ -200,8 +214,6 @@ public class SyncDistributionAgentFactor
         String[] passiveQueues = PropertiesUtil.toStringArray(config.get(PASSIVE_QUEUES), new String[0]);
         passiveQueues = SettingsUtils.removeEmptyEntries(passiveQueues);
 
-
-
         Object exporterEndpointsValue = config.get(EXPORTER_ENDPOINTS);
         Object importerEndpointsValue = config.get(IMPORTER_ENDPOINTS);
 
@@ -215,19 +227,24 @@ public class SyncDistributionAgentFactor
         int pullItems = PropertiesUtil.toInteger(config.get(PULL_ITEMS), Integer.MAX_VALUE);
 
 
-        DistributionQueueDispatchingStrategy dispatchingStrategy;
+        DistributionQueueDispatchingStrategy exportQueueStrategy;
+        DistributionQueueDispatchingStrategy importQueueStrategy = null;
         DistributionPackageImporter packageImporter;
+        Set<String> processingQueues = new HashSet<String>();
 
         if (useMultipleQueues) {
             Set<String> queuesMap = new TreeSet<String>();
             queuesMap.addAll(importerEndpointsMap.keySet());
             queuesMap.addAll(Arrays.asList(passiveQueues));
+            processingQueues.addAll(importerEndpointsMap.keySet());
+            processingQueues.removeAll(Arrays.asList(passiveQueues));
 
             String[] queueNames = queuesMap.toArray(new String[0]);
-            dispatchingStrategy = new MultipleQueueDispatchingStrategy(queueNames);
+            exportQueueStrategy = new MultipleQueueDispatchingStrategy(queueNames);
             packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.One);
         } else {
-            dispatchingStrategy = new SingleQueueDispatchingStrategy();
+            exportQueueStrategy = new SingleQueueDispatchingStrategy();
+            processingQueues.addAll(exportQueueStrategy.getQueueNames());
             packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.All);
         }
 
@@ -235,9 +252,18 @@ public class SyncDistributionAgentFactor
         DistributionQueueProvider queueProvider =  new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
         DistributionRequestType[] allowedRequests = new DistributionRequestType[] { DistributionRequestType.PULL };
 
-        return new SimpleDistributionAgent(agentName, queueProcessingEnabled, passiveQueues,
+        String retryStrategy = SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(RETRY_STRATEGY), null));
+        int retryAttepts = PropertiesUtil.toInteger(config.get(RETRY_ATTEMPTS), 100);
+
+
+        if ("errorQueue".equals(retryStrategy)) {
+            importQueueStrategy = new ErrorQueueDispatchingStrategy(processingQueues.toArray(new String[0]));
+        }
+
+
+        return new SimpleDistributionAgent(agentName, queueProcessingEnabled, processingQueues,
                 serviceName, packageImporter, packageExporter, requestAuthorizationStrategy,
-                queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, null);
+                queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, null, retryAttepts);
 
     }
 }

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/ErrorQueueDispatchingStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/ErrorQueueDispatchingStrategy.java?rev=1697652&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/ErrorQueueDispatchingStrategy.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/ErrorQueueDispatchingStrategy.java Tue Aug 25 11:41:36 2015
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl;
+
+import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
+import org.apache.sling.distribution.queue.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueException;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemState;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.apache.sling.distribution.queue.DistributionQueueProvider;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+
+/**
+ * The error strategy for delivering packages to queues. The strategy delivers the packages in a queue named error-queueName
+ * where the queueName is the name of the original queue the package was in.
+ */
+public class ErrorQueueDispatchingStrategy implements DistributionQueueDispatchingStrategy {
+
+    private final static String ERROR_PREFIX = "error-";
+    private final Set<String> queueNames = new TreeSet<String>();
+
+    /** @param queueNames names of the origin queues whose failed packages get an error queue */
+    public ErrorQueueDispatchingStrategy(String[] queueNames) {
+        this.queueNames.addAll(Arrays.asList(queueNames));
+    }
+
+    /**
+     * Adds the package to the error queue ("error-" + origin queue name) of its origin queue.
+     * @return the status of the added item, an ERROR status when the error queue rejects it, or an
+     * empty list when the package has no origin queue or the origin queue is not handled here
+     */
+    @Override
+    public Iterable<DistributionQueueItemStatus> add(@Nonnull DistributionPackage distributionPackage, @Nonnull DistributionQueueProvider queueProvider) throws DistributionQueueException {
+        List<DistributionQueueItemStatus> result = new ArrayList<DistributionQueueItemStatus>();
+        String originQueue = distributionPackage.getInfo().getQueue();
+        // TreeSet.contains(null) throws a NullPointerException, so test for null explicitly
+        if (originQueue == null || !queueNames.contains(originQueue)) {
+            return result;
+        }
+
+        String errorQueueName = ERROR_PREFIX + originQueue;
+        DistributionQueue errorQueue = queueProvider.getQueue(errorQueueName);
+        DistributionQueueItemStatus status = new DistributionQueueItemStatus(DistributionQueueItemState.ERROR, errorQueueName);
+        DistributionQueueItem queueItem = DistributionPackageUtils.toQueueItem(distributionPackage);
+
+        if (errorQueue.add(queueItem)) {
+            DistributionPackageUtils.acquire(distributionPackage, errorQueueName);
+            // keep the ERROR fallback status if the queue does not return the entry just added
+            DistributionQueueEntry entry = errorQueue.getItem(queueItem.getId());
+            status = entry != null ? entry.getStatus() : status;
+        }
+        result.add(status);
+        return result;
+    }
+
+    /** @return the error queue names, i.e. each configured queue name prefixed with "error-" */
+    @Nonnull
+    @Override
+    public List<String> getQueueNames() {
+        List<String> errorQueueNames = new ArrayList<String>();
+        for (String queueName : queueNames) {
+            errorQueueNames.add(ERROR_PREFIX + queueName);
+        }
+        return errorQueueNames;
+    }
+}

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java Tue Aug 25 11:41:36 2015
@@ -195,6 +195,10 @@ public class JobHandlingDistributionQueu
         DistributionQueueItemStatus firstItemStatus = firstJob != null ? JobHandlingUtils.getStatus(firstJob) : null;
 
         DistributionQueueState state = DistributionQueueUtils.calculateState(firstItem, firstItemStatus);
+        if (!isActive) {
+            state = DistributionQueueState.PAUSED;
+        }
+
         int itemsCount = jobs.size();
 
         return new DistributionQueueStatus(itemsCount, state);

Modified: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java Tue Aug 25 11:41:36 2015
@@ -71,8 +71,8 @@ public class SimpleDistributionAgentTest
         SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
                 false, null, "serviceName", packageImporter,
                 packageExporter, packageExporterStrategy,
-                queueProvider, distributionHandler,
-                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null);
+                queueProvider, distributionHandler, null,
+                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null, 0);
         DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
         DistributionPackage distributionPackage = mock(DistributionPackage.class);
         ResourceResolver resourceResolver = mock(ResourceResolver.class);
@@ -102,7 +102,7 @@ public class SimpleDistributionAgentTest
                 false, null, "subServiceName", packageImporter,
                 packageExporter, packageExporterStrategy,
                 queueProvider,
-                distributionHandler, distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null);
+                distributionHandler, null, distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null, 0);
         DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
         DistributionPackage distributionPackage = mock(DistributionPackage.class);
         ResourceResolver resourceResolver = mock(ResourceResolver.class);
@@ -135,8 +135,8 @@ public class SimpleDistributionAgentTest
         SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
                 false, null, "serviceName", packageImporter,
                 packageExporter, packageExporterStrategy,
-                queueProvider, distributionHandler,
-                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null);
+                queueProvider, distributionHandler, null,
+                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null, 0);
         DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
         DistributionPackage distributionPackage = mock(DistributionPackage.class);
         DistributionPackageInfo packageInfo = new DistributionPackageInfo("type");
@@ -165,8 +165,8 @@ public class SimpleDistributionAgentTest
         SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
                 false, null, "serviceName", packageImporter,
                 packageExporter, packageExporterStrategy,
-                queueProvider, distributionHandler,
-                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null);
+                queueProvider, distributionHandler, null,
+                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null, 0);
         DistributionQueue queue = mock(DistributionQueue.class);
         when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME))
                 .thenReturn(queue);
@@ -188,8 +188,8 @@ public class SimpleDistributionAgentTest
         SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
                 false, null, "serviceName", packageImporter,
                 packageExporter, packageExporterStrategy,
-                queueProvider, distributionHandler,
-                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null);
+                queueProvider, distributionHandler, null,
+                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null, 0);
         DistributionQueue queue = mock(DistributionQueue.class);
         when(queueProvider.getQueue("priority")).thenReturn(queue);
         assertNotNull(agent.getQueue("priority"));
@@ -210,8 +210,8 @@ public class SimpleDistributionAgentTest
         SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
                 false, null, "serviceName", packageImporter,
                 packageExporter, packageExporterStrategy,
-                queueProvider, distributionHandler,
-                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null);
+                queueProvider, distributionHandler, null,
+                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null, 0);
         DistributionQueue queue = mock(DistributionQueue.class);
         when(queueProvider.getQueue("priority")).thenReturn(queue);
         assertNull(agent.getQueue("weird"));
@@ -238,8 +238,8 @@ public class SimpleDistributionAgentTest
         SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
                 false, null, "serviceName", packageImporter,
                 packageExporter, packageExporterStrategy,
-                queueProvider, queueDistributionStrategy,
-                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, new String[] { "/content" });
+                queueProvider, queueDistributionStrategy, null,
+                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, new String[] { "/content" }, 0);
 
         DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/content");
         DistributionPackage distributionPackage = mock(DistributionPackage.class);
@@ -277,8 +277,8 @@ public class SimpleDistributionAgentTest
         SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
                 false, null, "serviceName", packageImporter,
                 packageExporter, packageExporterStrategy,
-                queueProvider, queueDistributionStrategy,
-                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, new String[] { "/content" });
+                queueProvider, queueDistributionStrategy, null,
+                distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, new String[] { "/content" }, 0);
 
         DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/home");
         DistributionPackage distributionPackage = mock(DistributionPackage.class);