You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2014/11/04 10:34:35 UTC

svn commit: r1636529 - in /sling/trunk/contrib/extensions/replication/core/src: main/java/org/apache/sling/replication/agent/ main/java/org/apache/sling/replication/agent/impl/ main/java/org/apache/sling/replication/monitor/ main/java/org/apache/sling/...

Author: mpetria
Date: Tue Nov  4 09:34:34 2014
New Revision: 1636529

URL: http://svn.apache.org/r1636529
Log:
SLING-4106: changing the queue API not to be aware of all queues

Added:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemSelector.java
Modified:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/SharedReplicationPackage.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackage.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueServlet.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java
    sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java Tue Nov  4 09:34:34 2014
@@ -39,6 +39,13 @@ import org.apache.sling.replication.queu
 @ProviderType
 public interface ReplicationAgent extends ReplicationComponent {
 
+
+    /**
+     * retrieves the names of the queues for this agent.
+     * @return the list of queue names
+     */
+    Iterable<String> getQueueNames();
+
     /**
      * get the agent queue with the given name
      *

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Tue Nov  4 09:34:34 2014
@@ -40,10 +40,7 @@ import org.apache.sling.replication.comm
 import org.apache.sling.replication.component.ManagedReplicationComponent;
 import org.apache.sling.replication.event.impl.ReplicationEventFactory;
 import org.apache.sling.replication.event.ReplicationEventType;
-import org.apache.sling.replication.packaging.ReplicationPackage;
-import org.apache.sling.replication.packaging.ReplicationPackageExporter;
-import org.apache.sling.replication.packaging.ReplicationPackageImportException;
-import org.apache.sling.replication.packaging.ReplicationPackageImporter;
+import org.apache.sling.replication.packaging.*;
 import org.apache.sling.replication.queue.*;
 import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
 import org.apache.sling.replication.trigger.ReplicationRequestHandler;
@@ -192,13 +189,17 @@ public class SimpleReplicationAgent impl
         return replicationResponse;
     }
 
+    public Iterable<String> getQueueNames() {
+        return queueDistributionStrategy.getQueueNames();
+    }
+
     public ReplicationQueue getQueue(String queueName) throws ReplicationAgentException {
         ReplicationQueue queue;
         try {
             if (queueName != null && queueName.length() > 0) {
                 queue = queueProvider.getQueue(this.name, queueName);
             } else {
-                queue = queueProvider.getDefaultQueue(this.name);
+                queue = queueProvider.getQueue(this.name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME);
             }
         } catch (ReplicationQueueException e) {
             throw new ReplicationAgentException(e);
@@ -244,7 +245,7 @@ public class SimpleReplicationAgent impl
         }
     }
 
-    private boolean processQueue(ReplicationQueueItem queueItem) {
+    private boolean processQueue(String queueName, ReplicationQueueItem queueItem) {
         boolean success = false;
         log.debug("reading package with id {}", queueItem.getId());
         ResourceResolver agentResourceResolver = null;
@@ -265,7 +266,12 @@ public class SimpleReplicationAgent impl
                 properties.put("replication.agent.name", name);
                 replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_REPLICATED, properties);
 
-                replicationPackage.delete();
+                if (replicationPackage instanceof SharedReplicationPackage) {
+                    ((SharedReplicationPackage) replicationPackage).release(queueName);
+                }
+                else {
+                    replicationPackage.delete();
+                }
                 success = true;
             } else {
                 log.warn("replication package with id {} does not exist", queueItem.getId());
@@ -308,7 +314,7 @@ public class SimpleReplicationAgent impl
     class PackageQueueProcessor implements ReplicationQueueProcessor {
         public boolean process(@Nonnull String queueName, @Nonnull ReplicationQueueItem packageInfo) {
             log.info("running package queue processor for queue {}", queueName);
-            return processQueue(packageInfo);
+            return processQueue(queueName, packageInfo);
         }
     }
 

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java Tue Nov  4 09:34:34 2014
@@ -18,10 +18,8 @@
  */
 package org.apache.sling.replication.monitor;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -38,6 +36,7 @@ import org.apache.sling.commons.osgi.Pro
 import org.apache.sling.hc.api.HealthCheck;
 import org.apache.sling.hc.api.Result;
 import org.apache.sling.hc.util.FormattingResultLog;
+import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.queue.ReplicationQueue;
 import org.apache.sling.replication.queue.ReplicationQueueItem;
 import org.apache.sling.replication.queue.ReplicationQueueItemState;
@@ -58,12 +57,10 @@ import org.slf4j.LoggerFactory;
         @Property(name = HealthCheck.MBEAN_NAME, value = "slingReplicationQueue", description = "Health Check MBean name", label = "MBean name")
 })
 @References({
-        @Reference(name = "replicationQueueProvider",
-                referenceInterface = ReplicationQueueProvider.class,
+        @Reference(name = "replicationAgent",
+                referenceInterface = ReplicationAgent.class,
                 cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
-                policy = ReferencePolicy.DYNAMIC,
-                bind = "bindReplicationQueueProvider",
-                unbind = "unbindReplicationQueueProvider")
+                policy = ReferencePolicy.DYNAMIC)
 })
 
 @Service(value = HealthCheck.class)
