You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2019/12/20 12:39:24 UTC
[sling-org-apache-sling-distribution-core] branch master updated:
SLING-8853 - adapt contribution to match with #26 from SLING-8854
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-core.git
The following commit(s) were added to refs/heads/master by this push:
new 20f5575 SLING-8853 - adapt contribution to match with #26 from SLING-8854
20f5575 is described below
commit 20f5575af5318c07c31e7623319cf4f1d20258c8
Author: tmaret <tm...@adobe.com>
AuthorDate: Fri Dec 20 13:38:18 2019 +0100
SLING-8853 - adapt contribution to match with #26 from SLING-8854
---
.../queue/impl/resource/ResourceQueueProvider.java | 19 +++++++++++++++----
.../impl/simple/SimpleDistributionQueueProvider.java | 18 ++++++++++--------
2 files changed, 25 insertions(+), 12 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
index d2df714..221501a 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
@@ -23,6 +23,7 @@ import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.DistributionQueueType;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
@@ -56,6 +57,9 @@ public class ResourceQueueProvider implements DistributionQueueProvider {
private final Map<String, ResourceQueue> queueMap = new ConcurrentHashMap<>();
+ private final Map<String, Map<String, DistributionQueueItemStatus>> statusMap = new ConcurrentHashMap<>();
+
+
private ServiceRegistration<Runnable> cleanupTask;
public ResourceQueueProvider(BundleContext context, ResourceResolverFactory resolverFactory,
@@ -77,11 +81,13 @@ public class ResourceQueueProvider implements DistributionQueueProvider {
@NotNull
@Override
public DistributionQueue getQueue(@NotNull String queueName) throws DistributionException {
- return queueMap.computeIfAbsent(queueName, name -> {
+ String key = getKey(queueName);
+ return queueMap.computeIfAbsent(key, k -> {
+ statusMap.put(key, new ConcurrentHashMap<>());
if (isActive) {
- return new ActiveResourceQueue(resolverFactory, serviceName, name, agentRootPath);
+ return new ActiveResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
} else {
- return new ResourceQueue(resolverFactory, serviceName, name, agentRootPath);
+ return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
}
});
}
@@ -105,7 +111,8 @@ public class ResourceQueueProvider implements DistributionQueueProvider {
.canRunConcurrently(false)
.onSingleInstanceOnly(true)
.name(getJobName(queueName));
- scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor), options);
+ scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
+ statusMap.get(getKey(queueName))), options);
}
} else {
throw new DistributionException(new UnsupportedOperationException("enable Processing not supported for Passive Queues"));
@@ -143,6 +150,10 @@ public class ResourceQueueProvider implements DistributionQueueProvider {
cleanupTask = context.registerService(Runnable.class, cleanup, props);
}
+ private String getKey(String queueName) {
+ return agentName + "#" + queueName;
+ }
+
public void close() {
if (cleanupTask != null) {
cleanupTask.unregister();
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
index 91ca984..bc3d3c7 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
@@ -26,7 +26,6 @@ import java.io.FileReader;
import java.io.FilenameFilter;
import java.util.Collection;
import java.util.Map;
-import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
@@ -57,9 +56,8 @@ public class SimpleDistributionQueueProvider implements DistributionQueueProvide
private final String name;
private final Scheduler scheduler;
- private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
- private final Map<SimpleDistributionQueue, Map<String, DistributionQueueItemStatus>> statusMap
- = new WeakHashMap<SimpleDistributionQueue, Map<String, DistributionQueueItemStatus>>();
+ private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<>();
+ private final Map<String, Map<String, DistributionQueueItemStatus>> statusMap = new ConcurrentHashMap<>();
private final boolean checkpoint;
private File checkpointDirectory;
@@ -89,16 +87,16 @@ public class SimpleDistributionQueueProvider implements DistributionQueueProvide
@NotNull
public DistributionQueue getQueue(@NotNull String queueName) {
- String key = name + queueName;
+ String key = getKey(queueName);
SimpleDistributionQueue queue = queueMap.get(key);
if (queue == null) {
log.debug("creating a queue with key {}", key);
Map<String, DistributionQueueItemStatus> queueStatusMap
- = new ConcurrentHashMap<String, DistributionQueueItemStatus>();
+ = new ConcurrentHashMap<>();
queue = new SimpleDistributionQueue(name, queueName, queueStatusMap);
queueMap.put(key, queue);
- statusMap.put(queue, queueStatusMap);
+ statusMap.put(key, queueStatusMap);
log.debug("queue created {}", queue);
}
return queue;
@@ -159,7 +157,7 @@ public class SimpleDistributionQueueProvider implements DistributionQueueProvide
for (String queueName : queueNames) {
ScheduleOptions options = scheduler.NOW(-1, 1).canRunConcurrently(false).name(getJobName(queueName));
scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
- statusMap.get(getQueue(queueName))), options);
+ statusMap.get(getKey(queueName))), options);
}
}
@@ -184,6 +182,10 @@ public class SimpleDistributionQueueProvider implements DistributionQueueProvide
}
}
+ private String getKey(String queueName) {
+ return name + "#" + queueName;
+ }
+
private String getJobName(String queueName) {
return "simple-queueProcessor-" + name + "-" + queueName;
}