You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2014/11/04 16:44:13 UTC
svn commit: r1636613 [1/2] - in
/sling/trunk/contrib/extensions/replication/core/src:
main/java/org/apache/sling/replication/agent/impl/
main/java/org/apache/sling/replication/component/
main/java/org/apache/sling/replication/component/impl/ main/java/...
Author: mpetria
Date: Tue Nov 4 15:44:12 2014
New Revision: 1636613
URL: http://svn.apache.org/r1636613
Log:
SLING-3965: refactor queue provider api to have single agent providers
Removed:
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
Modified:
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentFactory.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/ReplicationComponentFactory.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/impl/DefaultReplicationComponentFactory.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactoryTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProviderTest.java
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java Tue Nov 4 15:44:12 2014
@@ -36,6 +36,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.replication.agent.ReplicationAgent;
import org.apache.sling.replication.component.ManagedReplicationComponent;
import org.apache.sling.replication.component.ReplicationComponent;
@@ -66,13 +67,8 @@ import org.slf4j.LoggerFactory;
)
public class CoordinatingReplicationAgentFactory implements ReplicationComponentProvider {
- private static final String QUEUE_PROVIDER_TARGET = ReplicationComponentFactory.COMPONENT_QUEUE_PROVIDER + ".target";
- private static final String QUEUE_DISTRIBUTION_TARGET = ReplicationComponentFactory.COMPONENT_QUEUE_DISTRIBUTION_STRATEGY + ".target";
private static final String TRANSPORT_AUTHENTICATION_PROVIDER_TARGET = ReplicationComponentFactory.COMPONENT_TRANSPORT_AUTHENTICATION_PROVIDER + ".target";
- private static final String DEFAULT_QUEUEPROVIDER = "(name=" + JobHandlingReplicationQueueProvider.NAME + ")";
- private static final String DEFAULT_DISTRIBUTION = "(name=" + SingleQueueDistributionStrategy.NAME + ")";
-
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -97,14 +93,6 @@ public class CoordinatingReplicationAgen
@Property(label = "Package Importer", cardinality = 100)
public static final String PACKAGE_IMPORTER = ReplicationComponentFactory.COMPONENT_PACKAGE_IMPORTER;
- @Property(label = "Target ReplicationQueueProvider", name = QUEUE_PROVIDER_TARGET, value = DEFAULT_QUEUEPROVIDER)
- @Reference(name = ReplicationComponentFactory.COMPONENT_QUEUE_PROVIDER, target = DEFAULT_QUEUEPROVIDER)
- private volatile ReplicationQueueProvider queueProvider;
-
- @Property(label = "Target QueueDistributionStrategy", name = QUEUE_DISTRIBUTION_TARGET, value = DEFAULT_DISTRIBUTION)
- @Reference(name = ReplicationComponentFactory.COMPONENT_QUEUE_DISTRIBUTION_STRATEGY, target = DEFAULT_DISTRIBUTION)
- private volatile ReplicationQueueDistributionStrategy queueDistributionStrategy;
-
@Property(label = "Target TransportAuthenticationProvider", name = TRANSPORT_AUTHENTICATION_PROVIDER_TARGET)
@Reference(name = "transportAuthenticationProvider")
private volatile TransportAuthenticationProvider transportAuthenticationProvider;
@@ -116,13 +104,21 @@ public class CoordinatingReplicationAgen
private SlingSettingsService settingsService;
@Reference
+ private JobManager jobManager;
+
+ @Reference
private ReplicationComponentFactory componentFactory;
+ private BundleContext savedContext;
+
private ServiceRegistration componentReg;
+ private String agentName;
+
@Activate
protected void activate(BundleContext context, Map<String, Object> config) {
+ savedContext = context;
// inject configuration
Dictionary<String, Object> props = new Hashtable<String, Object>();
@@ -131,9 +127,8 @@ public class CoordinatingReplicationAgen
if (enabled) {
props.put(ENABLED, true);
- String name = PropertiesUtil
- .toString(config.get(NAME), String.valueOf(new Random().nextInt(1000)));
- props.put(NAME, name);
+ agentName = PropertiesUtil.toString(config.get(NAME), null);
+ props.put(NAME, agentName);
if (componentReg == null) {
Map<String, Object> properties = new HashMap<String, Object>();
@@ -159,14 +154,11 @@ public class CoordinatingReplicationAgen
packageExporterProperties = packageExporterPropertiesList.toArray(new String[packageExporterPropertiesList.size()]);
properties.put(PACKAGE_EXPORTER, packageExporterProperties);
- properties.put("trigger0", new String[]{"type=scheduledEvent"});
-
ReplicationAgent agent = componentFactory.createComponent(ReplicationAgent.class, properties, this);
- log.debug("activated agent {}", name);
+ log.debug("activated agent {}", agentName);
if (agent != null) {
-
// register agent service
componentReg = context.registerService(ReplicationAgent.class.getName(), agent, props);
if (agent instanceof ManagedReplicationComponent) {
@@ -196,9 +188,9 @@ public class CoordinatingReplicationAgen
public <ComponentType extends ReplicationComponent> ComponentType getComponent(@Nonnull Class<ComponentType> type,
@Nullable String componentName) {
if (type.isAssignableFrom(ReplicationQueueProvider.class)) {
- return (ComponentType) queueProvider;
+ return (ComponentType) new JobHandlingReplicationQueueProvider(agentName, jobManager, savedContext);
} else if (type.isAssignableFrom(ReplicationQueueDistributionStrategy.class)) {
- return (ComponentType) queueDistributionStrategy;
+ return (ComponentType) new SingleQueueDistributionStrategy();
} else if (type.isAssignableFrom(TransportAuthenticationProvider.class)) {
return (ComponentType) transportAuthenticationProvider;
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Tue Nov 4 15:44:12 2014
@@ -172,7 +172,7 @@ public class SimpleReplicationAgent impl
// dispatch the replication package to the queue distribution handler
try {
- boolean success = queueDistributionStrategy.add(name, replicationPackage, queueProvider);
+ boolean success = queueDistributionStrategy.add(replicationPackage, queueProvider);
Dictionary<Object, Object> properties = new Properties();
properties.put("replication.package.paths", replicationPackage.getPaths());
@@ -197,9 +197,9 @@ public class SimpleReplicationAgent impl
ReplicationQueue queue;
try {
if (queueName != null && queueName.length() > 0) {
- queue = queueProvider.getQueue(this.name, queueName);
+ queue = queueProvider.getQueue(queueName);
} else {
- queue = queueProvider.getQueue(this.name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME);
+ queue = queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME);
}
} catch (ReplicationQueueException e) {
throw new ReplicationAgentException(e);
@@ -223,7 +223,11 @@ public class SimpleReplicationAgent impl
}
if (!isPassive()) {
- queueProvider.enableQueueProcessing(name, new PackageQueueProcessor());
+ try {
+ queueProvider.enableQueueProcessing(new PackageQueueProcessor());
+ } catch (ReplicationQueueException e) {
+ log.error("cannot enable queue processing", e);
+ }
}
}
@@ -241,7 +245,12 @@ public class SimpleReplicationAgent impl
agentBasedRequestHandler = null;
if (!isPassive()) {
- queueProvider.disableQueueProcessing(name);
+
+ try {
+ queueProvider.disableQueueProcessing();
+ } catch (ReplicationQueueException e) {
+ log.error("cannot disable queue processing", e);
+ }
}
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentFactory.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentFactory.java Tue Nov 4 15:44:12 2014
@@ -34,6 +34,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.replication.agent.ReplicationAgent;
import org.apache.sling.replication.component.ManagedReplicationComponent;
import org.apache.sling.replication.component.ReplicationComponent;
@@ -63,12 +64,8 @@ import org.slf4j.LoggerFactory;
policy = ConfigurationPolicy.REQUIRE
)
public class SimpleReplicationAgentFactory implements ReplicationComponentProvider {
- private static final String QUEUE_PROVIDER_TARGET = ReplicationComponentFactory.COMPONENT_QUEUE_PROVIDER + ".target";
- private static final String QUEUE_DISTRIBUTION_TARGET = ReplicationComponentFactory.COMPONENT_QUEUE_DISTRIBUTION_STRATEGY + ".target";
- private static final String TRANSPORT_AUTHENTICATION_PROVIDER_TARGET = ReplicationComponentFactory.COMPONENT_TRANSPORT_AUTHENTICATION_PROVIDER + ".target";
- private static final String DEFAULT_QUEUEPROVIDER = "(name=" + JobHandlingReplicationQueueProvider.NAME + ")";
- private static final String DEFAULT_DISTRIBUTION = "(name=" + SingleQueueDistributionStrategy.NAME + ")";
+ private static final String TRANSPORT_AUTHENTICATION_PROVIDER_TARGET = ReplicationComponentFactory.COMPONENT_TRANSPORT_AUTHENTICATION_PROVIDER + ".target";
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -96,14 +93,6 @@ public class SimpleReplicationAgentFacto
@Property(label = "Service Name")
public static final String SERVICE_NAME = ReplicationComponentFactory.AGENT_SIMPLE_PROPERTY_SERVICE_NAME;
- @Property(label = "Target ReplicationQueueProvider", name = QUEUE_PROVIDER_TARGET, value = DEFAULT_QUEUEPROVIDER)
- @Reference(name = ReplicationComponentFactory.COMPONENT_QUEUE_PROVIDER, target = DEFAULT_QUEUEPROVIDER)
- private volatile ReplicationQueueProvider queueProvider;
-
- @Property(label = "Target QueueDistributionStrategy", name = QUEUE_DISTRIBUTION_TARGET, value = DEFAULT_DISTRIBUTION)
- @Reference(name = ReplicationComponentFactory.COMPONENT_QUEUE_DISTRIBUTION_STRATEGY, target = DEFAULT_DISTRIBUTION)
- private volatile ReplicationQueueDistributionStrategy queueDistributionStrategy;
-
@Property(label = "Target TransportAuthenticationProvider", name = TRANSPORT_AUTHENTICATION_PROVIDER_TARGET)
@Reference(name = "transportAuthenticationProvider", policy = ReferencePolicy.DYNAMIC,
cardinality = ReferenceCardinality.OPTIONAL_UNARY)
@@ -116,11 +105,15 @@ public class SimpleReplicationAgentFacto
private SlingSettingsService settingsService;
@Reference
+ private JobManager jobManager;
+
+ @Reference
private ReplicationComponentFactory componentFactory;
private ServiceRegistration componentReg;
private BundleContext savedContext;
private Map<String, Object> savedConfig;
+ private String agentName;
@Activate
protected void activate(BundleContext context, Map<String, Object> config) {
@@ -136,14 +129,13 @@ public class SimpleReplicationAgentFacto
if (enabled) {
props.put(ENABLED, true);
- String name = PropertiesUtil
- .toString(config.get(NAME), String.valueOf(new Random().nextInt(1000)));
- props.put(NAME, name);
+ agentName = PropertiesUtil.toString(config.get(NAME), null);
+ props.put(NAME, agentName);
if (componentReg == null && componentFactory != null) {
ReplicationAgent agent = componentFactory.createComponent(ReplicationAgent.class, config, this);
- log.debug("activated agent {}", name);
+ log.debug("activated agent {}", agentName);
if (agent != null) {
@@ -176,10 +168,10 @@ public class SimpleReplicationAgentFacto
public <ComponentType extends ReplicationComponent> ComponentType getComponent(@Nonnull Class<ComponentType> type,
@Nullable String componentName) {
if (type.isAssignableFrom(ReplicationQueueProvider.class)) {
- return (ComponentType) queueProvider;
+ return (ComponentType) new JobHandlingReplicationQueueProvider(agentName, jobManager, savedContext);
}
else if (type.isAssignableFrom(ReplicationQueueDistributionStrategy.class)) {
- return (ComponentType) queueDistributionStrategy;
+ return (ComponentType) new SingleQueueDistributionStrategy();
}
else if (type.isAssignableFrom(TransportAuthenticationProvider.class)) {
return (ComponentType) transportAuthenticationProvider;
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/ReplicationComponentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/ReplicationComponentFactory.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/ReplicationComponentFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/ReplicationComponentFactory.java Tue Nov 4 15:44:12 2014
@@ -289,6 +289,35 @@ public interface ReplicationComponentFac
/**
+ * queue provider job type
+ */
+ String QUEUE_PROVIDER_JOB = "job";
+
+ /**
+ * queue provider simple type
+ */
+ String QUEUE_PROVIDER_SIMPLE = "simple";
+
+ /**
+ * queue distribution strategy single type
+ */
+ String QUEUE_DISTRIBUTION_STRATEGY_SINGLE = "single";
+
+ /**
+ * queue distribution strategy priority type
+ */
+ String QUEUE_DISTRIBUTION_STRATEGY_PRIORITY = "priority";
+
+ /**
+ * queue distribution strategy priority paths property
+ */
+ String QUEUE_DISTRIBUTION_STRATEGY_PRIORITY_PROPERTY_PATHS = "priority.paths";
+
+
+
+
+
+ /**
* create a {@link ReplicationComponent}
*
* @param type the {@link java.lang.Class} of the component to be created
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/impl/DefaultReplicationComponentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/impl/DefaultReplicationComponentFactory.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/impl/DefaultReplicationComponentFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/impl/DefaultReplicationComponentFactory.java Tue Nov 4 15:44:12 2014
@@ -34,6 +34,7 @@ import org.apache.jackrabbit.vault.packa
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.jcr.api.SlingRepository;
import org.apache.sling.replication.agent.ReplicationAgent;
import org.apache.sling.replication.agent.ReplicationRequestAuthorizationStrategy;
@@ -53,6 +54,11 @@ import org.apache.sling.replication.pack
import org.apache.sling.replication.packaging.impl.importer.RemoteReplicationPackageImporter;
import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.impl.ErrorAwareQueueDistributionStrategy;
+import org.apache.sling.replication.queue.impl.PriorityPathDistributionStrategy;
+import org.apache.sling.replication.queue.impl.SingleQueueDistributionStrategy;
+import org.apache.sling.replication.queue.impl.jobhandling.JobHandlingReplicationQueueProvider;
+import org.apache.sling.replication.queue.impl.simple.SimpleReplicationQueueProvider;
import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
import org.apache.sling.replication.serialization.impl.ResourceSharedReplicationPackageBuilder;
import org.apache.sling.replication.serialization.impl.vlt.FileVaultReplicationPackageBuilder;
@@ -103,6 +109,9 @@ public class DefaultReplicationComponent
@Reference
private Scheduler scheduler;
+ @Reference
+ private JobManager jobManager;
+
private BundleContext bundleContext;
@Activate
@@ -262,6 +271,15 @@ public class DefaultReplicationComponent
String name = PropertiesUtil.toString(properties.get(COMPONENT_NAME), null);
return componentProvider.getComponent(ReplicationQueueProvider.class, name);
}
+ else if (QUEUE_PROVIDER_JOB.equals(factory)) {
+ String name = PropertiesUtil.toString(properties.get(COMPONENT_NAME), null);
+ return new JobHandlingReplicationQueueProvider(name, jobManager, bundleContext);
+ }
+ else if (QUEUE_PROVIDER_SIMPLE.equals(factory)) {
+ String name = PropertiesUtil.toString(properties.get(COMPONENT_NAME), null);
+
+ return new SimpleReplicationQueueProvider(scheduler, name);
+ }
return null;
}
@@ -272,7 +290,14 @@ public class DefaultReplicationComponent
if (COMPONENT_TYPE_SERVICE.equals(factory)) {
String name = PropertiesUtil.toString(properties.get(COMPONENT_NAME), null);
return componentProvider.getComponent(ReplicationQueueDistributionStrategy.class, name);
+ }
+ else if (QUEUE_DISTRIBUTION_STRATEGY_SINGLE.equals(factory)) {
+ return new SingleQueueDistributionStrategy();
+ }
+ else if (QUEUE_DISTRIBUTION_STRATEGY_PRIORITY.equals(factory)) {
+ String[] priorityPaths = PropertiesUtil.toStringArray(properties.get(QUEUE_DISTRIBUTION_STRATEGY_PRIORITY_PROPERTY_PATHS), null);
+ return new PriorityPathDistributionStrategy(priorityPaths);
}
return null;
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java Tue Nov 4 15:44:12 2014
@@ -39,15 +39,13 @@ public interface ReplicationQueueDistrib
* synchronously distribute a {@link org.apache.sling.replication.packaging.ReplicationPackage}
* to one or more {@link ReplicationQueue}s provided by the given {@link ReplicationQueueProvider}
*
- * @param agentName the name of a {@link ReplicationAgent}
* @param replicationPackage a {@link org.apache.sling.replication.packaging.ReplicationPackage} to distribute
* @param queueProvider the {@link ReplicationQueueProvider} used to provide the queues to be used for the given package
* @return {@code true} if the package was successfully added to a queue, {@code false} otherwise
* @throws ReplicationQueueException if distribution fails
*/
@Nonnull
- boolean add(@Nonnull String agentName, @Nonnull ReplicationPackage replicationPackage,
- @Nonnull ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
+ boolean add(@Nonnull ReplicationPackage replicationPackage, @Nonnull ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
/**
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java Tue Nov 4 15:44:12 2014
@@ -33,30 +33,26 @@ public interface ReplicationQueueProvide
/**
* provide a named queue for the given agent
*
- * @param agentName the replication agent needing the queue
* @param queueName the name of the queue to retrieve
* @return a replication queue to be used for the given parameters
* @throws ReplicationQueueException
*/
@Nonnull
- ReplicationQueue getQueue(@Nonnull String agentName, @Nonnull String queueName)
- throws ReplicationQueueException;
+ ReplicationQueue getQueue(@Nonnull String queueName) throws ReplicationQueueException;
/**
* enables queue driven processing for an agent
*
- * @param agentName a replication agent
* @param queueProcessor the queue processor to be used
*/
- void enableQueueProcessing(@Nonnull String agentName, @Nonnull ReplicationQueueProcessor queueProcessor);
+ void enableQueueProcessing(@Nonnull ReplicationQueueProcessor queueProcessor) throws ReplicationQueueException;
/**
* disables queue driven processing for an agent
*
- * @param agentName a replication agent
*/
- void disableQueueProcessing(@Nonnull String agentName);
+ void disableQueueProcessing() throws ReplicationQueueException;
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java Tue Nov 4 15:44:12 2014
@@ -46,9 +46,6 @@ import org.slf4j.LoggerFactory;
* delivering packages with an error queue which can be used when an item is stuck in the default
* queue for too long; the stuck item is then moved to the error queue or dropped.
*/
-@Component(immediate = true, metatype = true, label = "Error Aware Queue Distribution Strategy")
-@Service(value = ReplicationQueueDistributionStrategy.class)
-@Property(name = "name", value = ErrorAwareQueueDistributionStrategy.NAME, propertyPrivate = true)
public class ErrorAwareQueueDistributionStrategy implements ReplicationQueueDistributionStrategy {
protected static final String ERROR_QUEUE_NAME = "error";
@@ -76,6 +73,7 @@ public class ErrorAwareQueueDistribution
private Integer timeThreshold;
+
@Activate
protected void activate(final ComponentContext ctx) {
stuckQueueHandling = PropertiesUtil
@@ -84,13 +82,12 @@ public class ErrorAwareQueueDistribution
timeThreshold = PropertiesUtil.toInteger(ctx.getProperties().get(TIME_THRESHOLD), 600000);
}
- public boolean add(String agentName, ReplicationPackage replicationPackage,
- ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+ public boolean add(ReplicationPackage replicationPackage, ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
boolean added;
ReplicationQueueItem queueItem = getItem(replicationPackage);
- ReplicationQueue queue = queueProvider.getQueue(agentName, DEFAULT_QUEUE_NAME);
+ ReplicationQueue queue = queueProvider.getQueue(DEFAULT_QUEUE_NAME);
added = queue.add(queueItem);
- checkAndRemoveStuckItems(agentName, queueProvider);
+ checkAndRemoveStuckItems(queueProvider);
return added;
}
@@ -98,9 +95,8 @@ public class ErrorAwareQueueDistribution
return Arrays.asList(new String[] { ERROR_QUEUE_NAME, DEFAULT_QUEUE_NAME });
}
- private void checkAndRemoveStuckItems(String agent,
- ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
- ReplicationQueue defaultQueue = queueProvider.getQueue(agent, DEFAULT_QUEUE_NAME);
+ private void checkAndRemoveStuckItems(ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+ ReplicationQueue defaultQueue = queueProvider.getQueue(DEFAULT_QUEUE_NAME);
// get first item in the queue with its status
ReplicationQueueItem firstItem = defaultQueue.getHead();
if (firstItem != null) {
@@ -113,7 +109,7 @@ public class ErrorAwareQueueDistribution
if (ERROR.equals(stuckQueueHandling)) {
log.warn("item {} moved to the error queue", firstItem);
- ReplicationQueue errorQueue = queueProvider.getQueue(agent, ERROR_QUEUE_NAME);
+ ReplicationQueue errorQueue = queueProvider.getQueue(ERROR_QUEUE_NAME);
if (!errorQueue.add(firstItem)) {
log.error("failed to move item {} the queue {}", firstItem, errorQueue);
throw new ReplicationQueueException("could not move an item to the error queue");
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java Tue Nov 4 15:44:12 2014
@@ -43,29 +43,21 @@ import org.slf4j.LoggerFactory;
* Distribution algorithm which keeps one specific queue to handle specific paths and another queue
* for handling all the other paths
*/
-@Component(immediate = true, metatype = true, label = "Priority Path Queue Distribution Strategy")
-@Service(value = ReplicationQueueDistributionStrategy.class)
-@Property(name = "name", value = PriorityPathDistributionStrategy.NAME, propertyPrivate = true)
public class PriorityPathDistributionStrategy implements ReplicationQueueDistributionStrategy {
- public static final String NAME = "priority";
-
private final Logger log = LoggerFactory.getLogger(getClass());
- @Property(value = {"/content"})
- private static final String PRIORITYPATHS = "priority.paths";
+ private final String[] priorityPaths;
- private String[] priorityPaths;
+ public PriorityPathDistributionStrategy(String[] priorityPaths) {
+ this.priorityPaths = priorityPaths;
- @Activate
- protected void activate(ComponentContext context) {
- priorityPaths = PropertiesUtil.toStringArray(context.getProperties().get(PRIORITYPATHS));
}
- private ReplicationQueue getQueue(String agentName, ReplicationQueueItem replicationPackage,
- ReplicationQueueProvider queueProvider)
+
+ private ReplicationQueue getQueue(ReplicationQueueItem replicationPackage, ReplicationQueueProvider queueProvider)
throws ReplicationQueueException {
String[] paths = replicationPackage.getPaths();
@@ -86,23 +78,22 @@ public class PriorityPathDistributionStr
ReplicationQueue queue;
if (usePriorityQueue) {
log.info("using priority queue for path {}", pp);
- queue = queueProvider.getQueue(agentName, pp);
+ queue = queueProvider.getQueue(pp);
} else {
log.info("using default queue");
- queue = queueProvider.getQueue(agentName, DEFAULT_QUEUE_NAME);
+ queue = queueProvider.getQueue(DEFAULT_QUEUE_NAME);
}
return queue;
}
- public boolean add(String agentName, ReplicationPackage replicationPackage,
- ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+ public boolean add(ReplicationPackage replicationPackage, ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
ReplicationQueueItem queueItem = getItem(replicationPackage);
- ReplicationQueue queue = getQueue(agentName, queueItem, queueProvider);
+ ReplicationQueue queue = getQueue(queueItem, queueProvider);
if (queue != null) {
return queue.add(queueItem);
} else {
- throw new ReplicationQueueException("could not get a queue for agent " + agentName);
+ throw new ReplicationQueueException("could not get a queue");
}
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java Tue Nov 4 15:44:12 2014
@@ -42,19 +42,13 @@ import java.util.List;
* The default strategy for delivering packages to queues. Each agent just manages a single queue,
* no failure / stuck handling where each package is put regardless of anything.
*/
-@Component(immediate = true, label = "Single Queue Distribution Strategy")
-@Service(value = ReplicationQueueDistributionStrategy.class)
-@Property(name = "name", value = SingleQueueDistributionStrategy.NAME)
public class SingleQueueDistributionStrategy implements ReplicationQueueDistributionStrategy {
- public static final String NAME = "single";
-
private final Logger log = LoggerFactory.getLogger(getClass());
- public boolean add(String agentName, ReplicationPackage replicationPackage,
- ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+ public boolean add(ReplicationPackage replicationPackage, ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
ReplicationQueueItem queueItem = getItem(replicationPackage);
- ReplicationQueue queue = queueProvider.getQueue(agentName, DEFAULT_QUEUE_NAME);
+ ReplicationQueue queue = queueProvider.getQueue(DEFAULT_QUEUE_NAME);
return queue.add(queueItem);
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java Tue Nov 4 15:44:12 2014
@@ -36,89 +36,66 @@ import org.apache.sling.replication.queu
import org.apache.sling.replication.queue.ReplicationQueueException;
import org.apache.sling.replication.queue.ReplicationQueueProcessor;
import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.queue.impl.AbstractReplicationQueueProvider;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Component(metatype = false, label = "Sling Job handling based Replication Queue Provider")
-@Service(value = ReplicationQueueProvider.class)
-@Property(name = "name", value = JobHandlingReplicationQueueProvider.NAME)
-public class JobHandlingReplicationQueueProvider extends AbstractReplicationQueueProvider
- implements ReplicationQueueProvider {
-
- public static final String NAME = "sjh";
-
+/**
+ * a queue provider {@link ReplicationQueueProvider} for sling jobs based
+ * {@link ReplicationQueue}s
+ */
+public class JobHandlingReplicationQueueProvider implements ReplicationQueueProvider {
private final Logger log = LoggerFactory.getLogger(getClass());
- @Reference
- private JobManager jobManager;
+ private final String name;
+ private final JobManager jobManager;
- private final Map<String, ServiceRegistration> jobConsumers = new ConcurrentHashMap<String, ServiceRegistration>();
+
+ private ServiceRegistration jobConsumer = null;
private BundleContext context;
- protected JobHandlingReplicationQueueProvider(JobManager jobManager, BundleContext context) {
+ public JobHandlingReplicationQueueProvider(String name, JobManager jobManager, BundleContext context) {
+ if (name == null || jobManager == null || context == null) {
+ throw new IllegalArgumentException("all arguments are required");
+ }
+ this.name = name;
this.jobManager = jobManager;
this.context = context;
}
- public JobHandlingReplicationQueueProvider() {
- }
-
- @Override
- protected ReplicationQueue getInternalQueue(String agentName, String queueName)
- throws ReplicationQueueException {
- String name = agentName;
- if (queueName.length() > 0) {
- name += "/" + queueName;
- }
- String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name;
+ public ReplicationQueue getQueue(String queueName) throws ReplicationQueueException {
+ String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name + "/" + queueName;
return new JobHandlingReplicationQueue(name, topic, jobManager);
}
- public void enableQueueProcessing(@Nonnull String agentName, @Nonnull ReplicationQueueProcessor queueProcessor) {
+ public void enableQueueProcessing(@Nonnull ReplicationQueueProcessor queueProcessor) throws ReplicationQueueException {
+ if (jobConsumer != null) {
+ throw new ReplicationQueueException("job already registered");
+ }
// eventually register job consumer for sling job handling based queues
Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
- String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + agentName;
+ String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name;
String childTopic = topic + "/*";
jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{topic, childTopic});
- synchronized (jobConsumers) {
- log.info("registering job consumer for agent {}", agentName);
- ServiceRegistration jobReg = context.registerService(JobConsumer.class.getName(),
- new ReplicationAgentJobConsumer(queueProcessor), jobProps);
- if (jobReg != null) {
- jobConsumers.put(agentName, jobReg);
- }
- log.info("job consumer for agent {} registered", agentName);
- }
+ log.info("registering job consumer for agent {}", name);
+ jobConsumer = context.registerService(JobConsumer.class.getName(), new ReplicationAgentJobConsumer(queueProcessor), jobProps);
+ log.info("job consumer for agent {} registered", name);
}
- public void disableQueueProcessing(@Nonnull String agentName) {
- synchronized (jobConsumers) {
- log.info("unregistering job consumer for agent {}", agentName);
- ServiceRegistration jobReg = jobConsumers.remove(agentName);
- if (jobReg != null) {
- jobReg.unregister();
- log.info("job consumer for agent {} unregistered", agentName);
- }
+ public void disableQueueProcessing() {
+ if (jobConsumer != null) {
+ jobConsumer.unregister();
+ log.info("job consumer for agent {} unregistered", name);
+ jobConsumer = null;
}
- }
+ log.info("unregistering job consumer for agent {}", name);
- @Activate
- private void activate(BundleContext context) {
- this.context = context;
}
- @Deactivate
- private void deactivate() {
- for (ServiceRegistration jobReg : jobConsumers.values()) {
- jobReg.unregister();
- }
- this.context = null;
- }
+
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java Tue Nov 4 15:44:12 2014
@@ -44,7 +44,7 @@ public class ScheduledReplicationQueuePr
public void run() {
try {
- for (ReplicationQueue queue : queueProvider.getAllQueues()) {
+ for (ReplicationQueue queue : queueProvider.getQueues()) {
while (!queue.isEmpty()) {
// synchronized (queue) {
ReplicationQueueItem item = queue.getHead();
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java Tue Nov 4 15:44:12 2014
@@ -20,61 +20,77 @@ package org.apache.sling.replication.que
import javax.annotation.Nonnull;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueException;
import org.apache.sling.replication.queue.ReplicationQueueProcessor;
import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.queue.impl.AbstractReplicationQueueProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
- * an OSGi service implementing {@link ReplicationQueueProvider} for simple in memory
+ * a queue provider {@link ReplicationQueueProvider} for simple in memory
* {@link ReplicationQueue}s
*/
-@Component(metatype = false, label = "In memory Replication Queue Provider")
-@Service(value = ReplicationQueueProvider.class)
-@Property(name = "name", value = SimpleReplicationQueueProvider.NAME)
-public class SimpleReplicationQueueProvider extends AbstractReplicationQueueProvider implements
- ReplicationQueueProvider {
+public class SimpleReplicationQueueProvider implements ReplicationQueueProvider {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+
+ private final String name;
+ private final Scheduler scheduler;
- @Reference
- Scheduler scheduler;
+ private final Map<String, ReplicationQueue> queueMap = new ConcurrentHashMap<String, ReplicationQueue>();
- public static final String NAME = "simple";
- protected SimpleReplicationQueueProvider(Scheduler scheduler) {
+ public SimpleReplicationQueueProvider(Scheduler scheduler, String name) {
+ if (name == null || scheduler == null) {
+ throw new IllegalArgumentException("all arguments are required");
+ }
+
this.scheduler = scheduler;
- }
+ this.name = name;
- public SimpleReplicationQueueProvider() {
}
- protected ReplicationQueue getInternalQueue(String agentName, String selector)
+ @Nonnull
+ public ReplicationQueue getQueue(@Nonnull String queueName)
throws ReplicationQueueException {
- return new SimpleReplicationQueue(agentName, selector);
+ String key = name + queueName;
+
+ ReplicationQueue queue = queueMap.get(key);
+ if (queue == null) {
+ log.info("creating a queue with key {}", key);
+ queue = new SimpleReplicationQueue(name, queueName);
+ queueMap.put(key, queue);
+ log.info("queue created {}", queue);
+ }
+ return queue;
}
- protected void deleteQueue(ReplicationQueue queue) {
- // do nothing as queues just exist in the cache
+
+ protected Collection<ReplicationQueue> getQueues() {
+ return queueMap.values();
}
- public void enableQueueProcessing(@Nonnull String agentName, @Nonnull ReplicationQueueProcessor queueProcessor) {
+ public void enableQueueProcessing(@Nonnull ReplicationQueueProcessor queueProcessor) {
ScheduleOptions options = scheduler.NOW(-1, 10)
.canRunConcurrently(false)
- .name(getJobName(agentName));
+ .name(getJobName());
scheduler.schedule(new ScheduledReplicationQueueProcessorTask(this, queueProcessor), options);
}
- public void disableQueueProcessing(@Nonnull String agentName) {
- scheduler.unschedule(getJobName(agentName));
+ public void disableQueueProcessing() {
+ scheduler.unschedule(getJobName());
}
- private String getJobName(String agentName) {
- return SimpleReplicationQueueProvider.NAME + "-queueProcessor-" + agentName;
+ private String getJobName() {
+ return "simple-queueProcessor-" + name;
}
}
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactoryTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactoryTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactoryTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactoryTest.java Tue Nov 4 15:44:12 2014
@@ -42,6 +42,7 @@ public class CoordinatingReplicationAgen
BundleContext context = mock(BundleContext.class);
Map<String, Object> config = new HashMap<String, Object>();
try {
+ config.put(CoordinatingReplicationAgentFactory.NAME, "agentName");
coordinatingReplicationAgentFactory.activate(context, config);
fail("cannot activate a coordinate agents without exporters/importers");
} catch (IllegalArgumentException e) {
@@ -54,6 +55,7 @@ public class CoordinatingReplicationAgen
CoordinatingReplicationAgentFactory coordinatingReplicationAgentFactory = new CoordinatingReplicationAgentFactory();
BundleContext context = mock(BundleContext.class);
Map<String, Object> config = new HashMap<String, Object>();
+ config.put(CoordinatingReplicationAgentFactory.NAME, "agentName");
config.put(CoordinatingReplicationAgentFactory.PACKAGE_IMPORTER, new String[]{});
config.put(CoordinatingReplicationAgentFactory.PACKAGE_EXPORTER, new String[]{});
try {
@@ -70,6 +72,8 @@ public class CoordinatingReplicationAgen
BundleContext context = mock(BundleContext.class);
Map<String, Object> config = new HashMap<String, Object>();
+
+ config.put(CoordinatingReplicationAgentFactory.NAME, "agentName");
config.put(CoordinatingReplicationAgentFactory.PACKAGE_IMPORTER, new String[]{"packageBuilder/type=vlt",
"endpoints[0]=http://host:101/libs/sling/replication/services/exporters/reverse-101",
"endpoints[1]=http://host:102/libs/sling/replication/services/exporters/reverse-102",
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java Tue Nov 4 15:44:12 2014
@@ -59,7 +59,7 @@ public class SimpleReplicationAgentTest
ReplicationRequestAuthorizationStrategy packageExporterStrategy = mock(ReplicationRequestAuthorizationStrategy.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
- when(distributionHandler.add(any(String.class), any(ReplicationPackage.class), any(ReplicationQueueProvider.class))).thenReturn(false);
+ when(distributionHandler.add(any(ReplicationPackage.class), any(ReplicationQueueProvider.class))).thenReturn(false);
ReplicationEventFactory replicationEventFactory = mock(ReplicationEventFactory.class);
ResourceResolverFactory resolverFactory = mock(ResourceResolverFactory.class);
@@ -76,7 +76,7 @@ public class SimpleReplicationAgentTest
when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
when(packageExporter.exportPackages(any(ResourceResolver.class), any(ReplicationRequest.class)))
.thenReturn(Arrays.asList(replicationPackage));
- when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
new SimpleReplicationQueue(name, "name"));
ReplicationResponse response = agent.execute(resourceResolver, request);
assertNotNull(response);
@@ -104,10 +104,10 @@ public class SimpleReplicationAgentTest
ResourceResolver resourceResolver = mock(ResourceResolver.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
- when(distributionHandler.add(any(String.class), any(ReplicationPackage.class), eq(queueProvider))).thenReturn(true);
+ when(distributionHandler.add(any(ReplicationPackage.class), eq(queueProvider))).thenReturn(true);
when(packageExporter.exportPackages(any(ResourceResolver.class), any(ReplicationRequest.class)))
.thenReturn(Arrays.asList(replicationPackage));
- when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
new SimpleReplicationQueue(name, "name"));
ReplicationResponse response = agent.execute(resourceResolver, request);
assertNotNull(response);
@@ -137,7 +137,7 @@ public class SimpleReplicationAgentTest
when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
when(packageExporter.exportPackages(resourceResolver, request)).thenReturn(Arrays.asList(replicationPackage));
- when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
new SimpleReplicationQueue(name, "name"));
agent.execute(resourceResolver, request);
@@ -161,7 +161,7 @@ public class SimpleReplicationAgentTest
queueProvider, distributionHandler,
replicationEventFactory, resolverFactory, null);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME))
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME))
.thenReturn(queue);
assertNotNull(agent.getQueue(null));
}
@@ -184,7 +184,7 @@ public class SimpleReplicationAgentTest
queueProvider, distributionHandler,
replicationEventFactory, resolverFactory, null);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue(name, "priority")).thenReturn(queue);
+ when(queueProvider.getQueue("priority")).thenReturn(queue);
assertNotNull(agent.getQueue("priority"));
}
@@ -206,7 +206,7 @@ public class SimpleReplicationAgentTest
queueProvider, distributionHandler,
replicationEventFactory, resolverFactory, null);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue(name, "priority")).thenReturn(queue);
+ when(queueProvider.getQueue("priority")).thenReturn(queue);
assertNull(agent.getQueue("weird"));
}
}
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java Tue Nov 4 15:44:12 2014
@@ -44,10 +44,10 @@ public class ErrorAwareQueueDistribution
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = errorAwareDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
@@ -60,12 +60,12 @@ public class ErrorAwareQueueDistribution
ReplicationQueue queue = mock(ReplicationQueue.class);
ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(queueItem)).thenReturn(true);
ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
when(state.isSuccessful()).thenReturn(false);
when(queue.getStatus(queueItem)).thenReturn(state);
- boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = errorAwareDistributionStrategy.add(replicationPackage, queueProvider);
assertFalse(returnedState);
}
@@ -83,17 +83,17 @@ public class ErrorAwareQueueDistribution
ReplicationQueue queue = mock(ReplicationQueue.class);
ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(queueItem)).thenReturn(true);
when(queue.getHead()).thenReturn(queueItem);
ReplicationQueue errorQueue = mock(ReplicationQueue.class);
when(errorQueue.add(queueItem)).thenReturn(true);
- when(queueProvider.getQueue("agentName", ErrorAwareQueueDistributionStrategy.ERROR_QUEUE_NAME)).thenReturn(errorQueue);
+ when(queueProvider.getQueue(ErrorAwareQueueDistributionStrategy.ERROR_QUEUE_NAME)).thenReturn(errorQueue);
ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
when(state.isSuccessful()).thenReturn(false);
when(state.getAttempts()).thenReturn(2);
when(queue.getStatus(any(ReplicationQueueItem.class))).thenReturn(state);
- boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = errorAwareDistributionStrategy.add(replicationPackage, queueProvider);
assertFalse(returnedState);
}
@@ -110,14 +110,14 @@ public class ErrorAwareQueueDistribution
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
when(queue.getHead()).thenReturn(mock(ReplicationQueueItem.class));
ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
when(state.isSuccessful()).thenReturn(false);
when(state.getAttempts()).thenReturn(2);
when(queue.getStatus(any(ReplicationQueueItem.class))).thenReturn(state);
- boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = errorAwareDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
@@ -128,9 +128,9 @@ public class ErrorAwareQueueDistribution
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = errorAwareDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
@@ -140,10 +140,10 @@ public class ErrorAwareQueueDistribution
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = errorAwareDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
}
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java Tue Nov 4 15:44:12 2014
@@ -40,154 +40,122 @@ public class PriorityPathQueueDistributi
@Test
public void testPackageAdditionWithSucceedingItemDelivery() throws Exception {
- PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
- ComponentContext context = mock(ComponentContext.class);
- Dictionary properties = mock(Dictionary.class);
- when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
- when(context.getProperties()).thenReturn(properties);
- priorityPathDistributionStrategy.activate(context);
+ PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
@Test
public void testPackageAdditionWithSucceedingItemDeliveryOnPriorityPath() throws Exception {
- PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
- ComponentContext context = mock(ComponentContext.class);
- Dictionary properties = mock(Dictionary.class);
- when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
- when(context.getProperties()).thenReturn(properties);
- priorityPathDistributionStrategy.activate(context);
+ PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/content/sample1"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", "/content")).thenReturn(queue);
+ when(queueProvider.getQueue("/content")).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
@Test
public void testPackageAdditionWithFailingItemDelivery() throws Exception {
- PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
- ComponentContext context = mock(ComponentContext.class);
- Dictionary properties = mock(Dictionary.class);
- when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
- when(context.getProperties()).thenReturn(properties);
- priorityPathDistributionStrategy.activate(context);
+ PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
@Test
public void testPackageAdditionWithFailingItemDeliveryOnPriorityPath() throws Exception {
- PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
- ComponentContext context = mock(ComponentContext.class);
- Dictionary properties = mock(Dictionary.class);
- when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
- when(context.getProperties()).thenReturn(properties);
- priorityPathDistributionStrategy.activate(context);
+ PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/content/sample2"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", "/content")).thenReturn(queue);
+ when(queueProvider.getQueue("/content")).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
@Test
public void testPackageAdditionWithNullItemStateFromTheQueue() throws Exception {
- PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
- ComponentContext context = mock(ComponentContext.class);
- Dictionary properties = mock(Dictionary.class);
- when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
- when(context.getProperties()).thenReturn(properties);
- priorityPathDistributionStrategy.activate(context);
+ PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
@Test
public void testPackageAdditionWithNullItemStateFromTheQueueOnPriorityPath() throws Exception {
- PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
- ComponentContext context = mock(ComponentContext.class);
- Dictionary properties = mock(Dictionary.class);
- when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
- when(context.getProperties()).thenReturn(properties);
- priorityPathDistributionStrategy.activate(context);
+ PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/apps/some/stuff"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", "/apps")).thenReturn(queue);
+ when(queueProvider.getQueue("/apps")).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
@Test
public void testPackageAdditionWithNotNullItemStateFromTheQueue() throws Exception {
- PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
- ComponentContext context = mock(ComponentContext.class);
- Dictionary properties = mock(Dictionary.class);
- when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
- when(context.getProperties()).thenReturn(properties);
- priorityPathDistributionStrategy.activate(context);
+ PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
@Test
public void testPackageAdditionWithNotNullItemStateFromTheQueueOnPriorityPath() throws Exception {
- PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
- ComponentContext context = mock(ComponentContext.class);
- Dictionary properties = mock(Dictionary.class);
- when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
- when(context.getProperties()).thenReturn(properties);
- priorityPathDistributionStrategy.activate(context);
+ PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/apps"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", "/apps")).thenReturn(queue);
+ when(queueProvider.getQueue("/apps")).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
}
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java Tue Nov 4 15:44:12 2014
@@ -41,10 +41,10 @@ public class SingleQueueDistributionStra
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = singleQueueDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
@@ -55,12 +55,12 @@ public class SingleQueueDistributionStra
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(queueItem)).thenReturn(true);
ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
when(state.isSuccessful()).thenReturn(false);
when(queue.getStatus(queueItem)).thenReturn(state);
- boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = singleQueueDistributionStrategy.add(replicationPackage, queueProvider);
assertFalse(returnedState);
}
@@ -70,9 +70,9 @@ public class SingleQueueDistributionStra
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = singleQueueDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
@@ -82,10 +82,10 @@ public class SingleQueueDistributionStra
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+ when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
- boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ boolean returnedState = singleQueueDistributionStrategy.add(replicationPackage, queueProvider);
assertTrue(returnedState);
}
}
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java Tue Nov 4 15:44:12 2014
@@ -42,9 +42,9 @@ public class JobHandlingReplicationQueue
JobManager jobManager = mock(JobManager.class);
BundleContext context = mock(BundleContext.class);
- JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider(
+ JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider("dummy-agent",
jobManager, context);
- ReplicationQueue queue = jobHandlingReplicationQueueProvider.getInternalQueue("dummy-agent", "default");
+ ReplicationQueue queue = jobHandlingReplicationQueueProvider.getQueue("default");
assertNotNull(queue);
}
@@ -57,11 +57,11 @@ public class JobHandlingReplicationQueue
Configuration config = mock(Configuration.class);
when(configAdmin.createFactoryConfiguration(QueueConfiguration.class.getName(), null)).thenReturn(config);
BundleContext context = mock(BundleContext.class);
- JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider(
+ JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider("dummy-agent",
jobManager, context);
String agentName = "dummy-agent";
ReplicationQueueProcessor queueProcessor = mock(ReplicationQueueProcessor.class);
- jobHandlingReplicationQueueProvider.enableQueueProcessing(agentName, queueProcessor);
+ jobHandlingReplicationQueueProvider.enableQueueProcessing(queueProcessor);
}
@Test
@@ -71,9 +71,9 @@ public class JobHandlingReplicationQueue
Configuration config = mock(Configuration.class);
when(configAdmin.createFactoryConfiguration(QueueConfiguration.class.getName(), null)).thenReturn(config);
BundleContext context = mock(BundleContext.class);
- JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider(
- jobManager, context);
+ JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider("dummy-agent",
+ jobManager, context);
String agentName = "dummy-agent";
- jobHandlingReplicationQueueProvider.disableQueueProcessing(agentName);
+ jobHandlingReplicationQueueProvider.disableQueueProcessing();
}
}
\ No newline at end of file
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java Tue Nov 4 15:44:12 2014
@@ -51,7 +51,7 @@ public class ScheduledReplicationQueuePr
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queue.isEmpty()).thenReturn(true);
queues.add(queue);
- when(queueProvider.getAllQueues()).thenReturn(queues);
+ when(queueProvider.getQueues()).thenReturn(queues);
ReplicationQueueProcessor queueProcessor = mock(ReplicationQueueProcessor.class);
ScheduledReplicationQueueProcessorTask scheduledReplicationQueueProcessorTask = new ScheduledReplicationQueueProcessorTask(
queueProvider, queueProcessor);
@@ -68,7 +68,7 @@ public class ScheduledReplicationQueuePr
when(queue.getHead()).thenReturn(item);
queues.add(queue);
- when(queueProvider.getAllQueues()).thenReturn(queues);
+ when(queueProvider.getQueues()).thenReturn(queues);
ReplicationQueueProcessor queueProcessor = mock(ReplicationQueueProcessor.class);
ScheduledReplicationQueueProcessorTask scheduledReplicationQueueProcessorTask = new ScheduledReplicationQueueProcessorTask(
queueProvider, queueProcessor);