You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2014/10/31 09:50:11 UTC
svn commit: r1635725 - in /sling/trunk/contrib/extensions/replication:
core/src/main/java/org/apache/sling/replication/agent/impl/
core/src/main/java/org/apache/sling/replication/queue/
core/src/main/java/org/apache/sling/replication/queue/impl/ core/s...
Author: mpetria
Date: Fri Oct 31 08:50:10 2014
New Revision: 1635725
URL: http://svn.apache.org/r1635725
Log:
SLING-4120: cleaned ReplicationQueueProvider of quueue creation/deletion semantics
Added:
sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install/org.apache.sling.event.jobs.QueueConfiguration-replication.json
Modified:
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProviderTest.java
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Fri Oct 31 08:50:10 2014
@@ -44,13 +44,7 @@ import org.apache.sling.replication.pack
import org.apache.sling.replication.packaging.ReplicationPackageExporter;
import org.apache.sling.replication.packaging.ReplicationPackageImportException;
import org.apache.sling.replication.packaging.ReplicationPackageImporter;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
-import org.apache.sling.replication.queue.ReplicationQueueException;
-import org.apache.sling.replication.queue.ReplicationQueueItem;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProcessor;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.*;
import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
import org.apache.sling.replication.trigger.ReplicationRequestHandler;
import org.apache.sling.replication.trigger.ReplicationTrigger;
@@ -177,23 +171,19 @@ public class SimpleReplicationAgent impl
ReplicationResponse replicationResponse;
log.info("scheduling replication of package {}", replicationPackage);
- ReplicationQueueItem replicationQueueItem = new ReplicationQueueItem(replicationPackage.getId(),
- replicationPackage.getPaths(),
- replicationPackage.getAction(),
- replicationPackage.getType(),
- replicationPackage.getInfo());
+
// dispatch the replication package to the queue distribution handler
try {
- ReplicationQueueItemState state = queueDistributionStrategy.add(name, replicationQueueItem,
- queueProvider);
+ boolean success = queueDistributionStrategy.add(name, replicationPackage, queueProvider);
Dictionary<Object, Object> properties = new Properties();
- properties.put("replication.package.paths", replicationQueueItem.getPaths());
+ properties.put("replication.package.paths", replicationPackage.getPaths());
properties.put("replication.agent.name", name);
replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_QUEUED, properties);
- replicationResponse = new ReplicationResponse(state.getItemState().toString(), state.isSuccessful());
+ replicationResponse = new ReplicationResponse(success? ReplicationQueueItemState.ItemState.QUEUED.toString() :
+ ReplicationQueueItemState.ItemState.ERROR.toString(), success);
} catch (Exception e) {
log.error("an error happened during queue processing", e);
replicationResponse = new ReplicationResponse(e.toString(), false);
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java Fri Oct 31 08:50:10 2014
@@ -22,38 +22,36 @@ import javax.annotation.Nonnull;
import org.apache.sling.replication.agent.ReplicationAgent;
import org.apache.sling.replication.component.ReplicationComponent;
+import org.apache.sling.replication.packaging.ReplicationPackage;
+
+import java.util.List;
/**
* a {@link ReplicationQueueDistributionStrategy} implements an algorithm for the distribution of
- * {@link org.apache.sling.replication.queue.ReplicationQueueItem}s among the available queues for a certain agent
+ * {@link org.apache.sling.replication.packaging.ReplicationPackage}s among the available queues
*/
public interface ReplicationQueueDistributionStrategy extends ReplicationComponent {
+ String DEFAULT_QUEUE_NAME = "";
/**
- * synchronously distribute a {@link org.apache.sling.replication.queue.ReplicationQueueItem} to a {@link ReplicationAgent}
- * to a {@link ReplicationQueue} provided by the given {@link ReplicationQueueProvider}
+ * synchronously distribute a {@link org.apache.sling.replication.packaging.ReplicationPackage}
+ * to one or more {@link ReplicationQueue}s provided by the given {@link ReplicationQueueProvider}
*
* @param agentName the name of a {@link ReplicationAgent}
- * @param item a {@link org.apache.sling.replication.queue.ReplicationQueueItem} to distribute
- * @param queueProvider the {@link ReplicationQueueProvider} used to provide the queue to be used for the given package
+ * @param replicationPackage a {@link org.apache.sling.replication.packaging.ReplicationPackage} to distribute
+ * @param queueProvider the {@link ReplicationQueueProvider} used to provide the queues to be used for the given package
* @return a {@link ReplicationQueueItemState} representing the state of the package in the queue after its distribution
* @throws ReplicationQueueException if distribution fails
*/
@Nonnull
- ReplicationQueueItemState add(@Nonnull String agentName, @Nonnull ReplicationQueueItem item,
+ boolean add(@Nonnull String agentName, @Nonnull ReplicationPackage replicationPackage,
@Nonnull ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
+
/**
- * asynchronously distribute a {@link org.apache.sling.replication.queue.ReplicationQueueItem} to a {@link ReplicationAgent}
- * to a {@link ReplicationQueue} provided by the given {@link ReplicationQueueProvider}
- *
- * @param agentName the name of a {@link ReplicationAgent}
- * @param replicationPackage a {@link org.apache.sling.replication.queue.ReplicationQueueItem} to distribute
- * @param queueProvider the {@link ReplicationQueueProvider} used to provide the queue to be used for the given package
- * @return <code>true</code> if the package could be distributed to a {@link ReplicationQueue}, <code>false</code> otherwise
- * @throws ReplicationQueueException
+ * Returns the queue names available for this strategy.
+ * @return a list of queue names
*/
- boolean offer(String agentName, ReplicationQueueItem replicationPackage,
- ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
+ List<String> getQueueNames();
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java Fri Oct 31 08:50:10 2014
@@ -29,7 +29,7 @@ import org.apache.sling.replication.comp
public interface ReplicationQueueProvider extends ReplicationComponent {
/**
- * provide a named queue for the given agent or creates it if the queue doesn't exist
+ * provide a named queue for the given agent
*
* @param agentName the replication agent needing the queue
* @param queueName the name of the queue to retrieve
@@ -61,14 +61,6 @@ public interface ReplicationQueueProvide
Collection<ReplicationQueue> getAllQueues();
/**
- * removes an existing queue owned by this provider
- *
- * @param queue a replication queue to be removed
- * @throws ReplicationQueueException
- */
- void removeQueue(@Nonnull ReplicationQueue queue) throws ReplicationQueueException;
-
- /**
* enables queue driven processing for an agent
*
* @param agentName a replication agent
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java Fri Oct 31 08:50:10 2014
@@ -46,7 +46,7 @@ public abstract class AbstractReplicatio
ReplicationQueue queue = queueMap.get(key);
if (queue == null) {
log.info("creating a queue with key {}", key);
- queue = getOrCreateQueue(agentName, queueName);
+ queue = getInternalQueue(agentName, queueName);
queueMap.put(key, queue);
log.info("queue created {}", queue);
}
@@ -64,18 +64,7 @@ public abstract class AbstractReplicatio
return queueMap.values();
}
- public void removeQueue(@Nonnull ReplicationQueue queue) throws ReplicationQueueException {
- deleteQueue(queue);
- // flush cache
- if (queueMap.containsValue(queue)) {
- if (!queueMap.values().remove(queue)) {
- throw new ReplicationQueueException("could not remove the queue " + queue);
- }
- }
- }
-
- protected abstract ReplicationQueue getOrCreateQueue(String agentName, String queueName) throws ReplicationQueueException;
- protected abstract void deleteQueue(ReplicationQueue queue);
+ protected abstract ReplicationQueue getInternalQueue(String agentName, String queueName) throws ReplicationQueueException;
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java Fri Oct 31 08:50:10 2014
@@ -19,7 +19,9 @@
package org.apache.sling.replication.queue.impl;
import javax.annotation.Nonnull;
+import java.util.Arrays;
import java.util.Calendar;
+import java.util.List;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -27,6 +29,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.PropertyOption;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.replication.packaging.ReplicationPackage;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
import org.apache.sling.replication.queue.ReplicationQueueException;
@@ -48,6 +51,8 @@ import org.slf4j.LoggerFactory;
@Property(name = "name", value = ErrorAwareQueueDistributionStrategy.NAME, propertyPrivate = true)
public class ErrorAwareQueueDistributionStrategy implements ReplicationQueueDistributionStrategy {
+ protected static final String ERROR_QUEUE_NAME = "error";
+
private static final String ERROR = "ERROR";
public static final String NAME = "error";
@@ -79,38 +84,20 @@ public class ErrorAwareQueueDistribution
timeThreshold = PropertiesUtil.toInteger(ctx.getProperties().get(TIME_THRESHOLD), 600000);
}
- @Nonnull
- public ReplicationQueueItemState add(@Nonnull String agentName, @Nonnull ReplicationQueueItem item,
- @Nonnull ReplicationQueueProvider queueProvider)
- throws ReplicationQueueException {
- try {
- log.debug("using error aware queue distribution");
- ReplicationQueueItemState state = new ReplicationQueueItemState();
- ReplicationQueue queue = queueProvider.getDefaultQueue(agentName);
- log.debug("obtained queue {}", queue);
- if (queue.add(item)) {
- log.info("replication status: {}", state);
- state = queue.getStatus(item);
- } else {
- log.error("could not add the item to the queue {}", queue);
- state.setItemState(ItemState.ERROR);
- state.setSuccessful(false);
- }
- return state;
- } finally {
- checkAndRemoveStuckItems(agentName, queueProvider);
- }
- }
-
- public boolean offer(String agentName, ReplicationQueueItem replicationPackage,
+ public boolean add(String agentName, ReplicationPackage replicationPackage,
ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
boolean added;
+ ReplicationQueueItem queueItem = getItem(replicationPackage);
ReplicationQueue queue = queueProvider.getDefaultQueue(agentName);
- added = queue.add(replicationPackage);
+ added = queue.add(queueItem);
checkAndRemoveStuckItems(agentName, queueProvider);
return added;
}
+ public List<String> getQueueNames() {
+ return Arrays.asList(new String[] { ERROR_QUEUE_NAME, DEFAULT_QUEUE_NAME });
+ }
+
private void checkAndRemoveStuckItems(String agent,
ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
ReplicationQueue defaultQueue = queueProvider.getDefaultQueue(agent);
@@ -126,7 +113,7 @@ public class ErrorAwareQueueDistribution
if (ERROR.equals(stuckQueueHandling)) {
log.warn("item {} moved to the error queue", firstItem);
- ReplicationQueue errorQueue = queueProvider.getQueue(agent, "-error");
+ ReplicationQueue errorQueue = queueProvider.getQueue(agent, ERROR_QUEUE_NAME);
if (!errorQueue.add(firstItem)) {
log.error("failed to move item {} the queue {}", firstItem, errorQueue);
throw new ReplicationQueueException("could not move an item to the error queue");
@@ -138,4 +125,14 @@ public class ErrorAwareQueueDistribution
}
}
+ private ReplicationQueueItem getItem(ReplicationPackage replicationPackage) {
+ ReplicationQueueItem replicationQueueItem = new ReplicationQueueItem(replicationPackage.getId(),
+ replicationPackage.getPaths(),
+ replicationPackage.getAction(),
+ replicationPackage.getType(),
+ replicationPackage.getInfo());
+
+ return replicationQueueItem;
+ }
+
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java Fri Oct 31 08:50:10 2014
@@ -20,12 +20,14 @@ package org.apache.sling.replication.que
import javax.annotation.Nonnull;
import java.util.Arrays;
+import java.util.List;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.replication.packaging.ReplicationPackage;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
import org.apache.sling.replication.queue.ReplicationQueueException;
@@ -60,31 +62,7 @@ public class PriorityPathDistributionStr
priorityPaths = PropertiesUtil.toStringArray(context.getProperties().get(PRIORITYPATHS));
}
- @Nonnull
- public ReplicationQueueItemState add(@Nonnull String agentName, @Nonnull ReplicationQueueItem item,
- @Nonnull ReplicationQueueProvider queueProvider)
- throws ReplicationQueueException {
- log.debug("using path priority based queue distribution");
- ReplicationQueueItemState state = new ReplicationQueueItemState();
-
- ReplicationQueue queue = getQueue(agentName, item, queueProvider);
- log.debug("obtained queue {}", queue);
-
- if (queue != null) {
- if (queue.add(item)) {
- log.info("replication status: {}", state);
- state = queue.getStatus(item);
- } else {
- log.error("could not add the item to the queue {}", queue);
- state.setItemState(ItemState.ERROR);
- state.setSuccessful(false);
- }
- return state;
- } else {
- throw new ReplicationQueueException("could not get a queue for agent " + agentName);
- }
- }
private ReplicationQueue getQueue(String agentName, ReplicationQueueItem replicationPackage,
ReplicationQueueProvider queueProvider)
@@ -116,15 +94,38 @@ public class PriorityPathDistributionStr
return queue;
}
- public boolean offer(String agentName, ReplicationQueueItem replicationPackage,
+ public boolean add(String agentName, ReplicationPackage replicationPackage,
ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
- ReplicationQueue queue = getQueue(agentName, replicationPackage, queueProvider);
+
+ ReplicationQueueItem queueItem = getItem(replicationPackage);
+ ReplicationQueue queue = getQueue(agentName, queueItem, queueProvider);
if (queue != null) {
- return queue.add(replicationPackage);
+ return queue.add(queueItem);
} else {
throw new ReplicationQueueException("could not get a queue for agent " + agentName);
}
+ }
+
+
+
+ public List<String> getQueueNames() {
+ List<String> paths = Arrays.asList(priorityPaths);
+ paths.add(DEFAULT_QUEUE_NAME);
+
+ return paths;
+ }
+
+ private ReplicationQueueItem getItem(ReplicationPackage replicationPackage) {
+ ReplicationQueueItem replicationQueueItem = new ReplicationQueueItem(replicationPackage.getId(),
+ replicationPackage.getPaths(),
+ replicationPackage.getAction(),
+ replicationPackage.getType(),
+ replicationPackage.getInfo());
+ return replicationQueueItem;
}
+
+
+
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java Fri Oct 31 08:50:10 2014
@@ -23,6 +23,7 @@ import javax.annotation.Nonnull;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.replication.packaging.ReplicationPackage;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
import org.apache.sling.replication.queue.ReplicationQueueException;
@@ -33,6 +34,10 @@ import org.apache.sling.replication.queu
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
/**
* The default strategy for delivering packages to queues. Each agent just manages a single queue,
* no failure / stuck handling where each package is put regardless of anything.
@@ -46,33 +51,26 @@ public class SingleQueueDistributionStra
private final Logger log = LoggerFactory.getLogger(getClass());
- @Nonnull
- public ReplicationQueueItemState add(@Nonnull String agentName, @Nonnull ReplicationQueueItem item,
- @Nonnull ReplicationQueueProvider queueProvider)
- throws ReplicationQueueException {
- log.debug("using single queue distribution");
-
- ReplicationQueueItemState state = new ReplicationQueueItemState();
+ public boolean add(String agentName, ReplicationPackage replicationPackage,
+ ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+ ReplicationQueueItem queueItem = getItem(replicationPackage);
ReplicationQueue queue = queueProvider.getDefaultQueue(agentName);
- log.debug("obtained queue {}", queue);
-
- if (queue.add(item)) {
- state = queue.getStatus(item);
- log.info("replication status: {}", state);
- } else {
- log.error("could not add the item to the queue {}", queue);
- state.setItemState(ItemState.ERROR);
- state.setSuccessful(false);
- }
- return state;
+ return queue.add(queueItem);
+ }
+ public List<String> getQueueNames() {
+ return Arrays.asList(new String[] { DEFAULT_QUEUE_NAME });
}
- public boolean offer(String agentName, ReplicationQueueItem replicationPackage,
- ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
- ReplicationQueue queue = queueProvider.getDefaultQueue(agentName);
- return queue.add(replicationPackage);
+ private ReplicationQueueItem getItem(ReplicationPackage replicationPackage) {
+ ReplicationQueueItem replicationQueueItem = new ReplicationQueueItem(replicationPackage.getId(),
+ replicationPackage.getPaths(),
+ replicationPackage.getAction(),
+ replicationPackage.getType(),
+ replicationPackage.getInfo());
+
+ return replicationQueueItem;
}
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java Fri Oct 31 08:50:10 2014
@@ -19,7 +19,6 @@
package org.apache.sling.replication.queue.impl.jobhandling;
import javax.annotation.Nonnull;
-import java.io.IOException;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.Map;
@@ -31,10 +30,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.jobs.JobManager;
-import org.apache.sling.event.jobs.Queue;
-import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueException;
@@ -43,8 +39,6 @@ import org.apache.sling.replication.queu
import org.apache.sling.replication.queue.impl.AbstractReplicationQueueProvider;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.cm.Configuration;
-import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,17 +55,14 @@ public class JobHandlingReplicationQueue
@Reference
private JobManager jobManager;
- @Reference
- private ConfigurationAdmin configAdmin;
- private final Map<String, ServiceRegistration> jobs = new ConcurrentHashMap<String, ServiceRegistration>();
+ private final Map<String, ServiceRegistration> jobConsumers = new ConcurrentHashMap<String, ServiceRegistration>();
private BundleContext context;
- protected JobHandlingReplicationQueueProvider(JobManager jobManager, ConfigurationAdmin configAdmin, BundleContext context) {
+ protected JobHandlingReplicationQueueProvider(JobManager jobManager, BundleContext context) {
this.jobManager = jobManager;
- this.configAdmin = configAdmin;
this.context = context;
}
@@ -79,43 +70,16 @@ public class JobHandlingReplicationQueue
}
@Override
- protected ReplicationQueue getOrCreateQueue(String agentName, String queueName)
+ protected ReplicationQueue getInternalQueue(String agentName, String queueName)
throws ReplicationQueueException {
- try {
- String name = agentName;
- if (queueName.length() > 0) {
- name += "/" + queueName;
- }
- String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name;
- if (jobManager.getQueue(name) == null) {
- Configuration config = configAdmin.createFactoryConfiguration(
- QueueConfiguration.class.getName(), null);
- Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(ConfigurationConstants.PROP_NAME, name);
- props.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
- props.put(ConfigurationConstants.PROP_TOPICS, new String[]{topic});
- props.put(ConfigurationConstants.PROP_RETRIES, -1);
- props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
- props.put(ConfigurationConstants.PROP_KEEP_JOBS, true);
- props.put(ConfigurationConstants.PROP_PRIORITY, "MAX");
- config.update(props);
- }
- return new JobHandlingReplicationQueue(name, topic, jobManager);
- } catch (IOException e) {
- throw new ReplicationQueueException("could not create a queue", e);
+ String name = agentName;
+ if (queueName.length() > 0) {
+ name += "/" + queueName;
}
+ String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name;
+ return new JobHandlingReplicationQueue(name, topic, jobManager);
}
- @Override
- protected void deleteQueue(ReplicationQueue queue) {
- String queueName = queue.getName();
- Queue q = jobManager.getQueue(queueName);
- if (q != null) {
- q.removeAll();
- } else {
- log.warn("cannot delete non existing queue {} ", queueName);
- }
- }
public void enableQueueProcessing(@Nonnull String agentName, @Nonnull ReplicationQueueProcessor queueProcessor) {
// eventually register job consumer for sling job handling based queues
@@ -123,21 +87,21 @@ public class JobHandlingReplicationQueue
String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + agentName;
String childTopic = topic + "/*";
jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{topic, childTopic});
- synchronized (jobs) {
+ synchronized (jobConsumers) {
log.info("registering job consumer for agent {}", agentName);
ServiceRegistration jobReg = context.registerService(JobConsumer.class.getName(),
new ReplicationAgentJobConsumer(queueProcessor), jobProps);
if (jobReg != null) {
- jobs.put(agentName, jobReg);
+ jobConsumers.put(agentName, jobReg);
}
log.info("job consumer for agent {} registered", agentName);
}
}
public void disableQueueProcessing(@Nonnull String agentName) {
- synchronized (jobs) {
+ synchronized (jobConsumers) {
log.info("unregistering job consumer for agent {}", agentName);
- ServiceRegistration jobReg = jobs.remove(agentName);
+ ServiceRegistration jobReg = jobConsumers.remove(agentName);
if (jobReg != null) {
jobReg.unregister();
log.info("job consumer for agent {} unregistered", agentName);
@@ -152,7 +116,7 @@ public class JobHandlingReplicationQueue
@Deactivate
private void deactivate() {
- for (ServiceRegistration jobReg : jobs.values()) {
+ for (ServiceRegistration jobReg : jobConsumers.values()) {
jobReg.unregister();
}
this.context = null;
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java Fri Oct 31 08:50:10 2014
@@ -54,7 +54,7 @@ public class SimpleReplicationQueueProvi
public SimpleReplicationQueueProvider() {
}
- protected ReplicationQueue getOrCreateQueue(String agentName, String selector)
+ protected ReplicationQueue getInternalQueue(String agentName, String selector)
throws ReplicationQueueException {
return new SimpleReplicationQueue(agentName, selector);
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java Fri Oct 31 08:50:10 2014
@@ -38,7 +38,7 @@ public class ResourceSharedReplicationPa
private String PN_ORIGINAL_PATHS = "original.package.paths";
private String PACKAGE_NAME_PREFIX = "replpackage";
- private String sharedPackagesRoot = "/var/slingreplication/";
+ private String sharedPackagesRoot = "/var/sling/replication/";
private final ReplicationPackageBuilder replicationPackageBuilder;
@@ -114,7 +114,7 @@ public class ResourceSharedReplicationPa
properties.put(PN_ORIGINAL_ACTION, replicationPackage.getAction());
properties.put(PN_ORIGINAL_PATHS, replicationPackage.getPaths());
- Resource resource = ResourceUtil.getOrCreateResource(resourceResolver, packagePath, "sling:Folder", "nt:unstructured", false);
+ Resource resource = ResourceUtil.getOrCreateResource(resourceResolver, packagePath, "nt:unstructured", "sling:Folder", false);
ModifiableValueMap valueMap = resource.adaptTo(ModifiableValueMap.class);
valueMap.putAll(properties);
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java Fri Oct 31 08:50:10 2014
@@ -63,17 +63,24 @@ public class ReplicationAgentServlet ext
try {
ReplicationResponse replicationResponse = agent.execute(resourceResolver, replicationRequest);
if (replicationResponse.isSuccessful()) {
- response.setStatus(200);
- } else if (ItemState.QUEUED.toString().equals(replicationResponse.getStatus())
- || ItemState.ACTIVE.toString().equals(
- replicationResponse.getStatus())) {
- response.setStatus(202);
- } else if (ItemState.DROPPED.toString().equals(
- replicationResponse.getStatus())) {
- response.setStatus(404);
- } else {
- response.setStatus(400);
+ if (ItemState.SUCCEEDED.toString().equals(replicationResponse.getStatus())) {
+ response.setStatus(200);
+ }
+ if (ItemState.QUEUED.toString().equals(replicationResponse.getStatus())
+ || ItemState.ACTIVE.toString().equals(
+ replicationResponse.getStatus())) {
+ response.setStatus(202);
+ }
+
+ }
+ else {
+ if (ItemState.DROPPED.toString().equals(replicationResponse.getStatus())) {
+ response.setStatus(404);
+ } else {
+ response.setStatus(400);
+ }
}
+
response.getWriter().append(replicationResponse.toString());
} catch (ReplicationAgentException e) {
response.setStatus(503);
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java Fri Oct 31 08:50:10 2014
@@ -59,9 +59,7 @@ public class SimpleReplicationAgentTest
ReplicationRequestAuthorizationStrategy packageExporterStrategy = mock(ReplicationRequestAuthorizationStrategy.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
- ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
- when(state.getItemState()).thenReturn(ReplicationQueueItemState.ItemState.ERROR);
- when(distributionHandler.add(any(String.class), any(ReplicationQueueItem.class), any(ReplicationQueueProvider.class))).thenReturn(state);
+ when(distributionHandler.add(any(String.class), any(ReplicationPackage.class), any(ReplicationQueueProvider.class))).thenReturn(false);
ReplicationEventFactory replicationEventFactory = mock(ReplicationEventFactory.class);
ResourceResolverFactory resolverFactory = mock(ResourceResolverFactory.class);
@@ -106,16 +104,14 @@ public class SimpleReplicationAgentTest
ResourceResolver resourceResolver = mock(ResourceResolver.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
- ReplicationQueueItemState state = new ReplicationQueueItemState();
- state.setItemState(ReplicationQueueItemState.ItemState.SUCCEEDED);
- when(distributionHandler.add(any(String.class), any(ReplicationQueueItem.class), eq(queueProvider))).thenReturn(state);
+ when(distributionHandler.add(any(String.class), any(ReplicationPackage.class), eq(queueProvider))).thenReturn(true);
when(packageExporter.exportPackages(any(ResourceResolver.class), any(ReplicationRequest.class)))
.thenReturn(Arrays.asList(replicationPackage));
when(queueProvider.getDefaultQueue(name)).thenReturn(
new SimpleReplicationQueue(name, "name"));
ReplicationResponse response = agent.execute(resourceResolver, request);
assertNotNull(response);
- assertEquals("SUCCEEDED", response.getStatus());
+ assertEquals("QUEUED", response.getStatus());
}
@Test
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java Fri Oct 31 08:50:10 2014
@@ -20,6 +20,7 @@ package org.apache.sling.replication.que
import java.util.Dictionary;
+import org.apache.sling.replication.packaging.ReplicationPackage;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.apache.sling.replication.queue.ReplicationQueueItemState;
@@ -43,33 +44,32 @@ public class ErrorAwareQueueDistribution
@Test
public void testPackageAdditionWithSucceedingItemDelivery() throws Exception {
ErrorAwareQueueDistributionStrategy errorAwareDistributionStrategy = new ErrorAwareQueueDistributionStrategy();
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
- when(state.isSuccessful()).thenReturn(true);
- when(queue.getStatus(replicationPackage)).thenReturn(state);
- ReplicationQueueItemState returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
- assertTrue(returnedState.isSuccessful());
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+
+ boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+
+ assertTrue(returnedState);
}
@Test
public void testPackageAdditionWithFailingItemDelivery() throws Exception {
ErrorAwareQueueDistributionStrategy errorAwareDistributionStrategy = new ErrorAwareQueueDistributionStrategy();
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
+ ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
+
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
+ when(queue.add(queueItem)).thenReturn(true);
ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
when(state.isSuccessful()).thenReturn(false);
- when(queue.getStatus(replicationPackage)).thenReturn(state);
- ReplicationQueueItemState returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
- assertFalse(returnedState.isSuccessful());
+ when(queue.getStatus(queueItem)).thenReturn(state);
+ boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertFalse(returnedState);
}
@Test
@@ -81,22 +81,23 @@ public class ErrorAwareQueueDistribution
when(properties.get("stuck.handling")).thenReturn(new String[]{"ERROR"});
when(context.getProperties()).thenReturn(properties);
errorAwareDistributionStrategy.activate(context);
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
+ ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
+
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- when(queue.getHead()).thenReturn(replicationPackage);
+ when(queue.add(queueItem)).thenReturn(true);
+ when(queue.getHead()).thenReturn(queueItem);
ReplicationQueue errorQueue = mock(ReplicationQueue.class);
- when(errorQueue.add(replicationPackage)).thenReturn(true);
- when(queueProvider.getQueue("agentName", "-error")).thenReturn(errorQueue);
+ when(errorQueue.add(queueItem)).thenReturn(true);
+ when(queueProvider.getQueue("agentName", ErrorAwareQueueDistributionStrategy.ERROR_QUEUE_NAME)).thenReturn(errorQueue);
ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
when(state.isSuccessful()).thenReturn(false);
when(state.getAttempts()).thenReturn(2);
when(queue.getStatus(any(ReplicationQueueItem.class))).thenReturn(state);
- ReplicationQueueItemState returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
- assertFalse(returnedState.isSuccessful());
+ boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertFalse(returnedState);
}
@Test
@@ -108,44 +109,44 @@ public class ErrorAwareQueueDistribution
when(properties.get("stuck.handling")).thenReturn(new String[]{"DROP"});
when(context.getProperties()).thenReturn(properties);
errorAwareDistributionStrategy.activate(context);
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
+
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- when(queue.getHead()).thenReturn(replicationPackage);
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+ when(queue.getHead()).thenReturn(mock(ReplicationQueueItem.class));
ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
when(state.isSuccessful()).thenReturn(false);
when(state.getAttempts()).thenReturn(2);
when(queue.getStatus(any(ReplicationQueueItem.class))).thenReturn(state);
- ReplicationQueueItemState returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
- assertFalse(returnedState.isSuccessful());
+ boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
@Test
public void testPackageAdditionWithNullItemStateFromTheQueue() throws Exception {
ErrorAwareQueueDistributionStrategy errorAwareDistributionStrategy = new ErrorAwareQueueDistributionStrategy();
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
+
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNull(returnedState);
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+ boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
@Test
public void testPackageAdditionWithNotNullItemStateFromTheQueue() throws Exception {
ErrorAwareQueueDistributionStrategy errorAwareDistributionStrategy = new ErrorAwareQueueDistributionStrategy();
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
- when(queue.getStatus(replicationPackage)).thenReturn(state);
- ReplicationQueueItemState returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+
+ boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
}
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java Fri Oct 31 08:50:10 2014
@@ -20,6 +20,7 @@ package org.apache.sling.replication.que
import java.util.Dictionary;
+import org.apache.sling.replication.packaging.ReplicationPackage;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.apache.sling.replication.queue.ReplicationQueueItemState;
@@ -31,6 +32,7 @@ import static org.junit.Assert.assertFal
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -47,18 +49,15 @@ public class PriorityPathQueueDistributi
when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
when(context.getProperties()).thenReturn(properties);
priorityPathDistributionStrategy.activate(context);
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
- when(state.isSuccessful()).thenReturn(true);
- when(queue.getStatus(replicationPackage)).thenReturn(state);
- ReplicationQueueItemState returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
- assertTrue(returnedState.isSuccessful());
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+
+ boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
@Test
@@ -69,18 +68,15 @@ public class PriorityPathQueueDistributi
when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
when(context.getProperties()).thenReturn(properties);
priorityPathDistributionStrategy.activate(context);
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/content/sample1"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queueProvider.getQueue("agentName", "/content")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
- when(state.isSuccessful()).thenReturn(true);
- when(queue.getStatus(replicationPackage)).thenReturn(state);
- ReplicationQueueItemState returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
- assertTrue(returnedState.isSuccessful());
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+
+ boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
@Test
@@ -91,18 +87,15 @@ public class PriorityPathQueueDistributi
when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
when(context.getProperties()).thenReturn(properties);
priorityPathDistributionStrategy.activate(context);
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
- when(state.isSuccessful()).thenReturn(false);
- when(queue.getStatus(replicationPackage)).thenReturn(state);
- ReplicationQueueItemState returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
- assertFalse(returnedState.isSuccessful());
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+
+ boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
@Test
@@ -113,18 +106,16 @@ public class PriorityPathQueueDistributi
when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
when(context.getProperties()).thenReturn(properties);
priorityPathDistributionStrategy.activate(context);
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/content/sample2"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
+
when(queueProvider.getQueue("agentName", "/content")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
- when(state.isSuccessful()).thenReturn(false);
- when(queue.getStatus(replicationPackage)).thenReturn(state);
- ReplicationQueueItemState returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
- assertFalse(returnedState.isSuccessful());
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+
+ boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
@Test
@@ -135,14 +126,15 @@ public class PriorityPathQueueDistributi
when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
when(context.getProperties()).thenReturn(properties);
priorityPathDistributionStrategy.activate(context);
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
+
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNull(returnedState);
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+ boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
@Test
@@ -153,14 +145,15 @@ public class PriorityPathQueueDistributi
when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
when(context.getProperties()).thenReturn(properties);
priorityPathDistributionStrategy.activate(context);
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/apps/some/stuff"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
+
when(queueProvider.getQueue("agentName", "/apps")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNull(returnedState);
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+ boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
@Test
@@ -171,16 +164,14 @@ public class PriorityPathQueueDistributi
when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
when(context.getProperties()).thenReturn(properties);
priorityPathDistributionStrategy.activate(context);
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
- when(queue.getStatus(replicationPackage)).thenReturn(state);
- ReplicationQueueItemState returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+ boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
@Test
@@ -191,15 +182,15 @@ public class PriorityPathQueueDistributi
when(properties.get("priority.paths")).thenReturn(new String[]{"/content", "/apps"});
when(context.getProperties()).thenReturn(properties);
priorityPathDistributionStrategy.activate(context);
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
when(replicationPackage.getPaths()).thenReturn(new String[]{"/apps"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
+
when(queueProvider.getQueue("agentName", "/apps")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
- when(queue.getStatus(replicationPackage)).thenReturn(state);
- ReplicationQueueItemState returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+
+ boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
}
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java Fri Oct 31 08:50:10 2014
@@ -18,6 +18,7 @@
*/
package org.apache.sling.replication.queue.impl;
+import org.apache.sling.replication.packaging.ReplicationPackage;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.apache.sling.replication.queue.ReplicationQueueItemState;
@@ -28,6 +29,7 @@ import static org.junit.Assert.assertFal
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -39,58 +41,54 @@ public class SingleQueueDistributionStra
@Test
public void testPackageAdditionWithSucceedingItemDelivery() throws Exception {
SingleQueueDistributionStrategy singleQueueDistributionStrategy = new SingleQueueDistributionStrategy();
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
- when(state.isSuccessful()).thenReturn(true);
- when(queue.getStatus(replicationPackage)).thenReturn(state);
- ReplicationQueueItemState returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
- assertTrue(returnedState.isSuccessful());
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+
+ boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
@Test
public void testPackageAdditionWithFailingItemDelivery() throws Exception {
SingleQueueDistributionStrategy singleQueueDistributionStrategy = new SingleQueueDistributionStrategy();
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
+ ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
+ when(queue.add(queueItem)).thenReturn(true);
ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
when(state.isSuccessful()).thenReturn(false);
- when(queue.getStatus(replicationPackage)).thenReturn(state);
- ReplicationQueueItemState returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
- assertFalse(returnedState.isSuccessful());
+ when(queue.getStatus(queueItem)).thenReturn(state);
+ boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertFalse(returnedState);
}
@Test
public void testPackageAdditionWithNullItemStateFromTheQueue() throws Exception {
SingleQueueDistributionStrategy singleQueueDistributionStrategy = new SingleQueueDistributionStrategy();
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNull(returnedState);
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+ boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
@Test
public void testPackageAdditionWithNotNullItemStateFromTheQueue() throws Exception {
SingleQueueDistributionStrategy singleQueueDistributionStrategy = new SingleQueueDistributionStrategy();
- ReplicationQueueItem replicationPackage = mock(ReplicationQueueItem.class);
+ ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
- when(queue.add(replicationPackage)).thenReturn(true);
- ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
- when(queue.getStatus(replicationPackage)).thenReturn(state);
- ReplicationQueueItemState returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
- assertNotNull(returnedState);
+ when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
+
+ boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
+ assertTrue(returnedState);
}
}
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProviderTest.java Fri Oct 31 08:50:10 2014
@@ -40,43 +40,15 @@ public class JobHandlingReplicationQueue
@Test
public void testGetOrCreateNamedQueue() throws Exception {
JobManager jobManager = mock(JobManager.class);
- ConfigurationAdmin configAdmin = mock(ConfigurationAdmin.class);
- Configuration config = mock(Configuration.class);
- when(configAdmin.createFactoryConfiguration(QueueConfiguration.class.getName(), null)).thenReturn(config);
+
BundleContext context = mock(BundleContext.class);
JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider(
- jobManager, configAdmin, context);
- ReplicationQueue queue = jobHandlingReplicationQueueProvider.getOrCreateQueue("dummy-agent", "default");
+ jobManager, context);
+ ReplicationQueue queue = jobHandlingReplicationQueueProvider.getInternalQueue("dummy-agent", "default");
assertNotNull(queue);
}
- @Test
- public void testDeleteNonExistingQueue() throws Exception {
- JobManager jobManager = mock(JobManager.class);
- ConfigurationAdmin configAdmin = mock(ConfigurationAdmin.class);
- Configuration config = mock(Configuration.class);
- when(configAdmin.createFactoryConfiguration(QueueConfiguration.class.getName(), null)).thenReturn(config);
- BundleContext context = mock(BundleContext.class);
- JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider(
- jobManager, configAdmin, context);
- ReplicationQueue queue = mock(ReplicationQueue.class);
- jobHandlingReplicationQueueProvider.deleteQueue(queue);
- }
- @Test
- public void testDeleteExistingQueue() throws Exception {
- JobManager jobManager = mock(JobManager.class);
- ConfigurationAdmin configAdmin = mock(ConfigurationAdmin.class);
- Configuration config = mock(Configuration.class);
- when(configAdmin.createFactoryConfiguration(QueueConfiguration.class.getName(), null)).thenReturn(config);
- BundleContext context = mock(BundleContext.class);
- JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider(
- jobManager, configAdmin, context);
- ReplicationQueue queue = jobHandlingReplicationQueueProvider.getOrCreateQueue("dummy-agent", "default");
- Queue underlyingQueue = mock(Queue.class);
- when(jobManager.getQueue(queue.getName())).thenReturn(underlyingQueue);
- jobHandlingReplicationQueueProvider.deleteQueue(queue);
- }
@Test
public void testEnableQueueProcessing() throws Exception {
@@ -86,7 +58,7 @@ public class JobHandlingReplicationQueue
when(configAdmin.createFactoryConfiguration(QueueConfiguration.class.getName(), null)).thenReturn(config);
BundleContext context = mock(BundleContext.class);
JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider(
- jobManager, configAdmin, context);
+ jobManager, context);
String agentName = "dummy-agent";
ReplicationQueueProcessor queueProcessor = mock(ReplicationQueueProcessor.class);
jobHandlingReplicationQueueProvider.enableQueueProcessing(agentName, queueProcessor);
@@ -100,7 +72,7 @@ public class JobHandlingReplicationQueue
when(configAdmin.createFactoryConfiguration(QueueConfiguration.class.getName(), null)).thenReturn(config);
BundleContext context = mock(BundleContext.class);
JobHandlingReplicationQueueProvider jobHandlingReplicationQueueProvider = new JobHandlingReplicationQueueProvider(
- jobManager, configAdmin, context);
+ jobManager, context);
String agentName = "dummy-agent";
jobHandlingReplicationQueueProvider.disableQueueProcessing(agentName);
}
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProviderTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProviderTest.java?rev=1635725&r1=1635724&r2=1635725&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProviderTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProviderTest.java Fri Oct 31 08:50:10 2014
@@ -37,7 +37,7 @@ public class SimpleReplicationQueueProvi
@Test
public void testGetOrCreateQueue() throws Exception {
SimpleReplicationQueueProvider simpleReplicationQueueProvider = new SimpleReplicationQueueProvider();
- ReplicationQueue queue = simpleReplicationQueueProvider.getOrCreateQueue("dummy-agent", "default");
+ ReplicationQueue queue = simpleReplicationQueueProvider.getInternalQueue("dummy-agent", "default");
assertNotNull(queue);
}
Added: sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install/org.apache.sling.event.jobs.QueueConfiguration-replication.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install/org.apache.sling.event.jobs.QueueConfiguration-replication.json?rev=1635725&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install/org.apache.sling.event.jobs.QueueConfiguration-replication.json (added)
+++ sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install/org.apache.sling.event.jobs.QueueConfiguration-replication.json Fri Oct 31 08:50:10 2014
@@ -0,0 +1,8 @@
+{
+ "jcr:primaryType" : "sling:OsgiConfig",
+ "queue.name" : "org/apache/sling/replication/queue/{0}",
+ "queue.retries" : "-1",
+ "queue.topics": "org/apache/sling/replication/queue/*",
+ "queue.type" : "ORDERED"
+}
+