You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2018/11/10 23:11:16 UTC

[sling-org-apache-sling-distribution-core] branch SLING-8086-1 created (now 44b50e3)

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a change to branch SLING-8086-1
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-core.git.


      at 44b50e3  SLING-8086 - Extend distribution queue SPI with the ability to clear and batch remove items

This branch includes the following new commits:

     new 44b50e3  SLING-8086 - Extend distribution queue SPI with the ability to clear and batch remove items

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[sling-org-apache-sling-distribution-core] 01/01: SLING-8086 - Extend distribution queue SPI with the ability to clear and batch remove items

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch SLING-8086-1
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-core.git

commit 44b50e3f0d1084b9322984bd0ac1c0f3c123ec95
Author: tmaret <tm...@adobe.com>
AuthorDate: Fri Nov 9 14:26:56 2018 +0100

    SLING-8086 - Extend distribution queue SPI with the ability to clear and batch remove items
---
 .../queue/impl/DistributionQueueWrapper.java       | 19 ++++++++++++++-
 .../jobhandling/JobHandlingDistributionQueue.java  | 16 ++++++++++++-
 .../queue/impl/resource/ResourceQueue.java         | 18 +++++++++++++-
 .../queue/impl/simple/SimpleDistributionQueue.java | 17 ++++++++++++-
 .../spi/{package-info.java => Clearable.java}      | 28 +++++++++++++++++++---
 .../sling/distribution/queue/spi/package-info.java |  2 +-
 .../servlet/DistributionAgentQueueServlet.java     | 14 ++++++++++-
 7 files changed, 105 insertions(+), 9 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
index dc179d3..cc9fc45 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
@@ -18,13 +18,17 @@
  */
 package org.apache.sling.distribution.queue.impl;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.jetbrains.annotations.NotNull;
 
