You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2015/08/25 13:41:37 UTC
svn commit: r1697652 - in
/sling/trunk/contrib/extensions/distribution/core/src:
main/java/org/apache/sling/distribution/agent/impl/
main/java/org/apache/sling/distribution/queue/impl/
main/java/org/apache/sling/distribution/queue/impl/jobhandling/ tes...
Author: mpetria
Date: Tue Aug 25 11:41:36 2015
New Revision: 1697652
URL: http://svn.apache.org/r1697652
Log:
SLING-3966: error queue strategy should be executed after import fails
Added:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/ErrorQueueDispatchingStrategy.java
Removed:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/ErrorAwareQueueDispatchingStrategy.java
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java Tue Aug 25 11:41:36 2015
@@ -41,6 +41,7 @@ import org.apache.sling.distribution.pac
import org.apache.sling.distribution.packaging.impl.importer.RemoteDistributionPackageImporter;
import org.apache.sling.distribution.queue.DistributionQueueProvider;
import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.ErrorQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.MultipleQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.SingleQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
@@ -54,8 +55,10 @@ import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -126,6 +129,16 @@ public class ForwardDistributionAgentFac
"These queues will gather all the packages until they are removed explicitly.")
public static final String PASSIVE_QUEUES = "passiveQueues";
+ @Property(options = {
+ @PropertyOption(name = "none", value = "none"), @PropertyOption(name = "errorQueue", value = "errorQueue")},
+ value = "none",
+ label = "Retry Strategy", description = "The strategy to apply after a certain number of failed retries."
+ )
+ public static final String RETRY_STRATEGY = "retry.strategy";
+
+ @Property(intValue = 100, label = "Retry attempts", description = "The number of times to retry until the retry strategy is applied.")
+ public static final String RETRY_ATTEMPTS = "retry.attempts";
+
@Property(name = "requestAuthorizationStrategy.target", label = "Request Authorization Strategy", description = "The target reference for the DistributionRequestAuthorizationStrategy used to authorize the access to distribution process," +
"e.g. use target=(name=...) to bind to services by name.")
@@ -195,34 +208,50 @@ public class ForwardDistributionAgentFac
passiveQueues = SettingsUtils.removeEmptyEntries(passiveQueues);
-
DistributionPackageExporter packageExporter = new LocalDistributionPackageExporter(packageBuilder);
DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
- DistributionQueueDispatchingStrategy dispatchingStrategy = null;
+ DistributionQueueDispatchingStrategy exportQueueStrategy = null;
+ DistributionQueueDispatchingStrategy importQueueStrategy = null;
+
DistributionPackageImporter packageImporter = null;
Map<String, String> importerEndpointsMap = SettingsUtils.toUriMap(config.get(IMPORTER_ENDPOINTS));
boolean useMultipleQueues = PropertiesUtil.toBoolean(config.get(USE_MULTIPLE_QUEUES), false);
+ Set<String> processingQueues = new HashSet<String>();
+
if (useMultipleQueues) {
Set<String> queuesMap = new TreeSet<String>();
queuesMap.addAll(importerEndpointsMap.keySet());
queuesMap.addAll(Arrays.asList(passiveQueues));
+ processingQueues.addAll(importerEndpointsMap.keySet());
+ processingQueues.removeAll(Arrays.asList(passiveQueues));
+
String[] queueNames = queuesMap.toArray(new String[0]);
- dispatchingStrategy = new MultipleQueueDispatchingStrategy(queueNames);
+ exportQueueStrategy = new MultipleQueueDispatchingStrategy(queueNames);
packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.One);
} else {
- dispatchingStrategy = new SingleQueueDispatchingStrategy();
+ exportQueueStrategy = new SingleQueueDispatchingStrategy();
+ processingQueues.addAll(exportQueueStrategy.getQueueNames());
packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.All);
}
DistributionRequestType[] allowedRequests = new DistributionRequestType[] { DistributionRequestType.ADD, DistributionRequestType.DELETE };
- return new SimpleDistributionAgent(agentName, queueProcessingEnabled, passiveQueues,
+ String retryStrategy = SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(RETRY_STRATEGY), null));
+ int retryAttepts = PropertiesUtil.toInteger(config.get(RETRY_ATTEMPTS), 100);
+
+
+ if ("errorQueue".equals(retryStrategy)) {
+ importQueueStrategy = new ErrorQueueDispatchingStrategy(processingQueues.toArray(new String[0]));
+ }
+
+
+ return new SimpleDistributionAgent(agentName, queueProcessingEnabled, processingQueues,
serviceName, packageImporter, packageExporter, requestAuthorizationStrategy,
- queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, allowedRoots);
+ queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, allowedRoots, retryAttepts);
}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java Tue Aug 25 11:41:36 2015
@@ -49,7 +49,10 @@ import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
/**
* An OSGi service factory for {@link org.apache.sling.distribution.agent.DistributionAgent}s which references already existing OSGi services.
@@ -155,13 +158,16 @@ public class QueueDistributionAgentFacto
allowedRoots = SettingsUtils.removeEmptyEntries(allowedRoots);
DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
- DistributionQueueDispatchingStrategy dispatchingStrategy = new SingleQueueDispatchingStrategy();
+ DistributionQueueDispatchingStrategy exportQueueStrategy = new SingleQueueDispatchingStrategy();
+ DistributionQueueDispatchingStrategy importQueueStrategy = null;
+
DistributionPackageExporter packageExporter = new LocalDistributionPackageExporter(packageBuilder);
DistributionRequestType[] allowedRequests = new DistributionRequestType[] { DistributionRequestType.ADD, DistributionRequestType.DELETE };
+ Set<String> processingQueues = new HashSet<String>();
+ processingQueues.addAll(exportQueueStrategy.getQueueNames());
-
- return new SimpleDistributionAgent(agentName, false, null,
+ return new SimpleDistributionAgent(agentName, false, processingQueues,
serviceName, null, packageExporter, requestAuthorizationStrategy,
- queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, allowedRoots);
+ queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, allowedRoots, 0);
}
}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java Tue Aug 25 11:41:36 2015
@@ -53,7 +53,11 @@ import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* An OSGi service factory for {@link org.apache.sling.distribution.agent.DistributionAgent}s which references already existing OSGi services.
@@ -100,10 +104,6 @@ public class ReverseDistributionAgentFac
@Property(boolValue = true, label = "Queue Processing Enabled", description = "Whether or not the distribution agent should process packages in the queues.")
public static final String QUEUE_PROCESSING_ENABLED = "queue.processing.enabled";
- @Property(cardinality = 100, label = "Passive queues", description = "List of queues that should be disabled." +
- "These queues will gather all the packages until they are removed explicitly.")
- public static final String PASSIVE_QUEUES = "passiveQueues";
-
/**
* endpoints property
*/
@@ -178,9 +178,6 @@ public class ReverseDistributionAgentFac
String serviceName = PropertiesUtil.toString(config.get(SERVICE_NAME), null);
boolean queueProcessingEnabled = PropertiesUtil.toBoolean(config.get(QUEUE_PROCESSING_ENABLED), true);
- String[] passiveQueues = PropertiesUtil.toStringArray(config.get(PASSIVE_QUEUES), new String[0]);
- passiveQueues = SettingsUtils.removeEmptyEntries(passiveQueues);
-
String[] exporterEndpoints = PropertiesUtil.toStringArray(config.get(EXPORTER_ENDPOINTS), new String[0]);
exporterEndpoints = SettingsUtils.removeEmptyEntries(exporterEndpoints);
@@ -194,13 +191,18 @@ public class ReverseDistributionAgentFac
DistributionPackageImporter packageImporter = new LocalDistributionPackageImporter(packageBuilder);
DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
- DistributionQueueDispatchingStrategy dispatchingStrategy = new SingleQueueDispatchingStrategy();
+ DistributionQueueDispatchingStrategy exportQueueStrategy = new SingleQueueDispatchingStrategy();
+ DistributionQueueDispatchingStrategy importQueueStrategy = null;
+
DistributionRequestType[] allowedRequests = new DistributionRequestType[] { DistributionRequestType.PULL };
+ Set<String> processingQueues = new HashSet<String>();
+ processingQueues.addAll(exportQueueStrategy.getQueueNames());
+
- return new SimpleDistributionAgent(agentName, queueProcessingEnabled, passiveQueues,
+ return new SimpleDistributionAgent(agentName, queueProcessingEnabled, processingQueues,
serviceName, packageImporter, packageExporter, requestAuthorizationStrategy,
- queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, null);
+ queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, null, 0);
}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java Tue Aug 25 11:41:36 2015
@@ -22,11 +22,11 @@ import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
@@ -80,7 +80,9 @@ public class SimpleDistributionAgent imp
private final DistributionPackageImporter distributionPackageImporter;
private final DistributionPackageExporter distributionPackageExporter;
- private final DistributionQueueDispatchingStrategy queueDistributionStrategy;
+ private final DistributionQueueDispatchingStrategy exportQueueStrategy;
+ private final DistributionQueueDispatchingStrategy importQueueStrategy;
+
private final DistributionEventFactory distributionEventFactory;
@@ -91,29 +93,33 @@ public class SimpleDistributionAgent imp
private final String subServiceName;
private AgentBasedRequestHandler agentBasedRequestHandler;
private boolean active = false;
+ private final Set<String> processingQueues;
+ private final int retryAttempts;
private final DefaultDistributionLog log;
private final DistributionRequestType[] allowedRequests;
private final String[] allowedRoots;
- private final String[] passiveQueues;
public SimpleDistributionAgent(String name,
boolean queueProcessingEnabled,
- String[] passiveQueues,
+ Set<String> processingQueues,
String subServiceName,
DistributionPackageImporter distributionPackageImporter,
DistributionPackageExporter distributionPackageExporter,
DistributionRequestAuthorizationStrategy distributionRequestAuthorizationStrategy,
DistributionQueueProvider queueProvider,
- DistributionQueueDispatchingStrategy queueDistributionStrategy,
+ DistributionQueueDispatchingStrategy exportQueueStrategy,
+ DistributionQueueDispatchingStrategy importQueueStrategy,
DistributionEventFactory distributionEventFactory,
ResourceResolverFactory resourceResolverFactory,
DefaultDistributionLog log,
DistributionRequestType[] allowedRequests,
- String[] allowedRoots) {
+ String[] allowedRoots,
+ int retryAttempts) {
this.log = log;
this.allowedRequests = allowedRequests;
this.allowedRoots = allowedRoots;
- this.passiveQueues = passiveQueues;
+ this.processingQueues = processingQueues;
+ this.retryAttempts = retryAttempts;
// check configuration is valid
if (name == null
@@ -122,7 +128,7 @@ public class SimpleDistributionAgent imp
|| subServiceName == null
|| distributionRequestAuthorizationStrategy == null
|| queueProvider == null
- || queueDistributionStrategy == null
+ || exportQueueStrategy == null
|| distributionEventFactory == null
|| resourceResolverFactory == null) {
@@ -132,7 +138,7 @@ public class SimpleDistributionAgent imp
subServiceName,
distributionRequestAuthorizationStrategy,
queueProvider,
- queueDistributionStrategy,
+ exportQueueStrategy,
distributionEventFactory,
resourceResolverFactory});
throw new IllegalArgumentException("all arguments are required: " + errorMessage);
@@ -146,7 +152,8 @@ public class SimpleDistributionAgent imp
this.distributionPackageImporter = distributionPackageImporter;
this.distributionPackageExporter = distributionPackageExporter;
this.queueProvider = queueProvider;
- this.queueDistributionStrategy = queueDistributionStrategy;
+ this.exportQueueStrategy = exportQueueStrategy;
+ this.importQueueStrategy = importQueueStrategy;
this.distributionEventFactory = distributionEventFactory;
}
@@ -227,7 +234,7 @@ public class SimpleDistributionAgent imp
// dispatch the distribution package to the queue distribution handler
try {
- Iterable<DistributionQueueItemStatus> states = queueDistributionStrategy.add(distributionPackage, queueProvider);
+ Iterable<DistributionQueueItemStatus> states = exportQueueStrategy.add(distributionPackage, queueProvider);
for (DistributionQueueItemStatus state : states) {
DistributionRequestState requestState = getRequestStateFromQueueState(state.getItemState());
distributionResponses.add(new SimpleDistributionResponse(requestState, state.getItemState().toString()));
@@ -249,7 +256,12 @@ public class SimpleDistributionAgent imp
@Nonnull
public Iterable<String> getQueueNames() {
- return queueDistributionStrategy.getQueueNames();
+ Set<String> queueNames = new TreeSet<String>();
+ queueNames.addAll(exportQueueStrategy.getQueueNames());
+ if (importQueueStrategy != null) {
+ queueNames.addAll(importQueueStrategy.getQueueNames());
+ }
+ return queueNames;
}
public DistributionQueue getQueue(@Nonnull String queueName) throws DistributionAgentException {
@@ -313,15 +325,7 @@ public class SimpleDistributionAgent imp
if (!isPassive()) {
try {
-
- Set<String> allQueues = new HashSet<String>(queueDistributionStrategy.getQueueNames());
-
- if (passiveQueues != null) {
- Set<String> passiveQueues = new HashSet<String>(Arrays.asList(this.passiveQueues));
- allQueues.removeAll(passiveQueues);
- }
-
- queueProvider.enableQueueProcessing(new PackageQueueProcessor(), allQueues.toArray(new String[0]));
+ queueProvider.enableQueueProcessing(new PackageQueueProcessor(), processingQueues.toArray(new String[0]));
} catch (DistributionQueueException e) {
log.error("cannot enable queue processing", e);
}
@@ -373,10 +377,12 @@ public class SimpleDistributionAgent imp
}
- private boolean processQueueItem(String queueName, DistributionQueueItem queueItem) {
+ private boolean processQueueItem(String queueName, DistributionQueueEntry queueEntry) {
boolean success = false;
ResourceResolver agentResourceResolver = null;
DistributionPackage distributionPackage = null;
+ DistributionQueueItem queueItem = queueEntry.getItem();
+ DistributionQueueItemStatus queueItemStatus = queueEntry.getStatus();
try {
agentResourceResolver = getAgentResourceResolver();
@@ -387,21 +393,20 @@ public class SimpleDistributionAgent imp
distributionPackage.getInfo().putAll(queueItem);
distributionPackage.getInfo().put(DistributionPackageInfo.PROPERTY_ORIGIN_QUEUE, queueName);
-
- distributionPackageImporter.importPackage(agentResourceResolver, distributionPackage);
-
- DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
-
- generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED, distributionPackage);
- success = true;
- log.info("distribution package {} was delivered", queueItem.getId());
+ if (processPackage(agentResourceResolver, distributionPackage)) {
+ success = true;
+ DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
+ generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED, distributionPackage);
+ } else if (importQueueStrategy != null && queueItemStatus.getAttempts() > retryAttempts) {
+ success = true;
+ reEnqueuePackage(agentResourceResolver, distributionPackage);
+ DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
+ }
} else {
success = true; // return success if package does not exist in order to clear the queue.
log.error("distribution package with id {} does not exist. the package will be skipped.", queueItem.getId());
}
- } catch (DistributionPackageImportException e) {
- log.error("could not deliver package {}", queueItem.getId(), e);
} catch (LoginException e) {
log.info("cannot obtain resource resolver", e);
} finally {
@@ -413,6 +418,35 @@ public class SimpleDistributionAgent imp
return success;
}
+ private boolean processPackage(ResourceResolver resourceResolver, DistributionPackage distributionPackage) {
+ try {
+ distributionPackageImporter.importPackage(resourceResolver, distributionPackage);
+ } catch (DistributionPackageImportException e) {
+ log.error("could not deliver package {}", distributionPackage.getId(), e);
+ return false;
+ }
+
+ log.info("distribution package {} was delivered", distributionPackage.getId());
+ return true;
+ }
+
+ private boolean reEnqueuePackage(ResourceResolver resourceResolver, DistributionPackage distributionPackage) {
+
+ if (importQueueStrategy == null) {
+ return false;
+ }
+
+ try {
+ importQueueStrategy.add(distributionPackage, queueProvider);
+ } catch (DistributionQueueException e) {
+ log.error("could not reenqueue package {}", distributionPackage.getId(), e);
+ return false;
+ }
+
+ log.info("distribution package {} was reenqueued", distributionPackage.getId());
+ return true;
+ }
+
private ResourceResolver getAgentResourceResolver() throws LoginException {
ResourceResolver resourceResolver;
@@ -495,7 +529,7 @@ public class SimpleDistributionAgent imp
try {
log.debug("queue {} processing item {}", queueName, queueItem);
- boolean success = processQueueItem(queueName, queueItem);
+ boolean success = processQueueItem(queueName, queueEntry);
log.debug("queue {} processing item {} ended with status {}", queueName, queueItem, success);
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java Tue Aug 25 11:41:36 2015
@@ -18,7 +18,11 @@
*/
package org.apache.sling.distribution.agent.impl;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -94,10 +98,6 @@ public class SimpleDistributionAgentFact
@Property(boolValue = true, label = "Queue Processing Enabled", description = "Whether or not the distribution agent should process packages in the queues.")
public static final String QUEUE_PROCESSING_ENABLED = "queue.processing.enabled";
- @Property(cardinality = 100, label = "Passive queues", description = "List of queues that should be disabled." +
- "These queues will gather all the packages until they are removed explicitly.")
- public static final String PASSIVE_QUEUES = "passiveQueues";
-
@Property(name = "packageExporter.target", label = "Exporter", description = "The target reference for the DistributionPackageExporter used to receive (export) the distribution packages," +
"e.g. use target=(name=...) to bind to services by name.")
@@ -160,14 +160,18 @@ public class SimpleDistributionAgentFact
String serviceName = PropertiesUtil.toString(config.get(SERVICE_NAME), null);
boolean queueProcessingEnabled = PropertiesUtil.toBoolean(config.get(QUEUE_PROCESSING_ENABLED), true);
- String[] passiveQueues = PropertiesUtil.toStringArray(config.get(PASSIVE_QUEUES), new String[0]);
- passiveQueues = SettingsUtils.removeEmptyEntries(passiveQueues);
DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
- DistributionQueueDispatchingStrategy dispatchingStrategy = new SingleQueueDispatchingStrategy();
- return new SimpleDistributionAgent(agentName, queueProcessingEnabled, passiveQueues,
+ DistributionQueueDispatchingStrategy exportQueueStrategy = new SingleQueueDispatchingStrategy();
+ DistributionQueueDispatchingStrategy importQueueStrategy = null;
+
+ Set<String> processingQueues = new HashSet<String>();
+ processingQueues.addAll(exportQueueStrategy.getQueueNames());
+
+
+ return new SimpleDistributionAgent(agentName, queueProcessingEnabled, processingQueues,
serviceName, packageImporter, packageExporter, requestAuthorizationStrategy,
- queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, null, null);
+ queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, null, null, 0);
}
}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java Tue Aug 25 11:41:36 2015
@@ -41,6 +41,7 @@ import org.apache.sling.distribution.pac
import org.apache.sling.distribution.packaging.impl.importer.RemoteDistributionPackageImporter;
import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.ErrorQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.MultipleQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.SingleQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
@@ -54,8 +55,10 @@ import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -129,6 +132,17 @@ public class SyncDistributionAgentFactor
public static final String USE_MULTIPLE_QUEUES = "useMultipleQueues";
+ @Property(options = {
+ @PropertyOption(name = "none", value = "none"), @PropertyOption(name = "errorQueue", value = "errorQueue")},
+ value = "none",
+ label = "Retry Strategy", description = "The strategy to apply after a certain number of failed retries."
+ )
+ public static final String RETRY_STRATEGY = "retry.strategy";
+
+ @Property(intValue = 100, label = "Retry attempts", description = "The number of times to retry until the retry strategy is applied.")
+ public static final String RETRY_ATTEMPTS = "retry.attempts";
+
+
/**
* no. of items to poll property
*/
@@ -200,8 +214,6 @@ public class SyncDistributionAgentFactor
String[] passiveQueues = PropertiesUtil.toStringArray(config.get(PASSIVE_QUEUES), new String[0]);
passiveQueues = SettingsUtils.removeEmptyEntries(passiveQueues);
-
-
Object exporterEndpointsValue = config.get(EXPORTER_ENDPOINTS);
Object importerEndpointsValue = config.get(IMPORTER_ENDPOINTS);
@@ -215,19 +227,24 @@ public class SyncDistributionAgentFactor
int pullItems = PropertiesUtil.toInteger(config.get(PULL_ITEMS), Integer.MAX_VALUE);
- DistributionQueueDispatchingStrategy dispatchingStrategy;
+ DistributionQueueDispatchingStrategy exportQueueStrategy;
+ DistributionQueueDispatchingStrategy importQueueStrategy = null;
DistributionPackageImporter packageImporter;
+ Set<String> processingQueues = new HashSet<String>();
if (useMultipleQueues) {
Set<String> queuesMap = new TreeSet<String>();
queuesMap.addAll(importerEndpointsMap.keySet());
queuesMap.addAll(Arrays.asList(passiveQueues));
+ processingQueues.addAll(importerEndpointsMap.keySet());
+ processingQueues.removeAll(Arrays.asList(passiveQueues));
String[] queueNames = queuesMap.toArray(new String[0]);
- dispatchingStrategy = new MultipleQueueDispatchingStrategy(queueNames);
+ exportQueueStrategy = new MultipleQueueDispatchingStrategy(queueNames);
packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.One);
} else {
- dispatchingStrategy = new SingleQueueDispatchingStrategy();
+ exportQueueStrategy = new SingleQueueDispatchingStrategy();
+ processingQueues.addAll(exportQueueStrategy.getQueueNames());
packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.All);
}
@@ -235,9 +252,18 @@ public class SyncDistributionAgentFactor
DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
DistributionRequestType[] allowedRequests = new DistributionRequestType[] { DistributionRequestType.PULL };
- return new SimpleDistributionAgent(agentName, queueProcessingEnabled, passiveQueues,
+ String retryStrategy = SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(RETRY_STRATEGY), null));
+ int retryAttepts = PropertiesUtil.toInteger(config.get(RETRY_ATTEMPTS), 100);
+
+
+ if ("errorQueue".equals(retryStrategy)) {
+ importQueueStrategy = new ErrorQueueDispatchingStrategy(processingQueues.toArray(new String[0]));
+ }
+
+
+ return new SimpleDistributionAgent(agentName, queueProcessingEnabled, processingQueues,
serviceName, packageImporter, packageExporter, requestAuthorizationStrategy,
- queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, null);
+ queueProvider, exportQueueStrategy, importQueueStrategy, distributionEventFactory, resourceResolverFactory, distributionLog, allowedRequests, null, retryAttepts);
}
}
Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/ErrorQueueDispatchingStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/ErrorQueueDispatchingStrategy.java?rev=1697652&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/ErrorQueueDispatchingStrategy.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/ErrorQueueDispatchingStrategy.java Tue Aug 25 11:41:36 2015
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl;
+
+import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
+import org.apache.sling.distribution.queue.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueException;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemState;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.apache.sling.distribution.queue.DistributionQueueProvider;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+
+/**
+ * The error strategy for delivering packages to queues. The strategy delivers the packages in a queue named error-queueName
+ * where the queueName is the name of the original queue the package was in.
+ */
+public class ErrorQueueDispatchingStrategy implements DistributionQueueDispatchingStrategy {
+
+ private final static String ERROR_PREFIX = "error-";
+ private final Set<String> queueNames = new TreeSet<String>();
+
+ public ErrorQueueDispatchingStrategy(String[] queueNames) {
+
+ this.queueNames.addAll(Arrays.asList(queueNames));
+ }
+
+ @Override
+ public Iterable<DistributionQueueItemStatus> add(@Nonnull DistributionPackage distributionPackage, @Nonnull DistributionQueueProvider queueProvider) throws DistributionQueueException {
+
+ List<DistributionQueueItemStatus> result = new ArrayList<DistributionQueueItemStatus>();
+ String originQueue = distributionPackage.getInfo().getQueue();
+
+ if (!queueNames.contains(originQueue)) {
+ return result;
+ }
+
+ String errorQueueName = ERROR_PREFIX + originQueue;
+
+ DistributionQueue errorQueue = queueProvider.getQueue(errorQueueName);
+
+ DistributionQueueItemStatus status = new DistributionQueueItemStatus(DistributionQueueItemState.ERROR, errorQueueName);
+
+ DistributionQueueItem queueItem = DistributionPackageUtils.toQueueItem(distributionPackage);
+
+ if (errorQueue.add(queueItem)) {
+ DistributionPackageUtils.acquire(distributionPackage, errorQueueName);
+
+ DistributionQueueEntry entry = errorQueue.getItem(queueItem.getId());
+ status = entry.getStatus();
+ }
+
+ result.add(status);
+
+ return result;
+ }
+
+ @Nonnull
+ @Override
+ public List<String> getQueueNames() {
+ List<String> errorQueueNames = new ArrayList<String>();
+ for (String queueName : queueNames) {
+ errorQueueNames.add(ERROR_PREFIX + queueName);
+ }
+ return errorQueueNames;
+ }
+}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java Tue Aug 25 11:41:36 2015
@@ -195,6 +195,10 @@ public class JobHandlingDistributionQueu
DistributionQueueItemStatus firstItemStatus = firstJob != null ? JobHandlingUtils.getStatus(firstJob) : null;
DistributionQueueState state = DistributionQueueUtils.calculateState(firstItem, firstItemStatus);
+ if (!isActive) {
+ state = DistributionQueueState.PAUSED;
+ }
+
int itemsCount = jobs.size();
return new DistributionQueueStatus(itemsCount, state);
Modified: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java?rev=1697652&r1=1697651&r2=1697652&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java Tue Aug 25 11:41:36 2015
@@ -71,8 +71,8 @@ public class SimpleDistributionAgentTest
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, packageExporterStrategy,
- queueProvider, distributionHandler,
- distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null);
+ queueProvider, distributionHandler, null,
+ distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null, 0);
DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
DistributionPackage distributionPackage = mock(DistributionPackage.class);
ResourceResolver resourceResolver = mock(ResourceResolver.class);
@@ -102,7 +102,7 @@ public class SimpleDistributionAgentTest
false, null, "subServiceName", packageImporter,
packageExporter, packageExporterStrategy,
queueProvider,
- distributionHandler, distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null);
+ distributionHandler, null, distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null, 0);
DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
DistributionPackage distributionPackage = mock(DistributionPackage.class);
ResourceResolver resourceResolver = mock(ResourceResolver.class);
@@ -135,8 +135,8 @@ public class SimpleDistributionAgentTest
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, packageExporterStrategy,
- queueProvider, distributionHandler,
- distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null);
+ queueProvider, distributionHandler, null,
+ distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null, 0);
DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
DistributionPackage distributionPackage = mock(DistributionPackage.class);
DistributionPackageInfo packageInfo = new DistributionPackageInfo("type");
@@ -165,8 +165,8 @@ public class SimpleDistributionAgentTest
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, packageExporterStrategy,
- queueProvider, distributionHandler,
- distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null);
+ queueProvider, distributionHandler, null,
+ distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null, 0);
DistributionQueue queue = mock(DistributionQueue.class);
when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME))
.thenReturn(queue);
@@ -188,8 +188,8 @@ public class SimpleDistributionAgentTest
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, packageExporterStrategy,
- queueProvider, distributionHandler,
- distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null);
+ queueProvider, distributionHandler, null,
+ distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null, 0);
DistributionQueue queue = mock(DistributionQueue.class);
when(queueProvider.getQueue("priority")).thenReturn(queue);
assertNotNull(agent.getQueue("priority"));
@@ -210,8 +210,8 @@ public class SimpleDistributionAgentTest
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, packageExporterStrategy,
- queueProvider, distributionHandler,
- distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null);
+ queueProvider, distributionHandler, null,
+ distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, null, 0);
DistributionQueue queue = mock(DistributionQueue.class);
when(queueProvider.getQueue("priority")).thenReturn(queue);
assertNull(agent.getQueue("weird"));
@@ -238,8 +238,8 @@ public class SimpleDistributionAgentTest
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, packageExporterStrategy,
- queueProvider, queueDistributionStrategy,
- distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, new String[] { "/content" });
+ queueProvider, queueDistributionStrategy, null,
+ distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, new String[] { "/content" }, 0);
DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/content");
DistributionPackage distributionPackage = mock(DistributionPackage.class);
@@ -277,8 +277,8 @@ public class SimpleDistributionAgentTest
SimpleDistributionAgent agent = new SimpleDistributionAgent(name,
false, null, "serviceName", packageImporter,
packageExporter, packageExporterStrategy,
- queueProvider, queueDistributionStrategy,
- distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, new String[] { "/content" });
+ queueProvider, queueDistributionStrategy, null,
+ distributionEventFactory, resolverFactory, mock(DefaultDistributionLog.class), null, new String[] { "/content" }, 0);
DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/home");
DistributionPackage distributionPackage = mock(DistributionPackage.class);