@@ -78,7 +75,7 @@ public class ReplicationQueueHealthCheck
     @Property(intValue = DEFAULT_NUMBER_OF_RETRIES_ALLOWED, description = "Number of allowed retries", label = "Allowed retries")
     private static final String NUMBER_OF_RETRIES_ALLOWED = "numberOfRetriesAllowed";
 
-    private final Collection<ReplicationQueueProvider> replicationQueueProviders = new LinkedList<ReplicationQueueProvider>();
+    private final List<ReplicationAgent> replicationAgents = new CopyOnWriteArrayList<ReplicationAgent>();
 
     @Activate
     public void activate(final Map<String, Object> properties) {
@@ -88,31 +85,30 @@ public class ReplicationQueueHealthCheck
 
     @Deactivate
     protected void deactivate() {
-        replicationQueueProviders.clear();
+        replicationAgents.clear();
     }
 
-    protected void bindReplicationQueueProvider(final ReplicationQueueProvider replicationQueueProvider) {
-        synchronized (replicationQueueProviders) {
-            replicationQueueProviders.add(replicationQueueProvider);
-        }
-        log.debug("Registering replication queue provider {} ", replicationQueueProvider);
+    protected void bindReplicationAgent(final ReplicationAgent replicationAgent) {
+        replicationAgents.add(replicationAgent);
+
+        log.debug("Registering replication agent {} ", replicationAgent);
     }
 
-    protected void unbindReplicationQueueProvider(final ReplicationQueueProvider replicationQueueProvider) {
-        synchronized (replicationQueueProviders) {
-            replicationQueueProviders.remove(replicationQueueProvider);
-        }
-        log.debug("Unregistering replication queue provider {} ", replicationQueueProvider);
+    protected void unbindReplicationAgent(final ReplicationAgent replicationAgent) {
+        replicationAgents.remove(replicationAgent);
+        log.debug("Unregistering replication agent {} ", replicationAgent);
     }
 
     public Result execute() {
         final FormattingResultLog resultLog = new FormattingResultLog();
         Map<String, Integer> failures = new HashMap<String, Integer>();
-        if (replicationQueueProviders.size() > 0) {
+        if (replicationAgents.size() > 0) {
 
-            for (ReplicationQueueProvider replicationQueueProvider : replicationQueueProviders) {
-                for (ReplicationQueue q : replicationQueueProvider.getAllQueues())
+            for (ReplicationAgent replicationAgent : replicationAgents) {
+                for (String queueName : replicationAgent.getQueueNames()) {
                     try {
+                        ReplicationQueue q = replicationAgent.getQueue(queueName);
+
                         ReplicationQueueItem item = q.getHead();
                         if (item != null) {
                             ReplicationQueueItemState status = q.getStatus(item);
@@ -129,8 +125,9 @@ public class ReplicationQueueHealthCheck
                         }
 
                     } catch (Exception e) {
-                        resultLog.warn("Exception while inspecting replication queue [{}]: {}", q.getName(), e);
+                        resultLog.warn("Exception while inspecting replication queue [{}]: {}", queueName, e);
                     }
+                }
             }
         } else {
             resultLog.debug("No replication queue providers found");

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/SharedReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/SharedReplicationPackage.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/SharedReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/SharedReplicationPackage.java Tue Nov  4 09:34:34 2014
@@ -30,12 +30,12 @@ public interface SharedReplicationPackag
     /**
      * acquire a reference to this package and increase the reference count.
      */
-    void acquire();
+    void acquire(String holderName);
 
     /**
      * release a reference to this package and decrease the reference count.
      * when no more references are hold the package <code>delete</code> method is called.
      */
-    void release();
+    void release(String holderName);
 
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java Tue Nov  4 09:34:34 2014
@@ -22,6 +22,7 @@ import aQute.bnd.annotation.ConsumerType
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.util.Collection;
 
 /**
@@ -76,10 +77,12 @@ public interface ReplicationQueue {
     /**
      * get the items in the queue
      *
-     * @return a <code>Collection</code> of {@link org.apache.sling.replication.queue.ReplicationQueueItem}s
+     * @param queueItemSelector represents the criteria to filter queue items.
+     *                          if null is passed then all items are returned.
+     * @return a <code>Iterable</code> of {@link org.apache.sling.replication.queue.ReplicationQueueItem}s
      */
     @Nonnull
-    Collection<ReplicationQueueItem> getItems();
+    Iterable<ReplicationQueueItem> getItems(@Nullable ReplicationQueueItemSelector queueItemSelector);
 
     /**
      * remove an item from the queue by specifying its id

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java Tue Nov  4 09:34:34 2014
@@ -33,7 +33,7 @@ import java.util.List;
  */
 @ConsumerType
 public interface ReplicationQueueDistributionStrategy extends ReplicationComponent {
-    String DEFAULT_QUEUE_NAME = "";
+    String DEFAULT_QUEUE_NAME = "default";
 
     /**
      * synchronously distribute a {@link org.apache.sling.replication.packaging.ReplicationPackage}

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemSelector.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemSelector.java?rev=1636529&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemSelector.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemSelector.java Tue Nov  4 09:34:34 2014
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.replication.queue;
+
+/**
+ * Class representing criteria for queue items selection.
+ */
+public class ReplicationQueueItemSelector {
+    private final int skip;
+    private final int limit;
+
+    /**
+     *
+     * @param skip the number of items to skip
+     * @param limit the maximum number of items to return. use -1 to return all items.
+     */
+    public ReplicationQueueItemSelector(int skip, int limit) {
+        this.skip = skip;
+        this.limit = limit;
+    }
+
+    /**
+     * @return the number of items to skip from the queue.
+     */
+    public int getSkip() {
+        return skip;
+    }
+
+    /**
+     *
+     * @return return the maximum number of items to be selected.
+     */
+    public int getLimit() {
+        return limit;
+    }
+}

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java Tue Nov  4 09:34:34 2014
@@ -26,36 +26,35 @@ import java.util.Calendar;
 
 public class ReplicationQueueItemState {
 
-    private int attempts;
+    private final int attempts;
 
-    private ItemState state;
+    private final ItemState state;
 
-    private Calendar entered;
+    private final Calendar entered;
 
-    public boolean isSuccessful() {
-        return ItemState.SUCCEEDED.equals(state);
+    public ReplicationQueueItemState(Calendar entered, ItemState state, int attempts) {
+
+        this.entered = entered;
+        this.state = state;
+        this.attempts = attempts;
     }
 
-    public void setSuccessful(boolean successful) {
-        state = successful ? ItemState.SUCCEEDED : ItemState.ERROR;
+    public ReplicationQueueItemState(ItemState state) {
+        this(Calendar.getInstance(), state, 0);
     }
 
-    public int getAttempts() {
-        return attempts;
+    public boolean isSuccessful() {
+        return ItemState.SUCCEEDED.equals(state);
     }
 
-    public void setAttempts(int attempts) {
-        this.attempts = attempts;
+    public int getAttempts() {
+        return attempts;
     }
 
     public ItemState getItemState() {
         return state;
     }
 
-    public void setItemState(ItemState status) {
-        this.state = status;
-    }
-
     @Override
     public String toString() {
         return "{\"attempts\":\"" + attempts + "\",\"" + "successful\":\"" + isSuccessful() + "\",\"" + "state\":\"" + state + "\"}";
@@ -65,10 +64,6 @@ public class ReplicationQueueItemState {
         return entered;
     }
 
-    public void setEntered(Calendar entered) {
-        this.entered = entered;
-    }
-
     public enum ItemState {
         QUEUED, // waiting in queue after adding or for restart after failing
         ACTIVE, // job is currently in processing

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java Tue Nov  4 09:34:34 2014
@@ -43,24 +43,6 @@ public interface ReplicationQueueProvide
             throws ReplicationQueueException;
 
 
-    /**
-     * get the default queue to be used for a certain agent
-     *
-     * @param agentName a replication agent
-     * @return the default replication queue for the given agent
-     * @throws ReplicationQueueException
-     */
-    @Nonnull
-    ReplicationQueue getDefaultQueue(@Nonnull String agentName)
-            throws ReplicationQueueException;
-
-    /**
-     * get all the available queues from this provider
-     *
-     * @return a collection of replication queues
-     */
-    @Nonnull
-    Collection<ReplicationQueue> getAllQueues();
 
     /**
      * enables queue driven processing for an agent

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java Tue Nov  4 09:34:34 2014
@@ -54,12 +54,6 @@ public abstract class AbstractReplicatio
     }
 
     @Nonnull
-    public ReplicationQueue getDefaultQueue(@Nonnull String agentName)
-            throws ReplicationQueueException {
-        return getQueue(agentName, "");
-    }
-
-    @Nonnull
     public Collection<ReplicationQueue> getAllQueues() {
         return queueMap.values();
     }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java Tue Nov  4 09:34:34 2014
@@ -88,7 +88,7 @@ public class ErrorAwareQueueDistribution
                          ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
         boolean added;
         ReplicationQueueItem queueItem = getItem(replicationPackage);
-        ReplicationQueue queue = queueProvider.getDefaultQueue(agentName);
+        ReplicationQueue queue = queueProvider.getQueue(agentName, DEFAULT_QUEUE_NAME);
         added = queue.add(queueItem);
         checkAndRemoveStuckItems(agentName, queueProvider);
         return added;
@@ -100,7 +100,7 @@ public class ErrorAwareQueueDistribution
 
     private void checkAndRemoveStuckItems(String agent,
                                           ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
-        ReplicationQueue defaultQueue = queueProvider.getDefaultQueue(agent);
+        ReplicationQueue defaultQueue = queueProvider.getQueue(agent, DEFAULT_QUEUE_NAME);
         // get first item in the queue with its status
         ReplicationQueueItem firstItem = defaultQueue.getHead();
         if (firstItem != null) {

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java Tue Nov  4 09:34:34 2014
@@ -89,7 +89,7 @@ public class PriorityPathDistributionStr
             queue = queueProvider.getQueue(agentName, pp);
         } else {
             log.info("using default queue");
-            queue = queueProvider.getDefaultQueue(agentName);
+            queue = queueProvider.getQueue(agentName, DEFAULT_QUEUE_NAME);
         }
         return queue;
     }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java Tue Nov  4 09:34:34 2014
@@ -54,7 +54,7 @@ public class SingleQueueDistributionStra
     public boolean add(String agentName, ReplicationPackage replicationPackage,
                          ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
         ReplicationQueueItem queueItem = getItem(replicationPackage);
-        ReplicationQueue queue = queueProvider.getDefaultQueue(agentName);
+        ReplicationQueue queue = queueProvider.getQueue(agentName, DEFAULT_QUEUE_NAME);
         return queue.add(queueItem);
     }
 

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java Tue Nov  4 09:34:34 2014
@@ -27,10 +27,7 @@ import java.util.Map;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.JobManager.QueryType;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueException;
-import org.apache.sling.replication.queue.ReplicationQueueItem;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.*;
 import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,22 +75,25 @@ public class JobHandlingReplicationQueue
     @Nonnull
     public ReplicationQueueItemState getStatus(@Nonnull ReplicationQueueItem replicationPackage)
             throws ReplicationQueueException {
-        ReplicationQueueItemState itemStatus = new ReplicationQueueItemState();
         try {
             Map<String, Object> properties = JobHandlingUtils.createIdProperties(replicationPackage.getId());
             Job job = jobManager.getJob(topic, properties);
             if (job != null) {
-                itemStatus.setAttempts(job.getRetryCount());
-                itemStatus.setItemState(ItemState.valueOf(job.getJobState().toString()));
-                itemStatus.setEntered(job.getCreated());
+
+                ReplicationQueueItemState itemState = new ReplicationQueueItemState(job.getCreated(),
+                        ItemState.valueOf(job.getJobState().toString()),
+                        job.getRetryCount());
+
                 log.info("status of job {} is {}", job.getId(), job.getJobState());
+
+                return itemState;
             } else {
-                itemStatus.setItemState(ItemState.DROPPED);
+                ReplicationQueueItemState itemState = new ReplicationQueueItemState(ItemState.DROPPED);
+                return itemState;
             }
         } catch (Exception e) {
             throw new ReplicationQueueException("unable to retrieve the queue status", e);
         }
-        return itemStatus;
     }
 
     public ReplicationQueueItem getHead() {
@@ -108,7 +108,7 @@ public class JobHandlingReplicationQueue
     private Job getFirstJob() {
         log.info("getting first item in the queue");
 
-        Collection<Job> jobs = getJobs(1);
+        List<Job> jobs = getJobs(0, 1);
         if (jobs.size() > 0) {
             Job firstItem = jobs.toArray(new Job[jobs.size()])[0];
             log.info("first item in the queue is {}, retried {} times", firstItem.getId(), firstItem.getRetryCount());
@@ -128,18 +128,37 @@ public class JobHandlingReplicationQueue
         return job;
     }
 
-    private Collection<Job> getJobs(int limit) {
-        return jobManager.findJobs(QueryType.ALL, topic, limit);
+    private List<Job> getJobs(int skip, int limit) {
+        int actualSkip = skip < 0 ? 0 : skip;
+        int actualLimit = limit < 0 ? -1 : actualSkip + limit;
+
+
+        Collection<Job> jobs = jobManager.findJobs(QueryType.ALL, topic, actualLimit);
+        List<Job> result = new ArrayList<Job>();
+
+        int i =0;
+        for (Job job : jobs) {
+            if (i >= actualSkip) {
+                result.add(job);
+            }
+            i++;
+        }
+
+        return result;
     }
 
     public boolean isEmpty() {
-        return getItems().isEmpty();
+        return getJobs(0, -1).isEmpty();
     }
 
     @Nonnull
-    public List<ReplicationQueueItem> getItems() {
+    public List<ReplicationQueueItem> getItems(ReplicationQueueItemSelector selector) {
+        if (selector == null) {
+            selector = new ReplicationQueueItemSelector(0, -1);
+        }
+
         List<ReplicationQueueItem> items = new ArrayList<ReplicationQueueItem>();
-        Collection<Job> jobs = getJobs(-1);
+        Collection<Job> jobs = getJobs(selector.getSkip(), selector.getLimit());
         for (Job job : jobs) {
             items.add(JobHandlingUtils.getPackage(job));
         }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java Tue Nov  4 09:34:34 2014
@@ -33,10 +33,10 @@ import org.slf4j.LoggerFactory;
 public class ScheduledReplicationQueueProcessorTask implements Runnable {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final ReplicationQueueProvider queueProvider;
+    private final SimpleReplicationQueueProvider queueProvider;
     private final ReplicationQueueProcessor queueProcessor;
 
-    public ScheduledReplicationQueueProcessorTask(ReplicationQueueProvider queueProvider,
+    public ScheduledReplicationQueueProcessorTask(SimpleReplicationQueueProvider queueProvider,
                                                   ReplicationQueueProcessor queueProcessor) {
         this.queueProvider = queueProvider;
         this.queueProcessor = queueProcessor;

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java Tue Nov  4 09:34:34 2014
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.sling.replication.queue.ReplicationQueue;
 import org.apache.sling.replication.queue.ReplicationQueueItem;
+import org.apache.sling.replication.queue.ReplicationQueueItemSelector;
 import org.apache.sling.replication.queue.ReplicationQueueItemState;
 import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
 import org.slf4j.Logger;
@@ -63,54 +64,52 @@ public class SimpleReplicationQueue impl
     }
 
     public boolean add(@Nonnull ReplicationQueueItem item) {
-        ReplicationQueueItemState status = new ReplicationQueueItemState();
+        ItemState itemState = ItemState.ERROR;
         boolean result = false;
         try {
             result = queue.offer(item, 10, TimeUnit.SECONDS);
-            status.setEntered(Calendar.getInstance());
+            itemState = ItemState.QUEUED;
         } catch (InterruptedException e) {
             log.error("cannot add an item to the queue", e);
-            status.setSuccessful(false);
         } finally {
-            statusMap.put(item, status);
+            statusMap.put(item, new ReplicationQueueItemState(Calendar.getInstance(), itemState, 0));
         }
         return result;
     }
 
     @Nonnull
-    public ReplicationQueueItemState getStatus(@Nonnull ReplicationQueueItem replicationPackage) {
-        ReplicationQueueItemState status = statusMap.get(replicationPackage);
-        if (queue.contains(replicationPackage)) {
-            status.setItemState(ItemState.QUEUED);
+    public ReplicationQueueItemState getStatus(@Nonnull ReplicationQueueItem queueItem) {
+        ReplicationQueueItemState itemStatus = statusMap.get(queueItem);
+
+        if (queue.contains(queueItem)) {
+            return itemStatus;
         } else {
-            status.setItemState(ItemState.SUCCEEDED);
+            return new ReplicationQueueItemState(itemStatus.getEntered(), ItemState.SUCCEEDED, itemStatus.getAttempts());
         }
-        return status;
     }
 
     public ReplicationQueueItem getHead() {
         ReplicationQueueItem element = queue.peek();
         if (element != null) {
-            ReplicationQueueItemState replicationQueueItemStatus = statusMap.get(element);
-            replicationQueueItemStatus.setAttempts(replicationQueueItemStatus.getAttempts() + 1);
+            ReplicationQueueItemState itemState = statusMap.get(element);
+            statusMap.put(element, new ReplicationQueueItemState(itemState.getEntered(),
+                    itemState.getItemState(),
+                    itemState.getAttempts()+1));
         }
         return element;
     }
 
-    public void removeHead() {
-        ReplicationQueueItem element = queue.remove();
-        statusMap.get(element).setSuccessful(true);
-    }
-
     public boolean isEmpty() {
         return queue.isEmpty();
     }
 
     @Nonnull
-    public Collection<ReplicationQueueItem> getItems() {
+    public Iterable<ReplicationQueueItem> getItems(ReplicationQueueItemSelector queueItemSelector) {
         return queue;
     }
 
+
+
     public void remove(@Nonnull String id) {
         ReplicationQueueItem toRemove = null;
         for (ReplicationQueueItem item : queue) {

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackage.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackage.java Tue Nov  4 09:34:34 2014
@@ -20,10 +20,7 @@ package org.apache.sling.replication.ser
 
 import javax.annotation.Nonnull;
 
-import org.apache.sling.api.resource.ModifiableValueMap;
-import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.Resource;
-import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.*;
 import org.apache.sling.replication.packaging.ReplicationPackage;
 import org.apache.sling.replication.packaging.ReplicationPackageInfo;
 import org.apache.sling.replication.packaging.SharedReplicationPackage;
@@ -32,14 +29,15 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collections;
 
 public class ResourceSharedReplicationPackage implements SharedReplicationPackage {
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private static final Object syncObject = new Object();
-
 
+    protected static final String  REFERENCE_ROOT_NODE = "refs";
     private static final String  PN_REFERENCE_COUNT = "ref.count";
 
+
     private final ResourceResolver resourceResolver;
     private final String packagePath;
     private final ReplicationPackage replicationPackage;
@@ -50,41 +48,33 @@ public class ResourceSharedReplicationPa
         this.replicationPackage = replicationPackage;
     }
 
-    public void acquire() {
-        synchronized (syncObject) {
-            Resource resource = getProxyResource();
-            ModifiableValueMap valueMap = resource.adaptTo(ModifiableValueMap.class);
-            int refCount = valueMap.get(PN_REFERENCE_COUNT, 0);
-            refCount ++;
-            valueMap.put(PN_REFERENCE_COUNT, refCount);
-
-            try {
-                resourceResolver.commit();
-            } catch (PersistenceException e) {
-                log.error("cannot release package", e);
-            }
+    public void acquire(String holderName) {
+        if (holderName == null || holderName.length() == 0) {
+            throw new IllegalArgumentException("holder name cannot be null or empty");
+        }
+        
+        try {
+            createHolderResource(holderName);
+        } catch (PersistenceException e) {
+            log.error("cannot acquire package", e);
         }
     }
 
-    public void release() {
-        synchronized (syncObject) {
-            Resource resource = getProxyResource();
-            ModifiableValueMap valueMap = resource.adaptTo(ModifiableValueMap.class);
-            int refCount = valueMap.get(PN_REFERENCE_COUNT, 0);
-            refCount --;
+    public void release(String holderName) {
 
-            if (refCount > 0) {
-                valueMap.put(PN_REFERENCE_COUNT, refCount);
-            }
-            else {
-                delete();
-            }
+        if (holderName == null || holderName.length() == 0) {
+            throw new IllegalArgumentException("holder name cannot be null or empty");
+        }
 
-            try {
-                resourceResolver.commit();
-            } catch (PersistenceException e) {
-                log.error("cannot release package", e);
+        try {
+            deleteHolderResource(holderName);
+
+            Resource holderRoot = getHolderRootResource();
+            if (!holderRoot.hasChildren()) {
+                delete();
             }
+        } catch (PersistenceException e) {
+            log.error("cannot release package", e);
         }
     }
 
@@ -141,8 +131,60 @@ public class ResourceSharedReplicationPa
 
 
     private Resource getProxyResource() {
-        Resource resource = resourceResolver.getResource(packagePath);
+        String holderPath = packagePath;
+
+        resourceResolver.refresh();
+        Resource resource = resourceResolver.getResource(holderPath);
         return resource;
     }
 
+
+
+
+    private Resource getHolderRootResource()  {
+        Resource resource = getProxyResource();
+
+        Resource holderRoot = resource.getChild(REFERENCE_ROOT_NODE);
+        if (holderRoot != null) {
+            return holderRoot;
+        }
+
+        return null;
+    }
+
+    private void createHolderResource(String holderName) throws PersistenceException {
+        Resource holderRoot = getHolderRootResource();
+
+        if (holderRoot == null) {
+            return;
+        }
+
+        Resource holder = holderRoot.getChild(holderName);
+
+        if (holder != null) {
+            return;
+        }
+
+        resourceResolver.create(holderRoot, holderName, Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object) "nt:unstructured"));
+        resourceResolver.commit();
+
+    }
+
+    private void deleteHolderResource(String holderName) throws PersistenceException {
+        Resource holderRoot = getHolderRootResource();
+
+        if (holderRoot == null) {
+            return;
+        }
+
+        Resource holder = holderRoot.getChild(holderName);
+
+        if (holder == null) {
+            return;
+        }
+
+        resourceResolver.delete(holder);
+        resourceResolver.commit();
+    }
+
 }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java Tue Nov  4 09:34:34 2014
@@ -84,6 +84,11 @@ public class ResourceSharedReplicationPa
 
     public ReplicationPackage getPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull String replicationPackageId) {
         String originalPackageId = retrieveIdFromPath(resourceResolver, replicationPackageId);
+
+        if (originalPackageId == null) {
+            return null;
+        }
+
         ReplicationPackage replicationPackage = replicationPackageBuilder.getPackage(resourceResolver, originalPackageId);
 
         if (replicationPackage == null) {
@@ -115,21 +120,35 @@ public class ResourceSharedReplicationPa
         properties.put(PN_ORIGINAL_PATHS, replicationPackage.getPaths());
 
         Resource resource = ResourceUtil.getOrCreateResource(resourceResolver, packagePath, "nt:unstructured", "sling:Folder", false);
+
         ModifiableValueMap valueMap = resource.adaptTo(ModifiableValueMap.class);
         valueMap.putAll(properties);
 
+        resourceResolver.create(resource, ResourceSharedReplicationPackage.REFERENCE_ROOT_NODE,
+                Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object)"nt:unstructured"));
+
         resourceResolver.commit();
         return packagePath;
 
     }
 
     private String retrieveIdFromPath(ResourceResolver resourceResolver, String packagePath) {
-        if (!packagePath.startsWith(sharedPackagesRoot)) return null;
+        if (!packagePath.startsWith(sharedPackagesRoot)) {
+            return null;
+        }
 
         Resource resource = resourceResolver.getResource(packagePath);
 
+        if (resource == null) {
+            return null;
+        }
+
         ValueMap properties = resource.adaptTo(ValueMap.class);
 
+        if (properties == null) {
+            return null;
+        }
+
 
         return properties.get(PN_ORIGINAL_ID, null);
     }

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueServlet.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueServlet.java Tue Nov  4 09:34:34 2014
@@ -90,16 +90,14 @@ public class ReplicationAgentQueueServle
         StringBuilder builder = new StringBuilder("{\"name\":\"" + queue.getName() + "\",\"empty\":" + queue.isEmpty());
         if (!queue.isEmpty()) {
             builder.append(",\"items\":[");
-            for (ReplicationQueueItem item : queue.getItems()) {
+            for (ReplicationQueueItem item : queue.getItems(null)) {
                 builder.append('{');
                 builder.append(toJSoN(item));
                 builder.append(',');
                 builder.append(toJSoN(queue.getStatus(item)));
                 builder.append("},");
             }
-            if (queue.getItems().size() > 0) {
-                builder.deleteCharAt(builder.length() - 1);
-            }
+            builder.deleteCharAt(builder.length() - 1);
             builder.append(']');
         }
         builder.append('}');

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java Tue Nov  4 09:34:34 2014
@@ -76,7 +76,7 @@ public class SimpleReplicationAgentTest 
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
         when(packageExporter.exportPackages(any(ResourceResolver.class), any(ReplicationRequest.class)))
                 .thenReturn(Arrays.asList(replicationPackage));
-        when(queueProvider.getDefaultQueue(name)).thenReturn(
+        when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
                 new SimpleReplicationQueue(name, "name"));
         ReplicationResponse response = agent.execute(resourceResolver, request);
         assertNotNull(response);
@@ -107,7 +107,7 @@ public class SimpleReplicationAgentTest 
         when(distributionHandler.add(any(String.class), any(ReplicationPackage.class), eq(queueProvider))).thenReturn(true);
         when(packageExporter.exportPackages(any(ResourceResolver.class), any(ReplicationRequest.class)))
                 .thenReturn(Arrays.asList(replicationPackage));
-        when(queueProvider.getDefaultQueue(name)).thenReturn(
+        when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
                 new SimpleReplicationQueue(name, "name"));
         ReplicationResponse response = agent.execute(resourceResolver, request);
         assertNotNull(response);
@@ -137,7 +137,7 @@ public class SimpleReplicationAgentTest 
 
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
         when(packageExporter.exportPackages(resourceResolver, request)).thenReturn(Arrays.asList(replicationPackage));
-        when(queueProvider.getDefaultQueue(name)).thenReturn(
+        when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
                 new SimpleReplicationQueue(name, "name"));
 
         agent.execute(resourceResolver, request);
@@ -161,7 +161,8 @@ public class SimpleReplicationAgentTest 
                 queueProvider, distributionHandler,
                 replicationEventFactory, resolverFactory, null);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getDefaultQueue(name)).thenReturn(queue);
+        when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME))
+                .thenReturn(queue);
         assertNotNull(agent.getQueue(null));
     }
 

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java Tue Nov  4 09:34:34 2014
@@ -18,11 +18,10 @@
  */
 package org.apache.sling.replication.monitor;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
+import java.util.*;
 
 import org.apache.sling.hc.api.Result;
+import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.queue.ReplicationQueue;
 import org.apache.sling.replication.queue.ReplicationQueueItem;
 import org.apache.sling.replication.queue.ReplicationQueueItemState;
@@ -32,7 +31,9 @@ import org.junit.Test;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockingDetails;
 import static org.mockito.Mockito.when;
 
 /**
@@ -56,11 +57,12 @@ public class ReplicationQueueHealthCheck
         replicationQueueHealthCheck.activate(Collections.<String, Object>emptyMap());
         ReplicationQueue queue = mock(ReplicationQueue.class);
         when(queue.getHead()).thenReturn(null);
-        ReplicationQueueProvider replicationQueueProvider = mock(ReplicationQueueProvider.class);
-        Collection<ReplicationQueue> providers = new LinkedList<ReplicationQueue>();
-        providers.add(queue);
-        when(replicationQueueProvider.getAllQueues()).thenReturn(providers);
-        replicationQueueHealthCheck.bindReplicationQueueProvider(replicationQueueProvider);
+        ReplicationAgent replicationAgent = mock(ReplicationAgent.class);
+
+        List<String> queues = new ArrayList<String>();
+        queues.add("queueName");
+        when(replicationAgent.getQueueNames()).thenReturn(queues);        when(replicationAgent.getQueue(anyString())).thenReturn(queue);
+        replicationQueueHealthCheck.bindReplicationAgent(replicationAgent);
 
         Result result = replicationQueueHealthCheck.execute();
         assertNotNull(result);
@@ -78,11 +80,14 @@ public class ReplicationQueueHealthCheck
         when(status.getAttempts()).thenReturn(1);
         when(queue.getStatus(item)).thenReturn(status);
         when(queue.getHead()).thenReturn(item);
-        ReplicationQueueProvider replicationQueueProvider = mock(ReplicationQueueProvider.class);
-        Collection<ReplicationQueue> providers = new LinkedList<ReplicationQueue>();
-        providers.add(queue);
-        when(replicationQueueProvider.getAllQueues()).thenReturn(providers);
-        replicationQueueHealthCheck.bindReplicationQueueProvider(replicationQueueProvider);
+        ReplicationAgent replicationAgent = mock(ReplicationAgent.class);
+
+        List<String> queues = new ArrayList<String>();
+        queues.add("queueName");
+        when(replicationAgent.getQueueNames()).thenReturn(queues);
+        when(replicationAgent.getQueue(anyString())).thenReturn(queue);
+        replicationQueueHealthCheck.bindReplicationAgent(replicationAgent);
+
 
         Result result = replicationQueueHealthCheck.execute();
         assertNotNull(result);
@@ -100,11 +105,13 @@ public class ReplicationQueueHealthCheck
         when(status.getAttempts()).thenReturn(10);
         when(queue.getStatus(item)).thenReturn(status);
         when(queue.getHead()).thenReturn(item);
-        ReplicationQueueProvider replicationQueueProvider = mock(ReplicationQueueProvider.class);
-        Collection<ReplicationQueue> providers = new LinkedList<ReplicationQueue>();
-        providers.add(queue);
-        when(replicationQueueProvider.getAllQueues()).thenReturn(providers);
-        replicationQueueHealthCheck.bindReplicationQueueProvider(replicationQueueProvider);
+        ReplicationAgent replicationAgent = mock(ReplicationAgent.class);
+
+        List<String> queues = new ArrayList<String>();
+        queues.add("queueName");
+        when(replicationAgent.getQueueNames()).thenReturn(queues);
+        when(replicationAgent.getQueue(anyString())).thenReturn(queue);
+        replicationQueueHealthCheck.bindReplicationAgent(replicationAgent);
 
         Result result = replicationQueueHealthCheck.execute();
         assertNotNull(result);

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java Tue Nov  4 09:34:34 2014
@@ -21,10 +21,7 @@ package org.apache.sling.replication.que
 import java.util.Dictionary;
 
 import org.apache.sling.replication.packaging.ReplicationPackage;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueItem;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.*;
 import org.junit.Test;
 import org.osgi.service.component.ComponentContext;
 
@@ -47,7 +44,7 @@ public class ErrorAwareQueueDistribution
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
         boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
@@ -63,7 +60,7 @@ public class ErrorAwareQueueDistribution
         ReplicationQueue queue = mock(ReplicationQueue.class);
         ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
 
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(queueItem)).thenReturn(true);
         ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
         when(state.isSuccessful()).thenReturn(false);
@@ -86,7 +83,7 @@ public class ErrorAwareQueueDistribution
         ReplicationQueue queue = mock(ReplicationQueue.class);
         ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
 
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(queueItem)).thenReturn(true);
         when(queue.getHead()).thenReturn(queueItem);
         ReplicationQueue errorQueue = mock(ReplicationQueue.class);
@@ -113,7 +110,7 @@ public class ErrorAwareQueueDistribution
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
 
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
         when(queue.getHead()).thenReturn(mock(ReplicationQueueItem.class));
         ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
@@ -131,7 +128,7 @@ public class ErrorAwareQueueDistribution
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
 
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
         boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
         assertTrue(returnedState);
@@ -143,7 +140,7 @@ public class ErrorAwareQueueDistribution
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
         boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java Tue Nov  4 09:34:34 2014
@@ -21,10 +21,7 @@ package org.apache.sling.replication.que
 import java.util.Dictionary;
 
 import org.apache.sling.replication.packaging.ReplicationPackage;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueItem;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.*;
 import org.junit.Test;
 import org.osgi.service.component.ComponentContext;
 
@@ -53,7 +50,7 @@ public class PriorityPathQueueDistributi
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
         boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
@@ -91,7 +88,7 @@ public class PriorityPathQueueDistributi
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
         boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
@@ -131,7 +128,7 @@ public class PriorityPathQueueDistributi
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
 
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
         boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
         assertTrue(returnedState);
@@ -168,7 +165,7 @@ public class PriorityPathQueueDistributi
         when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
         boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
         assertTrue(returnedState);

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java Tue Nov  4 09:34:34 2014
@@ -19,10 +19,7 @@
 package org.apache.sling.replication.queue.impl;
 
 import org.apache.sling.replication.packaging.ReplicationPackage;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueItem;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.*;
 import org.junit.Test;
 
 import static org.junit.Assert.assertFalse;
@@ -44,7 +41,7 @@ public class SingleQueueDistributionStra
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
         boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
@@ -58,7 +55,7 @@ public class SingleQueueDistributionStra
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
         ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(queueItem)).thenReturn(true);
         ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
         when(state.isSuccessful()).thenReturn(false);
@@ -73,7 +70,7 @@ public class SingleQueueDistributionStra
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
         boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
         assertTrue(returnedState);
@@ -85,7 +82,7 @@ public class SingleQueueDistributionStra
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         ReplicationQueue queue = mock(ReplicationQueue.class);
-        when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+        when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
         when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
 
         boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);

Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java Tue Nov  4 09:34:34 2014
@@ -37,7 +37,7 @@ public class ScheduledReplicationQueuePr
 
     @Test
     public void testRunWithNoQueue() throws Exception {
-        ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
+        SimpleReplicationQueueProvider queueProvider = mock(SimpleReplicationQueueProvider.class);
         ReplicationQueueProcessor queueProcessor = mock(ReplicationQueueProcessor.class);
         ScheduledReplicationQueueProcessorTask scheduledReplicationQueueProcessorTask = new ScheduledReplicationQueueProcessorTask(
                 queueProvider, queueProcessor);
@@ -46,7 +46,7 @@ public class ScheduledReplicationQueuePr
 
     @Test
     public void testRunWithOneEmptyQueue() throws Exception {
-        ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
+        SimpleReplicationQueueProvider queueProvider = mock(SimpleReplicationQueueProvider.class);
         Collection<ReplicationQueue> queues = new LinkedList<ReplicationQueue>();
         ReplicationQueue queue = mock(ReplicationQueue.class);
         when(queue.isEmpty()).thenReturn(true);
@@ -60,7 +60,7 @@ public class ScheduledReplicationQueuePr
 
     @Test
     public void testRunWithOneNonEmptyQueue() throws Exception {
-        ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
+        SimpleReplicationQueueProvider queueProvider = mock(SimpleReplicationQueueProvider.class);
         Collection<ReplicationQueue> queues = new LinkedList<ReplicationQueue>();
         ReplicationQueue queue = mock(ReplicationQueue.class);
         when(queue.isEmpty()).thenReturn(false).thenReturn(true);