-public abstract class DistributionQueueWrapper implements DistributionQueue {
+public abstract class DistributionQueueWrapper implements DistributionQueue, Clearable {
     final DistributionQueue wrappedQueue;
 
     DistributionQueueWrapper(DistributionQueue wrappedQueue) {
@@ -69,4 +73,17 @@ public abstract class DistributionQueueWrapper implements DistributionQueue {
     public DistributionQueueStatus getStatus() {
         return wrappedQueue.getStatus();
     }
+
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> clear(int limit) {
+        final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>();
+        for (DistributionQueueEntry entry : getItems(0, limit)) {
+            DistributionQueueEntry removed = remove(entry.getId());
+            if (removed != null) {
+                removedEntries.add(removed);
+            }
+        }
+        return removedEntries;
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
index d19424f..39764d3 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -40,7 +42,7 @@ import org.slf4j.LoggerFactory;
 /**
  * a {@link DistributionQueue} based on Sling Job Handling facilities
  */
-public class JobHandlingDistributionQueue implements DistributionQueue {
+public class JobHandlingDistributionQueue implements DistributionQueue, Clearable {
 
     public final static String DISTRIBUTION_QUEUE_TOPIC = "org/apache/sling/distribution/queue";
 
@@ -202,4 +204,16 @@ public class JobHandlingDistributionQueue implements DistributionQueue {
         return type;
     }
 
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> clear(int limit) {
+        final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>();
+        for (DistributionQueueEntry entry : getItems(0, limit)) {
+            DistributionQueueEntry removed = remove(entry.getId());
+            if (removed != null) {
+                removedEntries.add(removed);
+            }
+        }
+        return removedEntries;
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
index 28162c3..1f6a94e 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
@@ -29,6 +29,7 @@ import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueState;
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.queue.DistributionQueueType;
+import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.util.impl.DistributionUtils;
 import org.jetbrains.annotations.NotNull;
@@ -36,10 +37,11 @@ import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 
 
-public class ResourceQueue implements DistributionQueue {
+public class ResourceQueue implements DistributionQueue, Clearable {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
 
@@ -228,4 +230,18 @@ public class ResourceQueue implements DistributionQueue {
         DistributionQueueItem item = entry.getItem();
         log.debug("queue[{}] {} entryId={} packageId={}", new Object[] { queueName, scope, entryId, item.getPackageId() });
     }
+
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> clear(int limit) {
+        final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>();
+        for (DistributionQueueEntry entry : getItems(0, limit)) {
+            DistributionQueueEntry removed = remove(entry.getId());
+            if (removed != null) {
+                removedEntries.add(removed);
+            }
+        }
+        return removedEntries;
+    }
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 26e05d4..27eec29 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -25,6 +25,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.WeakHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -48,7 +50,7 @@ import org.slf4j.LoggerFactory;
  * Note: potentially the Queue could contain the ordered package ids, with a sidecar map id->item;
  * that way removal could be faster.
  */
-public class SimpleDistributionQueue implements DistributionQueue {
+public class SimpleDistributionQueue implements DistributionQueue, Clearable {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -168,4 +170,17 @@ public class SimpleDistributionQueue implements DistributionQueue {
                 '}';
     }
 
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> clear(int limit) {
+        final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>();
+        for (DistributionQueueEntry entry : getItems(0, limit)) {
+            DistributionQueueEntry removed = remove(entry.getId());
+            if (removed != null) {
+                removedEntries.add(removed);
+            }
+        }
+        return removedEntries;
+    }
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java b/src/main/java/org/apache/sling/distribution/queue/spi/Clearable.java
similarity index 51%
copy from src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
copy to src/main/java/org/apache/sling/distribution/queue/spi/Clearable.java
index 96aa8bd..1bd6a7f 100644
--- a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
+++ b/src/main/java/org/apache/sling/distribution/queue/spi/Clearable.java
@@ -16,9 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-@Version("0.0.1")
 package org.apache.sling.distribution.queue.spi;
 
-import aQute.bnd.annotation.Version;
+import aQute.bnd.annotation.ConsumerType;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Trait to be added to a {@link DistributionQueue} that supports
+ * clearing all, or a range of, its entries via the
+ * {@link Clearable#clear(int)} method.
+ *
+ * @since 0.1.0
+ */
+@ConsumerType
+public interface Clearable {
+
+    /**
+     * Clear a range of entries from the queue. The range starts at
+     * the head entry and includes at most {@code limit} entries.
+     *
+     * @param limit The maximum number of entries to remove. All entries
+     *              are removed when the limit is smaller than zero.
+     * @return an iterable over the removed entries
+     */
+    @NotNull
+    Iterable<DistributionQueueEntry> clear(int limit);
 
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
index 96aa8bd..3a80790 100644
--- a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
+++ b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-@Version("0.0.1")
+@Version("0.1.0")
 package org.apache.sling.distribution.queue.spi;
 
 import aQute.bnd.annotation.Version;
diff --git a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
index dffc64c..966657d 100644
--- a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
+++ b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
@@ -32,6 +32,7 @@ import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageInfo;
 import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
+import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -79,7 +80,11 @@ public class DistributionAgentQueueServlet extends SlingAllMethodsServlet {
                 } catch (NumberFormatException ex) {
                     log.warn("limit param malformed : "+limitParam, ex);
                 }
-                deleteItems(resourceResolver, queue, limit);
+                if (queue instanceof Clearable) {
+                    clearItems(resourceResolver, queue, limit);
+                } else {
+                    deleteItems(resourceResolver, queue, limit);
+                }
             }
         } else if ("copy".equals(operation)) {
             String from = request.getParameter("from");
@@ -138,6 +143,13 @@ public class DistributionAgentQueueServlet extends SlingAllMethodsServlet {
         DistributionPackageUtils.releaseOrDelete(distributionPackage, queue.getName());
     }
 
+    private void clearItems(ResourceResolver resourceResolver, DistributionQueue queue, int limit) {
+        for (DistributionQueueEntry removed : ((Clearable)queue).clear(limit)) {
+            DistributionPackage distributionPackage = getPackage(resourceResolver, removed.getItem());
+            DistributionPackageUtils.releaseOrDelete(distributionPackage, queue.getName());
+        }
+    }
+
     private DistributionPackage getPackage(ResourceResolver resourceResolver, DistributionQueueItem item) {
         DistributionPackageInfo info = DistributionPackageUtils.fromQueueItem(item);
         String type = info.getType();