You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2014/11/04 10:34:35 UTC
svn commit: r1636529 - in
/sling/trunk/contrib/extensions/replication/core/src:
main/java/org/apache/sling/replication/agent/
main/java/org/apache/sling/replication/agent/impl/
main/java/org/apache/sling/replication/monitor/
main/java/org/apache/sling/...
Author: mpetria
Date: Tue Nov 4 09:34:34 2014
New Revision: 1636529
URL: http://svn.apache.org/r1636529
Log:
SLING-4106: changing the queue API not to be aware of all queues
Added:
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemSelector.java
Modified:
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/SharedReplicationPackage.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackage.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueServlet.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java
sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java Tue Nov 4 09:34:34 2014
@@ -39,6 +39,13 @@ import org.apache.sling.replication.queu
@ProviderType
public interface ReplicationAgent extends ReplicationComponent {
+
+ /**
+ * retrieves the names of the queues for this agent.
+ * @return the list of queue names
+ */
+ Iterable<String> getQueueNames();
+
/**
* get the agent queue with the given name
*
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Tue Nov 4 09:34:34 2014
@@ -40,10 +40,7 @@ import org.apache.sling.replication.comm
import org.apache.sling.replication.component.ManagedReplicationComponent;
import org.apache.sling.replication.event.impl.ReplicationEventFactory;
import org.apache.sling.replication.event.ReplicationEventType;
-import org.apache.sling.replication.packaging.ReplicationPackage;
-import org.apache.sling.replication.packaging.ReplicationPackageExporter;
-import org.apache.sling.replication.packaging.ReplicationPackageImportException;
-import org.apache.sling.replication.packaging.ReplicationPackageImporter;
+import org.apache.sling.replication.packaging.*;
import org.apache.sling.replication.queue.*;
import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
import org.apache.sling.replication.trigger.ReplicationRequestHandler;
@@ -192,13 +189,17 @@ public class SimpleReplicationAgent impl
return replicationResponse;
}
+ public Iterable<String> getQueueNames() {
+ return queueDistributionStrategy.getQueueNames();
+ }
+
public ReplicationQueue getQueue(String queueName) throws ReplicationAgentException {
ReplicationQueue queue;
try {
if (queueName != null && queueName.length() > 0) {
queue = queueProvider.getQueue(this.name, queueName);
} else {
- queue = queueProvider.getDefaultQueue(this.name);
+ queue = queueProvider.getQueue(this.name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME);
}
} catch (ReplicationQueueException e) {
throw new ReplicationAgentException(e);
@@ -244,7 +245,7 @@ public class SimpleReplicationAgent impl
}
}
- private boolean processQueue(ReplicationQueueItem queueItem) {
+ private boolean processQueue(String queueName, ReplicationQueueItem queueItem) {
boolean success = false;
log.debug("reading package with id {}", queueItem.getId());
ResourceResolver agentResourceResolver = null;
@@ -265,7 +266,12 @@ public class SimpleReplicationAgent impl
properties.put("replication.agent.name", name);
replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_REPLICATED, properties);
- replicationPackage.delete();
+ if (replicationPackage instanceof SharedReplicationPackage) {
+ ((SharedReplicationPackage) replicationPackage).release(queueName);
+ }
+ else {
+ replicationPackage.delete();
+ }
success = true;
} else {
log.warn("replication package with id {} does not exist", queueItem.getId());
@@ -308,7 +314,7 @@ public class SimpleReplicationAgent impl
class PackageQueueProcessor implements ReplicationQueueProcessor {
public boolean process(@Nonnull String queueName, @Nonnull ReplicationQueueItem packageInfo) {
log.info("running package queue processor for queue {}", queueName);
- return processQueue(packageInfo);
+ return processQueue(queueName, packageInfo);
}
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java Tue Nov 4 09:34:34 2014
@@ -18,10 +18,8 @@
*/
package org.apache.sling.replication.monitor;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -38,6 +36,7 @@ import org.apache.sling.commons.osgi.Pro
import org.apache.sling.hc.api.HealthCheck;
import org.apache.sling.hc.api.Result;
import org.apache.sling.hc.util.FormattingResultLog;
+import org.apache.sling.replication.agent.ReplicationAgent;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.apache.sling.replication.queue.ReplicationQueueItemState;
@@ -58,12 +57,10 @@ import org.slf4j.LoggerFactory;
@Property(name = HealthCheck.MBEAN_NAME, value = "slingReplicationQueue", description = "Health Check MBean name", label = "MBean name")
})
@References({
- @Reference(name = "replicationQueueProvider",
- referenceInterface = ReplicationQueueProvider.class,
+ @Reference(name = "replicationAgent",
+ referenceInterface = ReplicationAgent.class,
cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
- policy = ReferencePolicy.DYNAMIC,
- bind = "bindReplicationQueueProvider",
- unbind = "unbindReplicationQueueProvider")
+ policy = ReferencePolicy.DYNAMIC)
})
@Service(value = HealthCheck.class)
@@ -78,7 +75,7 @@ public class ReplicationQueueHealthCheck
@Property(intValue = DEFAULT_NUMBER_OF_RETRIES_ALLOWED, description = "Number of allowed retries", label = "Allowed retries")
private static final String NUMBER_OF_RETRIES_ALLOWED = "numberOfRetriesAllowed";
- private final Collection<ReplicationQueueProvider> replicationQueueProviders = new LinkedList<ReplicationQueueProvider>();
+ private final List<ReplicationAgent> replicationAgents = new CopyOnWriteArrayList<ReplicationAgent>();
@Activate
public void activate(final Map<String, Object> properties) {
@@ -88,31 +85,30 @@ public class ReplicationQueueHealthCheck
@Deactivate
protected void deactivate() {
- replicationQueueProviders.clear();
+ replicationAgents.clear();
}
- protected void bindReplicationQueueProvider(final ReplicationQueueProvider replicationQueueProvider) {
- synchronized (replicationQueueProviders) {
- replicationQueueProviders.add(replicationQueueProvider);
- }
- log.debug("Registering replication queue provider {} ", replicationQueueProvider);
+ protected void bindReplicationAgent(final ReplicationAgent replicationAgent) {
+ replicationAgents.add(replicationAgent);
+
+ log.debug("Registering replication agent {} ", replicationAgent);
}
- protected void unbindReplicationQueueProvider(final ReplicationQueueProvider replicationQueueProvider) {
- synchronized (replicationQueueProviders) {
- replicationQueueProviders.remove(replicationQueueProvider);
- }
- log.debug("Unregistering replication queue provider {} ", replicationQueueProvider);
+ protected void unbindReplicationAgent(final ReplicationAgent replicationAgent) {
+ replicationAgents.remove(replicationAgent);
+ log.debug("Unregistering replication agent {} ", replicationAgent);
}
public Result execute() {
final FormattingResultLog resultLog = new FormattingResultLog();
Map<String, Integer> failures = new HashMap<String, Integer>();
- if (replicationQueueProviders.size() > 0) {
+ if (replicationAgents.size() > 0) {
- for (ReplicationQueueProvider replicationQueueProvider : replicationQueueProviders) {
- for (ReplicationQueue q : replicationQueueProvider.getAllQueues())
+ for (ReplicationAgent replicationAgent : replicationAgents) {
+ for (String queueName : replicationAgent.getQueueNames()) {
try {
+ ReplicationQueue q = replicationAgent.getQueue(queueName);
+
ReplicationQueueItem item = q.getHead();
if (item != null) {
ReplicationQueueItemState status = q.getStatus(item);
@@ -129,8 +125,9 @@ public class ReplicationQueueHealthCheck
}
} catch (Exception e) {
- resultLog.warn("Exception while inspecting replication queue [{}]: {}", q.getName(), e);
+ resultLog.warn("Exception while inspecting replication queue [{}]: {}", queueName, e);
}
+ }
}
} else {
resultLog.debug("No replication queue providers found");
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/SharedReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/SharedReplicationPackage.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/SharedReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/SharedReplicationPackage.java Tue Nov 4 09:34:34 2014
@@ -30,12 +30,12 @@ public interface SharedReplicationPackag
/**
* acquire a reference to this package and increase the reference count.
*/
- void acquire();
+ void acquire(String holderName);
/**
* release a reference to this package and decrease the reference count.
* when no more references are hold the package <code>delete</code> method is called.
*/
- void release();
+ void release(String holderName);
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java Tue Nov 4 09:34:34 2014
@@ -22,6 +22,7 @@ import aQute.bnd.annotation.ConsumerType
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.Collection;
/**
@@ -76,10 +77,12 @@ public interface ReplicationQueue {
/**
* get the items in the queue
*
- * @return a <code>Collection</code> of {@link org.apache.sling.replication.queue.ReplicationQueueItem}s
+ * @param queueItemSelector represents the criteria to filter queue items.
+ * if null is passed then all items are returned.
+ * @return a <code>Iterable</code> of {@link org.apache.sling.replication.queue.ReplicationQueueItem}s
*/
@Nonnull
- Collection<ReplicationQueueItem> getItems();
+ Iterable<ReplicationQueueItem> getItems(@Nullable ReplicationQueueItemSelector queueItemSelector);
/**
* remove an item from the queue by specifying its id
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java Tue Nov 4 09:34:34 2014
@@ -33,7 +33,7 @@ import java.util.List;
*/
@ConsumerType
public interface ReplicationQueueDistributionStrategy extends ReplicationComponent {
- String DEFAULT_QUEUE_NAME = "";
+ String DEFAULT_QUEUE_NAME = "default";
/**
* synchronously distribute a {@link org.apache.sling.replication.packaging.ReplicationPackage}
Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemSelector.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemSelector.java?rev=1636529&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemSelector.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemSelector.java Tue Nov 4 09:34:34 2014
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.replication.queue;
+
+/**
+ * Class representing criteria for queue items selection.
+ */
+public class ReplicationQueueItemSelector {
+ private final int skip;
+ private final int limit;
+
+ /**
+ *
+ * @param skip the number of items to skip
+ * @param limit the maximum number of items to return. use -1 to return all items.
+ */
+ public ReplicationQueueItemSelector(int skip, int limit) {
+ this.skip = skip;
+ this.limit = limit;
+ }
+
+ /**
+ * @return the number of items to skip from the queue.
+ */
+ public int getSkip() {
+ return skip;
+ }
+
+ /**
+ *
+ * @return return the maximum number of items to be selected.
+ */
+ public int getLimit() {
+ return limit;
+ }
+}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java Tue Nov 4 09:34:34 2014
@@ -26,36 +26,35 @@ import java.util.Calendar;
public class ReplicationQueueItemState {
- private int attempts;
+ private final int attempts;
- private ItemState state;
+ private final ItemState state;
- private Calendar entered;
+ private final Calendar entered;
- public boolean isSuccessful() {
- return ItemState.SUCCEEDED.equals(state);
+ public ReplicationQueueItemState(Calendar entered, ItemState state, int attempts) {
+
+ this.entered = entered;
+ this.state = state;
+ this.attempts = attempts;
}
- public void setSuccessful(boolean successful) {
- state = successful ? ItemState.SUCCEEDED : ItemState.ERROR;
+ public ReplicationQueueItemState(ItemState state) {
+ this(Calendar.getInstance(), state, 0);
}
- public int getAttempts() {
- return attempts;
+ public boolean isSuccessful() {
+ return ItemState.SUCCEEDED.equals(state);
}
- public void setAttempts(int attempts) {
- this.attempts = attempts;
+ public int getAttempts() {
+ return attempts;
}
public ItemState getItemState() {
return state;
}
- public void setItemState(ItemState status) {
- this.state = status;
- }
-
@Override
public String toString() {
return "{\"attempts\":\"" + attempts + "\",\"" + "successful\":\"" + isSuccessful() + "\",\"" + "state\":\"" + state + "\"}";
@@ -65,10 +64,6 @@ public class ReplicationQueueItemState {
return entered;
}
- public void setEntered(Calendar entered) {
- this.entered = entered;
- }
-
public enum ItemState {
QUEUED, // waiting in queue after adding or for restart after failing
ACTIVE, // job is currently in processing
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java Tue Nov 4 09:34:34 2014
@@ -43,24 +43,6 @@ public interface ReplicationQueueProvide
throws ReplicationQueueException;
- /**
- * get the default queue to be used for a certain agent
- *
- * @param agentName a replication agent
- * @return the default replication queue for the given agent
- * @throws ReplicationQueueException
- */
- @Nonnull
- ReplicationQueue getDefaultQueue(@Nonnull String agentName)
- throws ReplicationQueueException;
-
- /**
- * get all the available queues from this provider
- *
- * @return a collection of replication queues
- */
- @Nonnull
- Collection<ReplicationQueue> getAllQueues();
/**
* enables queue driven processing for an agent
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java Tue Nov 4 09:34:34 2014
@@ -54,12 +54,6 @@ public abstract class AbstractReplicatio
}
@Nonnull
- public ReplicationQueue getDefaultQueue(@Nonnull String agentName)
- throws ReplicationQueueException {
- return getQueue(agentName, "");
- }
-
- @Nonnull
public Collection<ReplicationQueue> getAllQueues() {
return queueMap.values();
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java Tue Nov 4 09:34:34 2014
@@ -88,7 +88,7 @@ public class ErrorAwareQueueDistribution
ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
boolean added;
ReplicationQueueItem queueItem = getItem(replicationPackage);
- ReplicationQueue queue = queueProvider.getDefaultQueue(agentName);
+ ReplicationQueue queue = queueProvider.getQueue(agentName, DEFAULT_QUEUE_NAME);
added = queue.add(queueItem);
checkAndRemoveStuckItems(agentName, queueProvider);
return added;
@@ -100,7 +100,7 @@ public class ErrorAwareQueueDistribution
private void checkAndRemoveStuckItems(String agent,
ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
- ReplicationQueue defaultQueue = queueProvider.getDefaultQueue(agent);
+ ReplicationQueue defaultQueue = queueProvider.getQueue(agent, DEFAULT_QUEUE_NAME);
// get first item in the queue with its status
ReplicationQueueItem firstItem = defaultQueue.getHead();
if (firstItem != null) {
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java Tue Nov 4 09:34:34 2014
@@ -89,7 +89,7 @@ public class PriorityPathDistributionStr
queue = queueProvider.getQueue(agentName, pp);
} else {
log.info("using default queue");
- queue = queueProvider.getDefaultQueue(agentName);
+ queue = queueProvider.getQueue(agentName, DEFAULT_QUEUE_NAME);
}
return queue;
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java Tue Nov 4 09:34:34 2014
@@ -54,7 +54,7 @@ public class SingleQueueDistributionStra
public boolean add(String agentName, ReplicationPackage replicationPackage,
ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
ReplicationQueueItem queueItem = getItem(replicationPackage);
- ReplicationQueue queue = queueProvider.getDefaultQueue(agentName);
+ ReplicationQueue queue = queueProvider.getQueue(agentName, DEFAULT_QUEUE_NAME);
return queue.add(queueItem);
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java Tue Nov 4 09:34:34 2014
@@ -27,10 +27,7 @@ import java.util.Map;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobManager.QueryType;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueException;
-import org.apache.sling.replication.queue.ReplicationQueueItem;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.*;
import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,22 +75,25 @@ public class JobHandlingReplicationQueue
@Nonnull
public ReplicationQueueItemState getStatus(@Nonnull ReplicationQueueItem replicationPackage)
throws ReplicationQueueException {
- ReplicationQueueItemState itemStatus = new ReplicationQueueItemState();
try {
Map<String, Object> properties = JobHandlingUtils.createIdProperties(replicationPackage.getId());
Job job = jobManager.getJob(topic, properties);
if (job != null) {
- itemStatus.setAttempts(job.getRetryCount());
- itemStatus.setItemState(ItemState.valueOf(job.getJobState().toString()));
- itemStatus.setEntered(job.getCreated());
+
+ ReplicationQueueItemState itemState = new ReplicationQueueItemState(job.getCreated(),
+ ItemState.valueOf(job.getJobState().toString()),
+ job.getRetryCount());
+
log.info("status of job {} is {}", job.getId(), job.getJobState());
+
+ return itemState;
} else {
- itemStatus.setItemState(ItemState.DROPPED);
+ ReplicationQueueItemState itemState = new ReplicationQueueItemState(ItemState.DROPPED);
+ return itemState;
}
} catch (Exception e) {
throw new ReplicationQueueException("unable to retrieve the queue status", e);
}
- return itemStatus;
}
public ReplicationQueueItem getHead() {
@@ -108,7 +108,7 @@ public class JobHandlingReplicationQueue
private Job getFirstJob() {
log.info("getting first item in the queue");
- Collection<Job> jobs = getJobs(1);
+ List<Job> jobs = getJobs(0, 1);
if (jobs.size() > 0) {
Job firstItem = jobs.toArray(new Job[jobs.size()])[0];
log.info("first item in the queue is {}, retried {} times", firstItem.getId(), firstItem.getRetryCount());
@@ -128,18 +128,37 @@ public class JobHandlingReplicationQueue
return job;
}
- private Collection<Job> getJobs(int limit) {
- return jobManager.findJobs(QueryType.ALL, topic, limit);
+ private List<Job> getJobs(int skip, int limit) {
+ int actualSkip = skip < 0 ? 0 : skip;
+ int actualLimit = limit < 0 ? -1 : actualSkip + limit;
+
+
+ Collection<Job> jobs = jobManager.findJobs(QueryType.ALL, topic, actualLimit);
+ List<Job> result = new ArrayList<Job>();
+
+ int i =0;
+ for (Job job : jobs) {
+ if (i >= actualSkip) {
+ result.add(job);
+ }
+ i++;
+ }
+
+ return result;
}
public boolean isEmpty() {
- return getItems().isEmpty();
+ return getJobs(0, -1).isEmpty();
}
@Nonnull
- public List<ReplicationQueueItem> getItems() {
+ public List<ReplicationQueueItem> getItems(ReplicationQueueItemSelector selector) {
+ if (selector == null) {
+ selector = new ReplicationQueueItemSelector(0, -1);
+ }
+
List<ReplicationQueueItem> items = new ArrayList<ReplicationQueueItem>();
- Collection<Job> jobs = getJobs(-1);
+ Collection<Job> jobs = getJobs(selector.getSkip(), selector.getLimit());
for (Job job : jobs) {
items.add(JobHandlingUtils.getPackage(job));
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTask.java Tue Nov 4 09:34:34 2014
@@ -33,10 +33,10 @@ import org.slf4j.LoggerFactory;
public class ScheduledReplicationQueueProcessorTask implements Runnable {
private final Logger log = LoggerFactory.getLogger(getClass());
- private final ReplicationQueueProvider queueProvider;
+ private final SimpleReplicationQueueProvider queueProvider;
private final ReplicationQueueProcessor queueProcessor;
- public ScheduledReplicationQueueProcessorTask(ReplicationQueueProvider queueProvider,
+ public ScheduledReplicationQueueProcessorTask(SimpleReplicationQueueProvider queueProvider,
ReplicationQueueProcessor queueProcessor) {
this.queueProvider = queueProvider;
this.queueProcessor = queueProcessor;
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java Tue Nov 4 09:34:34 2014
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueItem;
+import org.apache.sling.replication.queue.ReplicationQueueItemSelector;
import org.apache.sling.replication.queue.ReplicationQueueItemState;
import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
import org.slf4j.Logger;
@@ -63,54 +64,52 @@ public class SimpleReplicationQueue impl
}
public boolean add(@Nonnull ReplicationQueueItem item) {
- ReplicationQueueItemState status = new ReplicationQueueItemState();
+ ItemState itemState = ItemState.ERROR;
boolean result = false;
try {
result = queue.offer(item, 10, TimeUnit.SECONDS);
- status.setEntered(Calendar.getInstance());
+ itemState = ItemState.QUEUED;
} catch (InterruptedException e) {
log.error("cannot add an item to the queue", e);
- status.setSuccessful(false);
} finally {
- statusMap.put(item, status);
+ statusMap.put(item, new ReplicationQueueItemState(Calendar.getInstance(), itemState, 0));
}
return result;
}
@Nonnull
- public ReplicationQueueItemState getStatus(@Nonnull ReplicationQueueItem replicationPackage) {
- ReplicationQueueItemState status = statusMap.get(replicationPackage);
- if (queue.contains(replicationPackage)) {
- status.setItemState(ItemState.QUEUED);
+ public ReplicationQueueItemState getStatus(@Nonnull ReplicationQueueItem queueItem) {
+ ReplicationQueueItemState itemStatus = statusMap.get(queueItem);
+
+ if (queue.contains(queueItem)) {
+ return itemStatus;
} else {
- status.setItemState(ItemState.SUCCEEDED);
+ return new ReplicationQueueItemState(itemStatus.getEntered(), ItemState.SUCCEEDED, itemStatus.getAttempts());
}
- return status;
}
public ReplicationQueueItem getHead() {
ReplicationQueueItem element = queue.peek();
if (element != null) {
- ReplicationQueueItemState replicationQueueItemStatus = statusMap.get(element);
- replicationQueueItemStatus.setAttempts(replicationQueueItemStatus.getAttempts() + 1);
+ ReplicationQueueItemState itemState = statusMap.get(element);
+ statusMap.put(element, new ReplicationQueueItemState(itemState.getEntered(),
+ itemState.getItemState(),
+ itemState.getAttempts()+1));
}
return element;
}
- public void removeHead() {
- ReplicationQueueItem element = queue.remove();
- statusMap.get(element).setSuccessful(true);
- }
-
public boolean isEmpty() {
return queue.isEmpty();
}
@Nonnull
- public Collection<ReplicationQueueItem> getItems() {
+ public Iterable<ReplicationQueueItem> getItems(ReplicationQueueItemSelector queueItemSelector) {
return queue;
}
+
+
public void remove(@Nonnull String id) {
ReplicationQueueItem toRemove = null;
for (ReplicationQueueItem item : queue) {
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackage.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackage.java Tue Nov 4 09:34:34 2014
@@ -20,10 +20,7 @@ package org.apache.sling.replication.ser
import javax.annotation.Nonnull;
-import org.apache.sling.api.resource.ModifiableValueMap;
-import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.Resource;
-import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.*;
import org.apache.sling.replication.packaging.ReplicationPackage;
import org.apache.sling.replication.packaging.ReplicationPackageInfo;
import org.apache.sling.replication.packaging.SharedReplicationPackage;
@@ -32,14 +29,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Collections;
public class ResourceSharedReplicationPackage implements SharedReplicationPackage {
private final Logger log = LoggerFactory.getLogger(getClass());
- private static final Object syncObject = new Object();
-
+ protected static final String REFERENCE_ROOT_NODE = "refs";
private static final String PN_REFERENCE_COUNT = "ref.count";
+
private final ResourceResolver resourceResolver;
private final String packagePath;
private final ReplicationPackage replicationPackage;
@@ -50,41 +48,33 @@ public class ResourceSharedReplicationPa
this.replicationPackage = replicationPackage;
}
- public void acquire() {
- synchronized (syncObject) {
- Resource resource = getProxyResource();
- ModifiableValueMap valueMap = resource.adaptTo(ModifiableValueMap.class);
- int refCount = valueMap.get(PN_REFERENCE_COUNT, 0);
- refCount ++;
- valueMap.put(PN_REFERENCE_COUNT, refCount);
-
- try {
- resourceResolver.commit();
- } catch (PersistenceException e) {
- log.error("cannot release package", e);
- }
+ public void acquire(String holderName) {
+ if (holderName == null || holderName.length() == 0) {
+ throw new IllegalArgumentException("holder name cannot be null or empty");
+ }
+
+ try {
+ createHolderResource(holderName);
+ } catch (PersistenceException e) {
+ log.error("cannot acquire package", e);
}
}
- public void release() {
- synchronized (syncObject) {
- Resource resource = getProxyResource();
- ModifiableValueMap valueMap = resource.adaptTo(ModifiableValueMap.class);
- int refCount = valueMap.get(PN_REFERENCE_COUNT, 0);
- refCount --;
+ public void release(String holderName) {
- if (refCount > 0) {
- valueMap.put(PN_REFERENCE_COUNT, refCount);
- }
- else {
- delete();
- }
+ if (holderName == null || holderName.length() == 0) {
+ throw new IllegalArgumentException("holder name cannot be null or empty");
+ }
- try {
- resourceResolver.commit();
- } catch (PersistenceException e) {
- log.error("cannot release package", e);
+ try {
+ deleteHolderResource(holderName);
+
+ Resource holderRoot = getHolderRootResource();
+ if (!holderRoot.hasChildren()) {
+ delete();
}
+ } catch (PersistenceException e) {
+ log.error("cannot release package", e);
}
}
@@ -141,8 +131,60 @@ public class ResourceSharedReplicationPa
private Resource getProxyResource() {
- Resource resource = resourceResolver.getResource(packagePath);
+ String holderPath = packagePath;
+
+ resourceResolver.refresh();
+ Resource resource = resourceResolver.getResource(holderPath);
return resource;
}
+
+
+
+ private Resource getHolderRootResource() {
+ Resource resource = getProxyResource();
+
+ Resource holderRoot = resource.getChild(REFERENCE_ROOT_NODE);
+ if (holderRoot != null) {
+ return holderRoot;
+ }
+
+ return null;
+ }
+
+ private void createHolderResource(String holderName) throws PersistenceException {
+ Resource holderRoot = getHolderRootResource();
+
+ if (holderRoot == null) {
+ return;
+ }
+
+ Resource holder = holderRoot.getChild(holderName);
+
+ if (holder != null) {
+ return;
+ }
+
+ resourceResolver.create(holderRoot, holderName, Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object) "nt:unstructured"));
+ resourceResolver.commit();
+
+ }
+
+ private void deleteHolderResource(String holderName) throws PersistenceException {
+ Resource holderRoot = getHolderRootResource();
+
+ if (holderRoot == null) {
+ return;
+ }
+
+ Resource holder = holderRoot.getChild(holderName);
+
+ if (holder == null) {
+ return;
+ }
+
+ resourceResolver.delete(holder);
+ resourceResolver.commit();
+ }
+
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/serialization/impl/ResourceSharedReplicationPackageBuilder.java Tue Nov 4 09:34:34 2014
@@ -84,6 +84,11 @@ public class ResourceSharedReplicationPa
public ReplicationPackage getPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull String replicationPackageId) {
String originalPackageId = retrieveIdFromPath(resourceResolver, replicationPackageId);
+
+ if (originalPackageId == null) {
+ return null;
+ }
+
ReplicationPackage replicationPackage = replicationPackageBuilder.getPackage(resourceResolver, originalPackageId);
if (replicationPackage == null) {
@@ -115,21 +120,35 @@ public class ResourceSharedReplicationPa
properties.put(PN_ORIGINAL_PATHS, replicationPackage.getPaths());
Resource resource = ResourceUtil.getOrCreateResource(resourceResolver, packagePath, "nt:unstructured", "sling:Folder", false);
+
ModifiableValueMap valueMap = resource.adaptTo(ModifiableValueMap.class);
valueMap.putAll(properties);
+ resourceResolver.create(resource, ResourceSharedReplicationPackage.REFERENCE_ROOT_NODE,
+ Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object)"nt:unstructured"));
+
resourceResolver.commit();
return packagePath;
}
private String retrieveIdFromPath(ResourceResolver resourceResolver, String packagePath) {
- if (!packagePath.startsWith(sharedPackagesRoot)) return null;
+ if (!packagePath.startsWith(sharedPackagesRoot)) {
+ return null;
+ }
Resource resource = resourceResolver.getResource(packagePath);
+ if (resource == null) {
+ return null;
+ }
+
ValueMap properties = resource.adaptTo(ValueMap.class);
+ if (properties == null) {
+ return null;
+ }
+
return properties.get(PN_ORIGINAL_ID, null);
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueServlet.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueServlet.java Tue Nov 4 09:34:34 2014
@@ -90,16 +90,14 @@ public class ReplicationAgentQueueServle
StringBuilder builder = new StringBuilder("{\"name\":\"" + queue.getName() + "\",\"empty\":" + queue.isEmpty());
if (!queue.isEmpty()) {
builder.append(",\"items\":[");
- for (ReplicationQueueItem item : queue.getItems()) {
+ for (ReplicationQueueItem item : queue.getItems(null)) {
builder.append('{');
builder.append(toJSoN(item));
builder.append(',');
builder.append(toJSoN(queue.getStatus(item)));
builder.append("},");
}
- if (queue.getItems().size() > 0) {
- builder.deleteCharAt(builder.length() - 1);
- }
+ builder.deleteCharAt(builder.length() - 1);
builder.append(']');
}
builder.append('}');
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java Tue Nov 4 09:34:34 2014
@@ -76,7 +76,7 @@ public class SimpleReplicationAgentTest
when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
when(packageExporter.exportPackages(any(ResourceResolver.class), any(ReplicationRequest.class)))
.thenReturn(Arrays.asList(replicationPackage));
- when(queueProvider.getDefaultQueue(name)).thenReturn(
+ when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
new SimpleReplicationQueue(name, "name"));
ReplicationResponse response = agent.execute(resourceResolver, request);
assertNotNull(response);
@@ -107,7 +107,7 @@ public class SimpleReplicationAgentTest
when(distributionHandler.add(any(String.class), any(ReplicationPackage.class), eq(queueProvider))).thenReturn(true);
when(packageExporter.exportPackages(any(ResourceResolver.class), any(ReplicationRequest.class)))
.thenReturn(Arrays.asList(replicationPackage));
- when(queueProvider.getDefaultQueue(name)).thenReturn(
+ when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
new SimpleReplicationQueue(name, "name"));
ReplicationResponse response = agent.execute(resourceResolver, request);
assertNotNull(response);
@@ -137,7 +137,7 @@ public class SimpleReplicationAgentTest
when(replicationPackage.getPaths()).thenReturn(new String[]{"/"});
when(packageExporter.exportPackages(resourceResolver, request)).thenReturn(Arrays.asList(replicationPackage));
- when(queueProvider.getDefaultQueue(name)).thenReturn(
+ when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
new SimpleReplicationQueue(name, "name"));
agent.execute(resourceResolver, request);
@@ -161,7 +161,8 @@ public class SimpleReplicationAgentTest
queueProvider, distributionHandler,
replicationEventFactory, resolverFactory, null);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getDefaultQueue(name)).thenReturn(queue);
+ when(queueProvider.getQueue(name, ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME))
+ .thenReturn(queue);
assertNotNull(agent.getQueue(null));
}
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java Tue Nov 4 09:34:34 2014
@@ -18,11 +18,10 @@
*/
package org.apache.sling.replication.monitor;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
+import java.util.*;
import org.apache.sling.hc.api.Result;
+import org.apache.sling.replication.agent.ReplicationAgent;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.apache.sling.replication.queue.ReplicationQueueItemState;
@@ -32,7 +31,9 @@ import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockingDetails;
import static org.mockito.Mockito.when;
/**
@@ -56,11 +57,12 @@ public class ReplicationQueueHealthCheck
replicationQueueHealthCheck.activate(Collections.<String, Object>emptyMap());
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queue.getHead()).thenReturn(null);
- ReplicationQueueProvider replicationQueueProvider = mock(ReplicationQueueProvider.class);
- Collection<ReplicationQueue> providers = new LinkedList<ReplicationQueue>();
- providers.add(queue);
- when(replicationQueueProvider.getAllQueues()).thenReturn(providers);
- replicationQueueHealthCheck.bindReplicationQueueProvider(replicationQueueProvider);
+ ReplicationAgent replicationAgent = mock(ReplicationAgent.class);
+
+ List<String> queues = new ArrayList<String>();
+ queues.add("queueName");
+ when(replicationAgent.getQueueNames()).thenReturn(queues); when(replicationAgent.getQueue(anyString())).thenReturn(queue);
+ replicationQueueHealthCheck.bindReplicationAgent(replicationAgent);
Result result = replicationQueueHealthCheck.execute();
assertNotNull(result);
@@ -78,11 +80,14 @@ public class ReplicationQueueHealthCheck
when(status.getAttempts()).thenReturn(1);
when(queue.getStatus(item)).thenReturn(status);
when(queue.getHead()).thenReturn(item);
- ReplicationQueueProvider replicationQueueProvider = mock(ReplicationQueueProvider.class);
- Collection<ReplicationQueue> providers = new LinkedList<ReplicationQueue>();
- providers.add(queue);
- when(replicationQueueProvider.getAllQueues()).thenReturn(providers);
- replicationQueueHealthCheck.bindReplicationQueueProvider(replicationQueueProvider);
+ ReplicationAgent replicationAgent = mock(ReplicationAgent.class);
+
+ List<String> queues = new ArrayList<String>();
+ queues.add("queueName");
+ when(replicationAgent.getQueueNames()).thenReturn(queues);
+ when(replicationAgent.getQueue(anyString())).thenReturn(queue);
+ replicationQueueHealthCheck.bindReplicationAgent(replicationAgent);
+
Result result = replicationQueueHealthCheck.execute();
assertNotNull(result);
@@ -100,11 +105,13 @@ public class ReplicationQueueHealthCheck
when(status.getAttempts()).thenReturn(10);
when(queue.getStatus(item)).thenReturn(status);
when(queue.getHead()).thenReturn(item);
- ReplicationQueueProvider replicationQueueProvider = mock(ReplicationQueueProvider.class);
- Collection<ReplicationQueue> providers = new LinkedList<ReplicationQueue>();
- providers.add(queue);
- when(replicationQueueProvider.getAllQueues()).thenReturn(providers);
- replicationQueueHealthCheck.bindReplicationQueueProvider(replicationQueueProvider);
+ ReplicationAgent replicationAgent = mock(ReplicationAgent.class);
+
+ List<String> queues = new ArrayList<String>();
+ queues.add("queueName");
+ when(replicationAgent.getQueueNames()).thenReturn(queues);
+ when(replicationAgent.getQueue(anyString())).thenReturn(queue);
+ replicationQueueHealthCheck.bindReplicationAgent(replicationAgent);
Result result = replicationQueueHealthCheck.execute();
assertNotNull(result);
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java Tue Nov 4 09:34:34 2014
@@ -21,10 +21,7 @@ package org.apache.sling.replication.que
import java.util.Dictionary;
import org.apache.sling.replication.packaging.ReplicationPackage;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueItem;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.*;
import org.junit.Test;
import org.osgi.service.component.ComponentContext;
@@ -47,7 +44,7 @@ public class ErrorAwareQueueDistribution
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
@@ -63,7 +60,7 @@ public class ErrorAwareQueueDistribution
ReplicationQueue queue = mock(ReplicationQueue.class);
ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(queueItem)).thenReturn(true);
ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
when(state.isSuccessful()).thenReturn(false);
@@ -86,7 +83,7 @@ public class ErrorAwareQueueDistribution
ReplicationQueue queue = mock(ReplicationQueue.class);
ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(queueItem)).thenReturn(true);
when(queue.getHead()).thenReturn(queueItem);
ReplicationQueue errorQueue = mock(ReplicationQueue.class);
@@ -113,7 +110,7 @@ public class ErrorAwareQueueDistribution
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
when(queue.getHead()).thenReturn(mock(ReplicationQueueItem.class));
ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
@@ -131,7 +128,7 @@ public class ErrorAwareQueueDistribution
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
assertTrue(returnedState);
@@ -143,7 +140,7 @@ public class ErrorAwareQueueDistribution
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
boolean returnedState = errorAwareDistributionStrategy.add("agentName", replicationPackage, queueProvider);
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java Tue Nov 4 09:34:34 2014
@@ -21,10 +21,7 @@ package org.apache.sling.replication.que
import java.util.Dictionary;
import org.apache.sling.replication.packaging.ReplicationPackage;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueItem;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.*;
import org.junit.Test;
import org.osgi.service.component.ComponentContext;
@@ -53,7 +50,7 @@ public class PriorityPathQueueDistributi
when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
@@ -91,7 +88,7 @@ public class PriorityPathQueueDistributi
when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
@@ -131,7 +128,7 @@ public class PriorityPathQueueDistributi
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
assertTrue(returnedState);
@@ -168,7 +165,7 @@ public class PriorityPathQueueDistributi
when(replicationPackage.getPaths()).thenReturn(new String[]{"/etc"});
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
boolean returnedState = priorityPathDistributionStrategy.add("agentName", replicationPackage, queueProvider);
assertTrue(returnedState);
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java Tue Nov 4 09:34:34 2014
@@ -19,10 +19,7 @@
package org.apache.sling.replication.queue.impl;
import org.apache.sling.replication.packaging.ReplicationPackage;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueItem;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.*;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
@@ -44,7 +41,7 @@ public class SingleQueueDistributionStra
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
@@ -58,7 +55,7 @@ public class SingleQueueDistributionStra
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
ReplicationQueueItem queueItem = mock(ReplicationQueueItem.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(queueItem)).thenReturn(true);
ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
when(state.isSuccessful()).thenReturn(false);
@@ -73,7 +70,7 @@ public class SingleQueueDistributionStra
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
assertTrue(returnedState);
@@ -85,7 +82,7 @@ public class SingleQueueDistributionStra
ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
ReplicationQueue queue = mock(ReplicationQueue.class);
- when(queueProvider.getDefaultQueue("agentName")).thenReturn(queue);
+ when(queueProvider.getQueue("agentName", ReplicationQueueDistributionStrategy.DEFAULT_QUEUE_NAME)).thenReturn(queue);
when(queue.add(any(ReplicationQueueItem.class))).thenReturn(true);
boolean returnedState = singleQueueDistributionStrategy.add("agentName", replicationPackage, queueProvider);
Modified: sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java?rev=1636529&r1=1636528&r2=1636529&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/test/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessorTaskTest.java Tue Nov 4 09:34:34 2014
@@ -37,7 +37,7 @@ public class ScheduledReplicationQueuePr
@Test
public void testRunWithNoQueue() throws Exception {
- ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
+ SimpleReplicationQueueProvider queueProvider = mock(SimpleReplicationQueueProvider.class);
ReplicationQueueProcessor queueProcessor = mock(ReplicationQueueProcessor.class);
ScheduledReplicationQueueProcessorTask scheduledReplicationQueueProcessorTask = new ScheduledReplicationQueueProcessorTask(
queueProvider, queueProcessor);
@@ -46,7 +46,7 @@ public class ScheduledReplicationQueuePr
@Test
public void testRunWithOneEmptyQueue() throws Exception {
- ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
+ SimpleReplicationQueueProvider queueProvider = mock(SimpleReplicationQueueProvider.class);
Collection<ReplicationQueue> queues = new LinkedList<ReplicationQueue>();
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queue.isEmpty()).thenReturn(true);
@@ -60,7 +60,7 @@ public class ScheduledReplicationQueuePr
@Test
public void testRunWithOneNonEmptyQueue() throws Exception {
- ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
+ SimpleReplicationQueueProvider queueProvider = mock(SimpleReplicationQueueProvider.class);
Collection<ReplicationQueue> queues = new LinkedList<ReplicationQueue>();
ReplicationQueue queue = mock(ReplicationQueue.class);
when(queue.isEmpty()).thenReturn(false).thenReturn(true);