You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2019/12/20 12:39:24 UTC

[sling-org-apache-sling-distribution-core] branch master updated: SLING-8853 - adapt contribution to match with #26 from SLING-8854

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 20f5575  SLING-8853 - adapt contribution to match with #26 from SLING-8854
20f5575 is described below

commit 20f5575af5318c07c31e7623319cf4f1d20258c8
Author: tmaret <tm...@adobe.com>
AuthorDate: Fri Dec 20 13:38:18 2019 +0100

    SLING-8853 - adapt contribution to match with #26 from SLING-8854
---
 .../queue/impl/resource/ResourceQueueProvider.java    | 19 +++++++++++++++----
 .../impl/simple/SimpleDistributionQueueProvider.java  | 18 ++++++++++--------
 2 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
index d2df714..221501a 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
@@ -23,6 +23,7 @@ import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.scheduler.ScheduleOptions;
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.DistributionQueueType;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
@@ -56,6 +57,9 @@ public class ResourceQueueProvider implements DistributionQueueProvider {
 
     private final Map<String, ResourceQueue> queueMap = new ConcurrentHashMap<>();
 
+    private final Map<String, Map<String, DistributionQueueItemStatus>> statusMap = new ConcurrentHashMap<>();
+
+
     private ServiceRegistration<Runnable> cleanupTask;
 
     public ResourceQueueProvider(BundleContext context, ResourceResolverFactory resolverFactory,
@@ -77,11 +81,13 @@ public class ResourceQueueProvider implements DistributionQueueProvider {
     @NotNull
     @Override
     public DistributionQueue getQueue(@NotNull String queueName) throws DistributionException {
-        return queueMap.computeIfAbsent(queueName, name -> {
+        String key = getKey(queueName);
+        return queueMap.computeIfAbsent(key, k -> {
+            statusMap.put(key, new ConcurrentHashMap<>());
             if (isActive) {
-                return new ActiveResourceQueue(resolverFactory, serviceName, name, agentRootPath);
+                return new ActiveResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
             } else {
-                return new ResourceQueue(resolverFactory, serviceName, name, agentRootPath);
+                return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
             }
         });
     }
@@ -105,7 +111,8 @@ public class ResourceQueueProvider implements DistributionQueueProvider {
                         .canRunConcurrently(false)
                         .onSingleInstanceOnly(true)
                         .name(getJobName(queueName));
-                scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor), options);
+                scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
+                        statusMap.get(getKey(queueName))), options);
             }
         } else {
             throw new DistributionException(new UnsupportedOperationException("enable Processing not supported for Passive Queues"));
@@ -143,6 +150,10 @@ public class ResourceQueueProvider implements DistributionQueueProvider {
         cleanupTask = context.registerService(Runnable.class, cleanup, props);
     }
 
+    private String getKey(String queueName) {
+        return agentName + "#" + queueName;
+    }
+
     public void close() {
         if (cleanupTask != null) {
             cleanupTask.unregister();
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
index 91ca984..bc3d3c7 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
@@ -26,7 +26,6 @@ import java.io.FileReader;
 import java.io.FilenameFilter;
 import java.util.Collection;
 import java.util.Map;
-import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
@@ -57,9 +56,8 @@ public class SimpleDistributionQueueProvider implements DistributionQueueProvide
     private final String name;
     private final Scheduler scheduler;
 
-    private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
-    private final Map<SimpleDistributionQueue, Map<String, DistributionQueueItemStatus>> statusMap
-            = new WeakHashMap<SimpleDistributionQueue, Map<String, DistributionQueueItemStatus>>();
+    private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<>();
+    private final Map<String, Map<String, DistributionQueueItemStatus>> statusMap = new ConcurrentHashMap<>();
     private final boolean checkpoint;
     private File checkpointDirectory;
 
@@ -89,16 +87,16 @@ public class SimpleDistributionQueueProvider implements DistributionQueueProvide
 
     @NotNull
     public DistributionQueue getQueue(@NotNull String queueName) {
-        String key = name + queueName;
+        String key = getKey(queueName);
 
         SimpleDistributionQueue queue = queueMap.get(key);
         if (queue == null) {
             log.debug("creating a queue with key {}", key);
             Map<String, DistributionQueueItemStatus> queueStatusMap
-                    = new ConcurrentHashMap<String, DistributionQueueItemStatus>();
+                    = new ConcurrentHashMap<>();
             queue = new SimpleDistributionQueue(name, queueName, queueStatusMap);
             queueMap.put(key, queue);
-            statusMap.put(queue, queueStatusMap);
+            statusMap.put(key, queueStatusMap);
             log.debug("queue created {}", queue);
         }
         return queue;
@@ -159,7 +157,7 @@ public class SimpleDistributionQueueProvider implements DistributionQueueProvide
         for (String queueName : queueNames) {
             ScheduleOptions options = scheduler.NOW(-1, 1).canRunConcurrently(false).name(getJobName(queueName));
             scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
-                    statusMap.get(getQueue(queueName))), options);
+                    statusMap.get(getKey(queueName))), options);
         }
 
     }
@@ -184,6 +182,10 @@ public class SimpleDistributionQueueProvider implements DistributionQueueProvide
         }
     }
 
+    private String getKey(String queueName) {
+        return name + "#" + queueName;
+    }
+
     private String getJobName(String queueName) {
         return "simple-queueProcessor-" + name + "-" + queueName;
     }