You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2014/11/04 16:44:13 UTC

svn commit: r1636613 [1/2] - in /sling/trunk/contrib/extensions/replication/core/src: main/java/org/apache/sling/replication/agent/impl/ main/java/org/apache/sling/replication/component/ main/java/org/apache/sling/replication/component/impl/ main/java/...

Author: mpetria
Date: Tue Nov  4 15:44:12 2014
New Revision: 1636613

URL: http://svn.apache.org/r1636613
Log:
SLING-3965: refactor queue provider api to have single agent providers

Removed:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
Modified:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentFactory.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/ReplicationComponentFactory.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/impl/DefaultReplicationComponentFactory.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactoryTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProviderTest.java

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java Tue Nov  4 15:44:12 2014
@@ -36,6 +36,7 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.component.ManagedReplicationComponent;
 import org.apache.sling.replication.component.ReplicationComponent;
@@ -66,13 +67,8 @@ import org.slf4j.LoggerFactory;
 )
 public class CoordinatingReplicationAgentFactory implements ReplicationComponentProvider {
 
-    private static final String QUEUE_PROVIDER_TARGET = ReplicationComponentFactory.COMPONENT_QUEUE_PROVIDER + ".target";
-    private static final String QUEUE_DISTRIBUTION_TARGET = ReplicationComponentFactory.COMPONENT_QUEUE_DISTRIBUTION_STRATEGY + ".target";
     private static final String TRANSPORT_AUTHENTICATION_PROVIDER_TARGET = ReplicationComponentFactory.COMPONENT_TRANSPORT_AUTHENTICATION_PROVIDER + ".target";
 
-    private static final String DEFAULT_QUEUEPROVIDER = "(name=" + JobHandlingReplicationQueueProvider.NAME + ")";
-    private static final String DEFAULT_DISTRIBUTION = "(name=" + SingleQueueDistributionStrategy.NAME + ")";
-
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -97,14 +93,6 @@ public class CoordinatingReplicationAgen
     @Property(label = "Package Importer", cardinality = 100)
     public static final String PACKAGE_IMPORTER = ReplicationComponentFactory.COMPONENT_PACKAGE_IMPORTER;
 
-    @Property(label = "Target ReplicationQueueProvider", name = QUEUE_PROVIDER_TARGET, value = DEFAULT_QUEUEPROVIDER)
-    @Reference(name = ReplicationComponentFactory.COMPONENT_QUEUE_PROVIDER, target = DEFAULT_QUEUEPROVIDER)
-    private volatile ReplicationQueueProvider queueProvider;
-
-    @Property(label = "Target QueueDistributionStrategy", name = QUEUE_DISTRIBUTION_TARGET, value = DEFAULT_DISTRIBUTION)
-    @Reference(name = ReplicationComponentFactory.COMPONENT_QUEUE_DISTRIBUTION_STRATEGY, target = DEFAULT_DISTRIBUTION)
-    private volatile ReplicationQueueDistributionStrategy queueDistributionStrategy;
-
     @Property(label = "Target TransportAuthenticationProvider", name = TRANSPORT_AUTHENTICATION_PROVIDER_TARGET)
     @Reference(name = "transportAuthenticationProvider")
     private volatile TransportAuthenticationProvider transportAuthenticationProvider;
@@ -116,13 +104,21 @@ public class CoordinatingReplicationAgen
     private SlingSettingsService settingsService;
 
     @Reference
+    private JobManager jobManager;
+
+    @Reference
     private ReplicationComponentFactory componentFactory;
 
+    private BundleContext savedContext;
+
     private ServiceRegistration componentReg;
 
+    private String agentName;
+
     @Activate
     protected void activate(BundleContext context, Map<String, Object> config) {
 
+        savedContext = context;
         // inject configuration
         Dictionary<String, Object> props = new Hashtable<String, Object>();
 
@@ -131,9 +127,8 @@ public class CoordinatingReplicationAgen
         if (enabled) {
             props.put(ENABLED, true);
 
-            String name = PropertiesUtil
-                    .toString(config.get(NAME), String.valueOf(new Random().nextInt(1000)));
-            props.put(NAME, name);
+            agentName = PropertiesUtil.toString(config.get(NAME), null);
+            props.put(NAME, agentName);
 
             if (componentReg == null) {
                 Map<String, Object> properties = new HashMap<String, Object>();
@@ -159,14 +154,11 @@ public class CoordinatingReplicationAgen
                 packageExporterProperties = packageExporterPropertiesList.toArray(new String[packageExporterPropertiesList.size()]);
                 properties.put(PACKAGE_EXPORTER, packageExporterProperties);
 
-                properties.put("trigger0", new String[]{"type=scheduledEvent"});
-
                 ReplicationAgent agent = componentFactory.createComponent(ReplicationAgent.class, properties, this);
 
-                log.debug("activated agent {}", name);
+                log.debug("activated agent {}", agentName);
 
                 if (agent != null) {
-
                     // register agent service
                     componentReg = context.registerService(ReplicationAgent.class.getName(), agent, props);
                     if (agent instanceof ManagedReplicationComponent) {
@@ -196,9 +188,9 @@ public class CoordinatingReplicationAgen
     public <ComponentType extends ReplicationComponent> ComponentType getComponent(@Nonnull Class<ComponentType> type,
                                                                                    @Nullable String componentName) {
         if (type.isAssignableFrom(ReplicationQueueProvider.class)) {
-            return (ComponentType) queueProvider;
+            return (ComponentType) new JobHandlingReplicationQueueProvider(agentName, jobManager, savedContext);
         } else if (type.isAssignableFrom(ReplicationQueueDistributionStrategy.class)) {
-            return (ComponentType) queueDistributionStrategy;
+            return (ComponentType) new SingleQueueDistributionStrategy();
         } else if (type.isAssignableFrom(TransportAuthenticationProvider.class)) {
             return (ComponentType) transportAuthenticationProvider;
         }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Tue Nov  4 15:44:12 2014
@@ -172,7 +172,7 @@ public class SimpleReplicationAgent impl
 
         // dispatch the replication package to the queue distribution handler
         try {
-            boolean success = queueDistributionStrategy.add(name, replicationPackage, queueProvider);
+            boolean success = queueDistributionStrategy.add(replicationPackage, queueProvider);
 
             Dictionary<Object, Object> properties = new Properties();
             properties.put("replication.package.paths", replicationPackage.getPaths());
@@ -197,9 +197,9 @@ public class SimpleReplicationAgent impl
         ReplicationQueue queue;
         try {
             if (queueName != null && queueName.length() > 0) {
-                queue = queueProvider.getQueue(this.name, queueName);
+                queue = queueProvider.getQueue(queueName);
             } else {
-                queue = queueProvider.getQueue(this.name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME);
+                queue = queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME);
             }
         } catch (ReplicationQueueException e) {
             throw new ReplicationAgentException(e);
@@ -223,7 +223,11 @@ public class SimpleReplicationAgent impl
         }
 
         if (!isPassive()) {
-            queueProvider.enableQueueProcessing(name, new PackageQueueProcessor());
+            try {
+                queueProvider.enableQueueProcessing(new PackageQueueProcessor());
+            } catch (ReplicationQueueException e) {
+                log.error("cannot enable queue processing", e);
+            }
         }
     }
 
@@ -241,7 +245,12 @@ public class SimpleReplicationAgent impl
         agentBasedRequestHandler = null;
 
         if (!isPassive()) {
-            queueProvider.disableQueueProcessing(name);
+
+            try {
+                queueProvider.disableQueueProcessing();
+            } catch (ReplicationQueueException e) {
+                log.error("cannot disable queue processing", e);
+            }
         }
     }
 

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentFactory.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentFactory.java Tue Nov  4 15:44:12 2014
@@ -34,6 +34,7 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.component.ManagedReplicationComponent;
 import org.apache.sling.replication.component.ReplicationComponent;
@@ -63,12 +64,8 @@ import org.slf4j.LoggerFactory;
         policy = ConfigurationPolicy.REQUIRE
 )
 public class SimpleReplicationAgentFactory implements ReplicationComponentProvider {
-    private static final String QUEUE_PROVIDER_TARGET = ReplicationComponentFactory.COMPONENT_QUEUE_PROVIDER + ".target";
-    private static final String QUEUE_DISTRIBUTION_TARGET = ReplicationComponentFactory.COMPONENT_QUEUE_DISTRIBUTION_STRATEGY + ".target";
-    private static final String TRANSPORT_AUTHENTICATION_PROVIDER_TARGET = ReplicationComponentFactory.COMPONENT_TRANSPORT_AUTHENTICATION_PROVIDER + ".target";
 
-    private static final String DEFAULT_QUEUEPROVIDER = "(name=" + JobHandlingReplicationQueueProvider.NAME + ")";
-    private static final String DEFAULT_DISTRIBUTION = "(name=" + SingleQueueDistributionStrategy.NAME + ")";
+    private static final String TRANSPORT_AUTHENTICATION_PROVIDER_TARGET = ReplicationComponentFactory.COMPONENT_TRANSPORT_AUTHENTICATION_PROVIDER + ".target";
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -96,14 +93,6 @@ public class SimpleReplicationAgentFacto
     @Property(label = "Service Name")
     public static final String SERVICE_NAME = ReplicationComponentFactory.AGENT_SIMPLE_PROPERTY_SERVICE_NAME;
 
-    @Property(label = "Target ReplicationQueueProvider", name = QUEUE_PROVIDER_TARGET, value = DEFAULT_QUEUEPROVIDER)
-    @Reference(name = ReplicationComponentFactory.COMPONENT_QUEUE_PROVIDER, target = DEFAULT_QUEUEPROVIDER)
-    private volatile ReplicationQueueProvider queueProvider;
-
-    @Property(label = "Target QueueDistributionStrategy", name = QUEUE_DISTRIBUTION_TARGET, value = DEFAULT_DISTRIBUTION)
-    @Reference(name = ReplicationComponentFactory.COMPONENT_QUEUE_DISTRIBUTION_STRATEGY, target = DEFAULT_DISTRIBUTION)
-    private volatile ReplicationQueueDistributionStrategy queueDistributionStrategy;
-
     @Property(label = "Target TransportAuthenticationProvider", name = TRANSPORT_AUTHENTICATION_PROVIDER_TARGET)
     @Reference(name = "transportAuthenticationProvider", policy = ReferencePolicy.DYNAMIC,
             cardinality = ReferenceCardinality.OPTIONAL_UNARY)
@@ -116,11 +105,15 @@ public class SimpleReplicationAgentFacto
     private SlingSettingsService settingsService;
 
     @Reference
+    private JobManager jobManager;
+
+    @Reference
     private ReplicationComponentFactory componentFactory;
 
     private ServiceRegistration componentReg;
     private BundleContext savedContext;
     private Map<String, Object> savedConfig;
+    private String agentName;
 
     @Activate
     protected void activate(BundleContext context, Map<String, Object> config) {
@@ -136,14 +129,13 @@ public class SimpleReplicationAgentFacto
         if (enabled) {
             props.put(ENABLED, true);
 
-            String name = PropertiesUtil
-                    .toString(config.get(NAME), String.valueOf(new Random().nextInt(1000)));
-            props.put(NAME, name);
+            agentName = PropertiesUtil.toString(config.get(NAME), null);
+            props.put(NAME, agentName);
 
             if (componentReg == null && componentFactory != null) {
                 ReplicationAgent agent = componentFactory.createComponent(ReplicationAgent.class, config, this);
 
-                log.debug("activated agent {}", name);
+                log.debug("activated agent {}", agentName);
 
                 if (agent != null) {
 
@@ -176,10 +168,10 @@ public class SimpleReplicationAgentFacto
     public <ComponentType extends ReplicationComponent> ComponentType getComponent(@Nonnull Class<ComponentType> type,
                                                                                    @Nullable String componentName) {
         if (type.isAssignableFrom(ReplicationQueueProvider.class)) {
-            return (ComponentType) queueProvider;
+            return (ComponentType) new JobHandlingReplicationQueueProvider(agentName, jobManager, savedContext);
         }
         else if (type.isAssignableFrom(ReplicationQueueDistributionStrategy.class)) {
-            return (ComponentType) queueDistributionStrategy;
+            return (ComponentType) new SingleQueueDistributionStrategy();
         }
         else if (type.isAssignableFrom(TransportAuthenticationProvider.class)) {
             return (ComponentType) transportAuthenticationProvider;

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/ReplicationComponentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/ReplicationComponentFactory.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/ReplicationComponentFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/ReplicationComponentFactory.java Tue Nov  4 15:44:12 2014
@@ -289,6 +289,35 @@ public interface ReplicationComponentFac
 
 
     /**
+     * queue provider job type
+     */
+    String QUEUE_PROVIDER_JOB = "job";
+
+    /**
+     * queue provider simple type
+     */
+    String QUEUE_PROVIDER_SIMPLE = "simple";
+
+    /**
+     * queue distribution strategy single type
+     */
+    String QUEUE_DISTRIBUTION_STRATEGY_SINGLE = "single";
+
+    /**
+     * queue distribution strategy priority type
+     */
+    String QUEUE_DISTRIBUTION_STRATEGY_PRIORITY = "priority";
+
+    /**
+     * queue distribution strategy priority paths property
+     */
+    String QUEUE_DISTRIBUTION_STRATEGY_PRIORITY_PROPERTY_PATHS = "priority.paths";
+
+
+
+
+
+    /**
      * create a {@link ReplicationComponent}
      *
      * @param type              the {@link java.lang.Class} of the component to be created

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/impl/DefaultReplicationComponentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/impl/DefaultReplicationComponentFactory.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/impl/DefaultReplicationComponentFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/component/impl/DefaultReplicationComponentFactory.java Tue Nov  4 15:44:12 2014
@@ -34,6 +34,7 @@ import org.apache.jackrabbit.vault.packa
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.jcr.api.SlingRepository;
 import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.agent.ReplicationRequestAuthorizationStrategy;
@@ -53,6 +54,11 @@ import org.apache.sling.replication.pack
 import org.apache.sling.replication.packaging.impl.importer.RemoteReplicationPackageImporter;
 import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.impl.ErrorAwareQueueDistributionStrategy;
+import org.apache.sling.replication.queue.impl.PriorityPathDistributionStrategy;
+import org.apache.sling.replication.queue.impl.SingleQueueDistributionStrategy;
+import org.apache.sling.replication.queue.impl.jobhandling.JobHandlingReplicationQueueProvider;
+import org.apache.sling.replication.queue.impl.simple.SimpleReplicationQueueProvider;
 import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
 import org.apache.sling.replication.serialization.impl.ResourceSharedReplicationPackageBuilder;
 import org.apache.sling.replication.serialization.impl.vlt.FileVaultReplicationPackageBuilder;
@@ -103,6 +109,9 @@ public class DefaultReplicationComponent
     @Reference
     private Scheduler scheduler;
 
+    @Reference
+    private JobManager jobManager;
+
     private BundleContext bundleContext;
 
     @Activate
@@ -262,6 +271,15 @@ public class DefaultReplicationComponent
             String name = PropertiesUtil.toString(properties.get(COMPONENT_NAME), null);
             return componentProvider.getComponent(ReplicationQueueProvider.class, name);
         }
+        else if (QUEUE_PROVIDER_JOB.equals(factory)) {
+            String name = PropertiesUtil.toString(properties.get(COMPONENT_NAME), null);
+            return new JobHandlingReplicationQueueProvider(name, jobManager, bundleContext);
+        }
+        else if (QUEUE_PROVIDER_SIMPLE.equals(factory)) {
+            String name = PropertiesUtil.toString(properties.get(COMPONENT_NAME), null);
+
+            return new SimpleReplicationQueueProvider(scheduler, name);
+        }
 
         return null;
     }
@@ -272,7 +290,14 @@ public class DefaultReplicationComponent
         if (COMPONENT_TYPE_SERVICE.equals(factory)) {
             String name = PropertiesUtil.toString(properties.get(COMPONENT_NAME), null);
             return componentProvider.getComponent(ReplicationQueueDistributionStrategy.class, name);
+        }
+        else if (QUEUE_DISTRIBUTION_STRATEGY_SINGLE.equals(factory)) {
+            return new SingleQueueDistributionStrategy();
+        }
+        else if (QUEUE_DISTRIBUTION_STRATEGY_PRIORITY.equals(factory)) {
+            String[] priorityPaths = PropertiesUtil.toStringArray(properties.get(QUEUE_DISTRIBUTION_STRATEGY_PRIORITY_PROPERTY_PATHS), null);
 
+            return new PriorityPathDistributionStrategy(priorityPaths);
         }
 
         return null;

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java Tue Nov  4 15:44:12 2014
@@ -39,15 +39,13 @@ public interface ReplicationQueueDistrib
      * synchronously distribute a {@link org.apache.sling.replication.packaging.ReplicationPackage}
      * to one or more {@link ReplicationQueue}s provided by the given {@link ReplicationQueueProvider}
      *
-     * @param agentName     the name of a {@link ReplicationAgent}
      * @param replicationPackage          a {@link org.apache.sling.replication.packaging.ReplicationPackage} to distribute
      * @param queueProvider the {@link ReplicationQueueProvider} used to provide the queues to be used for the given package
      * @return a {@link ReplicationQueueItemState} representing the state of the package in the queue after its distribution
      * @throws ReplicationQueueException if distribution fails
      */
     @Nonnull
-    boolean add(@Nonnull String agentName, @Nonnull ReplicationPackage replicationPackage,
-                                  @Nonnull ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
+    boolean add(@Nonnull ReplicationPackage replicationPackage, @Nonnull ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
 
 
     /**

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java Tue Nov  4 15:44:12 2014
@@ -33,30 +33,26 @@ public interface ReplicationQueueProvide
     /**
      * provide a named queue for the given agent
      *
-     * @param agentName the replication agent needing the queue
      * @param queueName      the name of the queue to retrieve
      * @return a replication queue to be used for the given parameters
      * @throws ReplicationQueueException
      */
     @Nonnull
-    ReplicationQueue getQueue(@Nonnull String agentName, @Nonnull String queueName)
-            throws ReplicationQueueException;
+    ReplicationQueue getQueue(@Nonnull String queueName) throws ReplicationQueueException;
 
 
 
     /**
      * enables queue driven processing for an agent
      *
-     * @param agentName      a replication agent
      * @param queueProcessor the queue processor to be used
      */
-    void enableQueueProcessing(@Nonnull String agentName, @Nonnull ReplicationQueueProcessor queueProcessor);
+    void enableQueueProcessing(@Nonnull ReplicationQueueProcessor queueProcessor) throws ReplicationQueueException;
 
 
     /**
      * disables queue driven processing for an agent
      *
-     * @param agentName a replication agent
      */
-    void disableQueueProcessing(@Nonnull String agentName);
+    void disableQueueProcessing() throws ReplicationQueueException;
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java Tue Nov  4 15:44:12 2014
@@ -46,9 +46,6 @@ import org.slf4j.LoggerFactory;
  * delivering packages with an error queue which can be used when an item is stuck in the default
  * queue for too much time, then the stuck item is moved to the error queue or dropped.
  */
-@Component(immediate = true, metatype = true, label = "Error Aware Queue Distribution Strategy")
-@Service(value = ReplicationQueueDistributionStrategy.class)
-@Property(name = "name", value = ErrorAwareQueueDistributionStrategy.NAME, propertyPrivate = true)
 public class ErrorAwareQueueDistributionStrategy implements ReplicationQueueDistributionStrategy {
 
     protected static final String ERROR_QUEUE_NAME = "error";
@@ -76,6 +73,7 @@ public class ErrorAwareQueueDistribution
 
     private Integer timeThreshold;
 
+
     @Activate
     protected void activate(final ComponentContext ctx) {
         stuckQueueHandling = PropertiesUtil
@@ -84,13 +82,12 @@ public class ErrorAwareQueueDistribution
         timeThreshold = PropertiesUtil.toInteger(ctx.getProperties().get(TIME_THRESHOLD), 600000);
     }
 
-    public boolean add(String agentName, ReplicationPackage replicationPackage,
-                         ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+    public boolean add(ReplicationPackage replicationPackage, ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
         boolean added;
         ReplicationQueueItem queueItem = getItem(replicationPackage);
-        ReplicationQueue queue = queueProvider.getQueue(agentName, DEFAULT_QUEUE_NAME);
+        ReplicationQueue queue = queueProvider.getQueue(DEFAULT_QUEUE_NAME);
         added = queue.add(queueItem);
-        checkAndRemoveStuckItems(agentName, queueProvider);
+        checkAndRemoveStuckItems(queueProvider);
         return added;
     }
 
@@ -98,9 +95,8 @@ public class ErrorAwareQueueDistribution
         return Arrays.asList(new String[] { ERROR_QUEUE_NAME, DEFAULT_QUEUE_NAME });
     }
 
-    private void checkAndRemoveStuckItems(String agent,
-                                          ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
-        ReplicationQueue defaultQueue = queueProvider.getQueue(agent, DEFAULT_QUEUE_NAME);
+    private void checkAndRemoveStuckItems(ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+        ReplicationQueue defaultQueue = queueProvider.getQueue(DEFAULT_QUEUE_NAME);
         // get first item in the queue with its status
         ReplicationQueueItem firstItem = defaultQueue.getHead();
         if (firstItem != null) {
@@ -113,7 +109,7 @@ public class ErrorAwareQueueDistribution
                 if (ERROR.equals(stuckQueueHandling)) {
                     log.warn("item {} moved to the error queue", firstItem);
 
-                    ReplicationQueue errorQueue = queueProvider.getQueue(agent, ERROR_QUEUE_NAME);
+                    ReplicationQueue errorQueue = queueProvider.getQueue(ERROR_QUEUE_NAME);
                     if (!errorQueue.add(firstItem)) {
                         log.error("failed to move item {} the queue {}", firstItem, errorQueue);
                         throw new ReplicationQueueException("could not move an item to the error queue");

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java Tue Nov  4 15:44:12 2014
@@ -43,29 +43,21 @@ import org.slf4j.LoggerFactory;
  * Distribution algorithm which keeps one specific queue to handle specific paths and another queue
  * for handling all the other paths
  */
-@Component(immediate = true, metatype = true, label = "Priority Path Queue Distribution Strategy")
-@Service(value = ReplicationQueueDistributionStrategy.class)
-@Property(name = "name", value = PriorityPathDistributionStrategy.NAME, propertyPrivate = true)
 public class PriorityPathDistributionStrategy implements ReplicationQueueDistributionStrategy {
 
-    public static final String NAME = "priority";
-
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    @Property(value = {"/content"})
-    private static final String PRIORITYPATHS = "priority.paths";
+    private final String[] priorityPaths;
 
-    private String[] priorityPaths;
+    public PriorityPathDistributionStrategy(String[] priorityPaths) {
+        this.priorityPaths = priorityPaths;
 
-    @Activate
-    protected void activate(ComponentContext context) {
-        priorityPaths = PropertiesUtil.toStringArray(context.getProperties().get(PRIORITYPATHS));
     }
 
 
 
-    private ReplicationQueue getQueue(String agentName, ReplicationQueueItem replicationPackage,
-                                      ReplicationQueueProvider queueProvider)
+
+    private ReplicationQueue getQueue(ReplicationQueueItem replicationPackage, ReplicationQueueProvider queueProvider)
             throws ReplicationQueueException {
         String[] paths = replicationPackage.getPaths();
 
@@ -86,23 +78,22 @@ public class PriorityPathDistributionStr
         ReplicationQueue queue;
         if (usePriorityQueue) {
             log.info("using priority queue for path {}", pp);
-            queue = queueProvider.getQueue(agentName, pp);
+            queue = queueProvider.getQueue(pp);
         } else {
             log.info("using default queue");
-            queue = queueProvider.getQueue(agentName, DEFAULT_QUEUE_NAME);
+            queue = queueProvider.getQueue(DEFAULT_QUEUE_NAME);
         }
         return queue;
     }
 
-    public boolean add(String agentName, ReplicationPackage replicationPackage,
-                         ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+    public boolean add(ReplicationPackage replicationPackage, ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
 
         ReplicationQueueItem queueItem = getItem(replicationPackage);
-        ReplicationQueue queue = getQueue(agentName, queueItem, queueProvider);
+        ReplicationQueue queue = getQueue(queueItem, queueProvider);
         if (queue != null) {
             return queue.add(queueItem);
         } else {
-            throw new ReplicationQueueException("could not get a queue for agent " + agentName);
+            throw new ReplicationQueueException("could not get a queue");
         }
     }
 

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java Tue Nov  4 15:44:12 2014
@@ -42,19 +42,13 @@ import java.util.List;
  * The default strategy for delivering packages to queues. Each agent just manages a single queue,
  * no failure / stuck handling where each package is put regardless of anything.
  */
-@Component(immediate = true, label = "Single Queue Distribution Strategy")
-@Service(value = ReplicationQueueDistributionStrategy.class)
-@Property(name = "name", value = SingleQueueDistributionStrategy.NAME)
 public class SingleQueueDistributionStrategy implements ReplicationQueueDistributionStrategy {
 
-    public static final String NAME = "single";
-
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    public boolean add(String agentName, ReplicationPackage replicationPackage,
-                         ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+    public boolean add(ReplicationPackage replicationPackage, ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
         ReplicationQueueItem queueItem = getItem(replicationPackage);
-        ReplicationQueue queue = queueProvider.getQueue(agentName, DEFAULT_QUEUE_NAME);
+        ReplicationQueue queue = queueProvider.getQueue(DEFAULT_QUEUE_NAME);
         return queue.add(queueItem);
     }
 

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java Tue Nov  4 15:44:12 2014
@@ -36,89 +36,66 @@ import org.apache.sling.replication.queu
 import org.apache.sling.replication.queue.ReplicationQueueException;
 import org.apache.sling.replication.queue.ReplicationQueueProcessor;
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.queue.impl.AbstractReplicationQueueProvider;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Component(metatype = false, label = "Sling Job handling based Replication Queue Provider")
-@Service(value = ReplicationQueueProvider.class)
-@Property(name = "name", value = JobHandlingReplicationQueueProvider.NAME)
-public class JobHandlingReplicationQueueProvider extends AbstractReplicationQueueProvider
-        implements ReplicationQueueProvider {
-
-    public static final String NAME = "sjh";
-
+/**
+ * a queue provider {@link ReplicationQueueProvider} for sling jobs based
+ * {@link ReplicationQueue}s
+ */
+public class JobHandlingReplicationQueueProvider implements ReplicationQueueProvider {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    @Reference
-    private JobManager jobManager;
+    private final String name;
+    private final JobManager jobManager;
 
 
-    private final Map<String, ServiceRegistration> jobConsumers = new ConcurrentHashMap<String, ServiceRegistration>();
+
+    private ServiceRegistration jobConsumer = null;
 
     private BundleContext context;
 
 
-    protected JobHandlingReplicationQueueProvider(JobManager jobManager, BundleContext context) {
+    public JobHandlingReplicationQueueProvider(String name, JobManager jobManager, BundleContext context) {
+        if (name == null || jobManager == null || context == null) {
+            throw new IllegalArgumentException("all arguments are required");
+        }
+        this.name = name;
         this.jobManager = jobManager;
         this.context = context;
     }
 
-    public JobHandlingReplicationQueueProvider() {
-    }
-
-    @Override
-    protected ReplicationQueue getInternalQueue(String agentName, String queueName)
-            throws ReplicationQueueException {
-        String name = agentName;
-        if (queueName.length() > 0) {
-            name += "/" + queueName;
-        }
-        String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name;
+    public ReplicationQueue getQueue(String queueName) throws ReplicationQueueException {
+        String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name + "/" + queueName;
         return new JobHandlingReplicationQueue(name, topic, jobManager);
     }
 
 
-    public void enableQueueProcessing(@Nonnull String agentName, @Nonnull ReplicationQueueProcessor queueProcessor) {
+    public void enableQueueProcessing(@Nonnull ReplicationQueueProcessor queueProcessor) throws ReplicationQueueException {
+        if (jobConsumer != null) {
+            throw new ReplicationQueueException("job already registered");
+        }
         // eventually register job consumer for sling job handling based queues
         Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
-        String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + agentName;
+        String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name;
         String childTopic = topic + "/*";
         jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{topic, childTopic});
-        synchronized (jobConsumers) {
-            log.info("registering job consumer for agent {}", agentName);
-            ServiceRegistration jobReg = context.registerService(JobConsumer.class.getName(),
-                    new ReplicationAgentJobConsumer(queueProcessor), jobProps);
-            if (jobReg != null) {
-                jobConsumers.put(agentName, jobReg);
-            }
-            log.info("job consumer for agent {} registered", agentName);
-        }
+        log.info("registering job consumer for agent {}", name);
+        jobConsumer = context.registerService(JobConsumer.class.getName(), new ReplicationAgentJobConsumer(queueProcessor), jobProps);
+        log.info("job consumer for agent {} registered", name);
     }
 
-    public void disableQueueProcessing(@Nonnull String agentName) {
-        synchronized (jobConsumers) {
-            log.info("unregistering job consumer for agent {}", agentName);
-            ServiceRegistration jobReg = jobConsumers.remove(agentName);
-            if (jobReg != null) {
-                jobReg.unregister();
-                log.info("job consumer for agent {} unregistered", agentName);
-            }
+    public void disableQueueProcessing() {
+        if (jobConsumer != null) {
+            jobConsumer.unregister();
+            log.info("job consumer for agent {} unregistered", name);
+            jobConsumer = null;
         }
-    }
+        log.info("unregistering job consumer for agent {}", name);
 
-    @Activate
-    private void activate(BundleContext context) {
-        this.context = context;
     }
 
-    @Deactivate
-    private void deactivate() {
-        for (ServiceRegistration jobReg : jobConsumers.values()) {
-            jobReg.unregister();
-        }
-        this.context = null;
-    }
+
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java Tue Nov  4 15:44:12 2014
@@ -44,7 +44,7 @@ public class ScheduledReplicationQueuePr
 
     public void run() {
         try {
-            for (ReplicationQueue queue : queueProvider.getAllQueues()) {
+            for (ReplicationQueue queue : queueProvider.getQueues()) {
                 while (!queue.isEmpty()) {
                     // synchronized (queue) {
                     ReplicationQueueItem item = queue.getHead();

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java Tue Nov  4 15:44:12 2014
@@ -20,61 +20,77 @@ package org.apache.sling.replication.que
 
 import javax.annotation.Nonnull;
 
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.commons.scheduler.ScheduleOptions;
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.replication.queue.ReplicationQueue;
 import org.apache.sling.replication.queue.ReplicationQueueException;
 import org.apache.sling.replication.queue.ReplicationQueueProcessor;
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.queue.impl.AbstractReplicationQueueProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * an OSGi service implementing {@link ReplicationQueueProvider} for simple in memory
+ * a queue provider {@link ReplicationQueueProvider} for simple in memory
  * {@link ReplicationQueue}s
  */
-@Component(metatype = false, label = "In memory Replication Queue Provider")
-@Service(value = ReplicationQueueProvider.class)
-@Property(name = "name", value = SimpleReplicationQueueProvider.NAME)
-public class SimpleReplicationQueueProvider extends AbstractReplicationQueueProvider implements
-        ReplicationQueueProvider {
+public class SimpleReplicationQueueProvider  implements ReplicationQueueProvider {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+
+    private final String name;
+    private final Scheduler scheduler;
 
-    @Reference
-    Scheduler scheduler;
+    private final Map<String, ReplicationQueue> queueMap = new ConcurrentHashMap<String, ReplicationQueue>();
 
-    public static final String NAME = "simple";
 
-    protected SimpleReplicationQueueProvider(Scheduler scheduler) {
+    public SimpleReplicationQueueProvider(Scheduler scheduler, String name) {
+        if (name == null || scheduler == null) {
+            throw new IllegalArgumentException("all arguments are required");
+        }
+
         this.scheduler = scheduler;
-    }
+        this.name = name;
 
-    public SimpleReplicationQueueProvider() {
     }
 
-    protected ReplicationQueue getInternalQueue(String agentName, String selector)
+    @Nonnull
+    public ReplicationQueue getQueue(@Nonnull String queueName)
             throws ReplicationQueueException {
-        return new SimpleReplicationQueue(agentName, selector);
+        String key = name + queueName;
+
+        ReplicationQueue queue = queueMap.get(key);
+        if (queue == null) {
+            log.info("creating a queue with key {}", key);
+            queue = new SimpleReplicationQueue(name, queueName);
+            queueMap.put(key, queue);
+            log.info("queue created {}", queue);
+        }
+        return queue;
     }
 
-    protected void deleteQueue(ReplicationQueue queue) {
-        // do nothing as queues just exist in the cache
+
+    protected Collection<ReplicationQueue> getQueues() {
+        return queueMap.values();
     }
 
-    public void enableQueueProcessing(@Nonnull String agentName, @Nonnull ReplicationQueueProcessor queueProcessor) {
+    public void enableQueueProcessing(@Nonnull ReplicationQueueProcessor queueProcessor) {
         ScheduleOptions options = scheduler.NOW(-1, 10)
                 .canRunConcurrently(false)
-                .name(getJobName(agentName));
+                .name(getJobName());
         scheduler.schedule(new ScheduledReplicationQueueProcessorTask(this, queueProcessor), options);
     }
 
-    public void disableQueueProcessing(@Nonnull String agentName) {
-        scheduler.unschedule(getJobName(agentName));
+    public void disableQueueProcessing() {
+        scheduler.unschedule(getJobName());
     }
 
-    private String getJobName(String agentName) {
-        return SimpleReplicationQueueProvider.NAME + "-queueProcessor-" + agentName;
+    private String getJobName() {
+        return "simple-queueProcessor-" + name;
     }
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactoryTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactoryTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactoryTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactoryTest.java Tue Nov  4 15:44:12 2014
@@ -42,6 +42,7 @@ public class CoordinatingReplicationAgen
         BundleContext context = mock(BundleContext.class);
         Map<String, Object> config = new HashMap<String, Object>();
         try {
+            config.put(CoordinatingReplicationAgentFactory.NAME, "agentName");
             coordinatingReplicationAgentFactory.activate(context, config);
             fail("cannot activate a coordinate agents without exporters/importers");
         } catch (IllegalArgumentException e) {
@@ -54,6 +55,7 @@ public class CoordinatingReplicationAgen
         CoordinatingReplicationAgentFactory coordinatingReplicationAgentFactory = new CoordinatingReplicationAgentFactory();
         BundleContext context = mock(BundleContext.class);
         Map<String, Object> config = new HashMap<String, Object>();
+        config.put(CoordinatingReplicationAgentFactory.NAME, "agentName");
         config.put(CoordinatingReplicationAgentFactory.PACKAGE_IMPORTER, new String[]{});
         config.put(CoordinatingReplicationAgentFactory.PACKAGE_EXPORTER, new String[]{});
         try {
@@ -70,6 +72,8 @@ public class CoordinatingReplicationAgen
 
         BundleContext context = mock(BundleContext.class);
         Map<String, Object> config = new HashMap<String, Object>();
+
+        config.put(CoordinatingReplicationAgentFactory.NAME, "agentName");
         config.put(CoordinatingReplicationAgentFactory.PACKAGE_IMPORTER, new String[]{"packageBuilder/type=vlt",
                 "endpoints[0]=http://host:101/libs/sling/replication/services/exporters/reverse-101",
                 "endpoints[1]=http://host:102/libs/sling/replication/services/exporters/reverse-102",

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java Tue Nov  4 15:44:12 2014
@@ -59,7 +59,7 @@ public class SimpleReplicationAgentTest 
         ReplicationRequestAuthorizationStrategy packageExporterStrategy = mock(ReplicationRequestAuthorizationStrategy.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
-        when(distributionHandler.add(any(String.class), any(ReplicationPackage.class), any(ReplicationQueueProvider.class))).thenReturn(false);
+        when(distributionHandler.add(any(ReplicationPackage.class), any(ReplicationQueueProvider.class))).thenReturn(false);
         ReplicationEventFactory replicationEventFactory = mock(ReplicationEventFactory.class);
         ResourceResolverFactory resolverFactory = mock(ResourceResolverFactory.class);
 
@@ -76,7 +76,7 @@ public class SimpleReplicationAgentTest 
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
         when(packageExporter.exportPackages(any(ResourceResolver.class), any(ReplicationRequest.class)))
                 .thenReturn(Arrays.asList(replicationPackage));
-        when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
                 new SimpleReplicationQueue(name, "name"));
         ReplicationResponse response = agent.execute(resourceResolver, request);
         assertNotNull(response);
@@ -104,10 +104,10 @@ public class SimpleReplicationAgentTest 
         ResourceResolver resourceResolver = mock(ResourceResolver.class);
 
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
-        when(distributionHandler.add(any(String.class), any(ReplicationPackage.class), eq(queueProvider))).thenReturn(true);
+        when(distributionHandler.add(any(ReplicationPackage.class), eq(queueProvider))).thenReturn(true);
         when(packageExporter.exportPackages(any(ResourceResolver.class), any(ReplicationRequest.class)))
                 .thenReturn(Arrays.asList(replicationPackage));
-        when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
                 new SimpleReplicationQueue(name, "name"));
         ReplicationResponse response = agent.execute(resourceResolver, request);
         assertNotNull(response);
@@ -137,7 +137,7 @@ public class SimpleReplicationAgentTest 
 
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
         when(packageExporter.exportPackages(resourceResolver, request)).thenReturn(Arrays.asList(replicationPackage));
-        when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
                 new SimpleReplicationQueue(name, "name"));
 
         agent.execute(resourceResolver, request);
@@ -161,7 +161,7 @@ public class SimpleReplicationAgentTest 
                 queueProvider, distributionHandler,
                 replicationEventFactory, resolverFactory, null);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME))
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME))
                 .thenReturn(queue);
         assertNotNull(agent.getQueue(null));
     }
@@ -184,7 +184,7 @@ public class SimpleReplicationAgentTest 
                 queueProvider, distributionHandler,
                 replicationEventFactory, resolverFactory, null);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getQueue(name, "priority")).thenReturn(queue);
+        when(queueProvider.getQueue("priority")).thenReturn(queue);
         assertNotNull(agent.getQueue("priority"));
     }
 
@@ -206,7 +206,7 @@ public class SimpleReplicationAgentTest 
                 queueProvider, distributionHandler,
                 replicationEventFactory, resolverFactory, null);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getQueue(name, "priority")).thenReturn(queue);
+        when(queueProvider.getQueue("priority")).thenReturn(queue);
         assertNull(agent.getQueue("weird"));
     }
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java Tue Nov  4 15:44:12 2014
@@ -44,10 +44,10 @@ public class ErrorAwareQueueDistribution
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
-        boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = errorAwareDistributionStrategy.add(replicationPackage, queueProvider);
 
         assertTrue(returnedState);
     }
@@ -60,12 +60,12 @@ public class ErrorAwareQueueDistribution
         ReplicationQueue queue = mock(ReplicationQueue.class);
         ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
 
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(queueItem)).thenReturn(true);
         ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
         when(state.isSuccessful()).thenReturn(false);
         when(queue.getStatus(queueItem)).thenReturn(state);
-        boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = errorAwareDistributionStrategy.add(replicationPackage, queueProvider);
         assertFalse(returnedState);
     }
 
@@ -83,17 +83,17 @@ public class ErrorAwareQueueDistribution
         ReplicationQueue queue = mock(ReplicationQueue.class);
         ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
 
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(queueItem)).thenReturn(true);
         when(queue.getHead()).thenReturn(queueItem);
         ReplicationQueue errorQueue = mock(ReplicationQueue.class);
         when(errorQueue.add(queueItem)).thenReturn(true);
-        when(queueProvider.getQueue("agentName", ErrorAwareQueueDistributionStrategy.ERROR_QUEUE_NAME)).thenReturn(errorQueue);
+        when(queueProvider.getQueue(ErrorAwareQueueDistributionStrategy.ERROR_QUEUE_NAME)).thenReturn(errorQueue);
         ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
         when(state.isSuccessful()).thenReturn(false);
         when(state.getAttempts()).thenReturn(2);
         when(queue.getStatus(any(ReplicationQueueItem.class))).thenReturn(state);
-        boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = errorAwareDistributionStrategy.add(replicationPackage, queueProvider);
         assertFalse(returnedState);
     }
 
@@ -110,14 +110,14 @@ public class ErrorAwareQueueDistribution
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
 
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
         when(queue.getHead()).thenReturn(mock(ReplicationQueueItem.class));
         ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
         when(state.isSuccessful()).thenReturn(false);
         when(state.getAttempts()).thenReturn(2);
         when(queue.getStatus(any(ReplicationQueueItem.class))).thenReturn(state);
-        boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = errorAwareDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 
@@ -128,9 +128,9 @@ public class ErrorAwareQueueDistribution
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
 
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
-        boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = errorAwareDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 
@@ -140,10 +140,10 @@ public class ErrorAwareQueueDistribution
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
-        boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = errorAwareDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java Tue Nov  4 15:44:12 2014
@@ -40,154 +40,122 @@ public class PriorityPathQueueDistributi
 
     @Test
     public void testPackageAdditionWithSucceedingItemDelivery() throws Exception {
-        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
-        ComponentContext context = mock(ComponentContext.class);
-        Dictionary properties = mock(Dictionary.class);
-        when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
-        when(context.getProperties()).thenReturn(properties);
-        priorityPathDistributionStrategy.activate(context);
+        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
-        boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 
     @Test
     public void testPackageAdditionWithSucceedingItemDeliveryOnPriorityPath() throws Exception {
-        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
-        ComponentContext context = mock(ComponentContext.class);
-        Dictionary properties = mock(Dictionary.class);
-        when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
-        when(context.getProperties()).thenReturn(properties);
-        priorityPathDistributionStrategy.activate(context);
+        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/content/sample1"});
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getQueue("agentName", "/content")).thenReturn(queue);
+        when(queueProvider.getQueue("/content")).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
-        boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 
     @Test
     public void testPackageAdditionWithFailingItemDelivery() throws Exception {
-        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
-        ComponentContext context = mock(ComponentContext.class);
-        Dictionary properties = mock(Dictionary.class);
-        when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
-        when(context.getProperties()).thenReturn(properties);
-        priorityPathDistributionStrategy.activate(context);
+        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
-        boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 
     @Test
     public void testPackageAdditionWithFailingItemDeliveryOnPriorityPath() throws Exception {
-        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
-        ComponentContext context = mock(ComponentContext.class);
-        Dictionary properties = mock(Dictionary.class);
-        when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
-        when(context.getProperties()).thenReturn(properties);
-        priorityPathDistributionStrategy.activate(context);
+        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/content/sample2"});
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
 
-        when(queueProvider.getQueue("agentName", "/content")).thenReturn(queue);
+        when(queueProvider.getQueue("/content")).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
-        boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 
     @Test
     public void testPackageAdditionWithNullItemStateFromTheQueue() throws Exception {
-        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
-        ComponentContext context = mock(ComponentContext.class);
-        Dictionary properties = mock(Dictionary.class);
-        when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
-        when(context.getProperties()).thenReturn(properties);
-        priorityPathDistributionStrategy.activate(context);
+        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
 
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
-        boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 
     @Test
     public void testPackageAdditionWithNullItemStateFromTheQueueOnPriorityPath() throws Exception {
-        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
-        ComponentContext context = mock(ComponentContext.class);
-        Dictionary properties = mock(Dictionary.class);
-        when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
-        when(context.getProperties()).thenReturn(properties);
-        priorityPathDistributionStrategy.activate(context);
+        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/apps/some/stuff"});
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
 
-        when(queueProvider.getQueue("agentName", "/apps")).thenReturn(queue);
+        when(queueProvider.getQueue("/apps")).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
-        boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 
     @Test
     public void testPackageAdditionWithNotNullItemStateFromTheQueue() throws Exception {
-        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
-        ComponentContext context = mock(ComponentContext.class);
-        Dictionary properties = mock(Dictionary.class);
-        when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
-        when(context.getProperties()).thenReturn(properties);
-        priorityPathDistributionStrategy.activate(context);
+        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
-        boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 
     @Test
     public void testPackageAdditionWithNotNullItemStateFromTheQueueOnPriorityPath() throws Exception {
-        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy();
-        ComponentContext context = mock(ComponentContext.class);
-        Dictionary properties = mock(Dictionary.class);
-        when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
-        when(context.getProperties()).thenReturn(properties);
-        priorityPathDistributionStrategy.activate(context);
+        PriorityPathDistributionStrategy priorityPathDistributionStrategy = new PriorityPathDistributionStrategy(new String[]{"/content", "/apps"});
+
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/apps"});
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
 
-        when(queueProvider.getQueue("agentName", "/apps")).thenReturn(queue);
+        when(queueProvider.getQueue("/apps")).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
-        boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = priorityPathDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java Tue Nov  4 15:44:12 2014
@@ -41,10 +41,10 @@ public class SingleQueueDistributionStra
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
-        boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = singleQueueDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 
@@ -55,12 +55,12 @@ public class SingleQueueDistributionStra
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
         ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(queueItem)).thenReturn(true);
         ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
         when(state.isSuccessful()).thenReturn(false);
         when(queue.getStatus(queueItem)).thenReturn(state);
-        boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = singleQueueDistributionStrategy.add(replicationPackage, queueProvider);
         assertFalse(returnedState);
     }
 
@@ -70,9 +70,9 @@ public class SingleQueueDistributionStra
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
-        boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = singleQueueDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 
@@ -82,10 +82,10 @@ public class SingleQueueDistributionStra
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
+        when(queueProvider.getQueue(ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
-        boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+        boolean returnedState = singleQueueDistributionStrategy.add(replicationPackage, queueProvider);
         assertTrue(returnedState);
     }
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java Tue Nov  4 15:44:12 2014
@@ -42,9 +42,9 @@ public class JobHandlingReplicationQueue
         JobManager jobManager = mock(JobManager.class);
 
         BundleContext context = mock(BundleContext.class);
-        JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider(
+        JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider("dummy-agent",
                 jobManager, context);
-        ReplicationQueue queue = jobHandlingReplicationQueueProvider.getInternalQueue("dummy-agent", "default");
+        ReplicationQueue queue = jobHandlingReplicationQueueProvider.getQueue("default");
         assertNotNull(queue);
     }
 
@@ -57,11 +57,11 @@ public class JobHandlingReplicationQueue
         Configuration config = mock(Configuration.class);
         when(configAdmin.createFactoryConfiguration(QueueConfiguration.class.getName(), null)).thenReturn(config);
         BundleContext context = mock(BundleContext.class);
-        JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider(
+        JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider("dummy-agent",
                 jobManager, context);
         String agentName = "dummy-agent";
         ReplicationQueueProcessor queueProcessor = mock(ReplicationQueueProcessor.class);
-        jobHandlingReplicationQueueProvider.enableQueueProcessing(agentName, queueProcessor);
+        jobHandlingReplicationQueueProvider.enableQueueProcessing(queueProcessor);
     }
 
     @Test
@@ -71,9 +71,9 @@ public class JobHandlingReplicationQueue
         Configuration config = mock(Configuration.class);
         when(configAdmin.createFactoryConfiguration(QueueConfiguration.class.getName(), null)).thenReturn(config);
         BundleContext context = mock(BundleContext.class);
-        JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider(
-                jobManager,  context);
+        JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider("dummy-agent",
+                jobManager, context);
         String agentName = "dummy-agent";
-        jobHandlingReplicationQueueProvider.disableQueueProcessing(agentName);
+        jobHandlingReplicationQueueProvider.disableQueueProcessing();
     }
 }
\ No newline at end of file

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java?rev=1636613&r1=1636612&r2=1636613&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java Tue Nov  4 15:44:12 2014
@@ -51,7 +51,7 @@ public class ScheduledReplicationQueuePr
         ReplicationQueue queue = mock(ReplicationQueue.class);
         when(queue.isEmpty()).thenReturn(true);
         queues.add(queue);
-        when(queueProvider.getAllQueues()).thenReturn(queues);
+        when(queueProvider.getQueues()).thenReturn(queues);
         ReplicationQueueProcessor queueProcessor = mock(ReplicationQueueProcessor.class);
         ScheduledReplicationQueueProcessorTask scheduledReplicationQueueProcessorTask = new ScheduledReplicationQueueProcessorTask(
                 queueProvider, queueProcessor);
@@ -68,7 +68,7 @@ public class ScheduledReplicationQueuePr
         when(queue.getHead()).thenReturn(item);
 
         queues.add(queue);
-        when(queueProvider.getAllQueues()).thenReturn(queues);
+        when(queueProvider.getQueues()).thenReturn(queues);
         ReplicationQueueProcessor queueProcessor = mock(ReplicationQueueProcessor.class);
         ScheduledReplicationQueueProcessorTask scheduledReplicationQueueProcessorTask = new ScheduledReplicationQueueProcessorTask(
                 queueProvider, queueProcessor);