You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2016/05/18 08:12:16 UTC

svn commit: r1744357 - in /sling/trunk/contrib/extensions/distribution/core/src: main/java/org/apache/sling/distribution/queue/ main/java/org/apache/sling/distribution/queue/impl/ main/java/org/apache/sling/distribution/queue/impl/simple/ test/java/org...

Author: tommaso
Date: Wed May 18 08:12:16 2016
New Revision: 1744357

URL: http://svn.apache.org/viewvc?rev=1744357&view=rev
Log:
SLING-5733 - one processor per in memory queue, triggered every second

Modified:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/DistributionQueue.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTask.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTaskTest.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/DistributionQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/DistributionQueue.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/DistributionQueue.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/DistributionQueue.java Wed May 18 08:12:16 2016
@@ -77,7 +77,7 @@ public interface DistributionQueue {
     /**
      * gets an item from the queue by specifying its id
      *
-     * @param itemId the id of the item
+     * @param itemId the id of the item as returned by {@link DistributionQueueItem#getPackageId()}
      * @return the item, or {@code null} if the item with the given id
      * doesn't exist
      */
@@ -87,7 +87,7 @@ public interface DistributionQueue {
     /**
      * remove an item from the queue by specifying its id
      *
-     * @param itemId the id the item
+     * @param itemId the id the item as returned by {@link DistributionQueueItem#getPackageId()}
      * @return the removed item, or {@code null} if the item with the given id
      * doesn't exist
      */

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java Wed May 18 08:12:16 2016
@@ -43,6 +43,7 @@ public class CachingDistributionQueue ex
     }
 
 
+    @Nonnull
     @Override
     public DistributionQueueStatus getStatus() {
 

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTask.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTask.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTask.java Wed May 18 08:12:16 2016
@@ -30,28 +30,25 @@ import org.slf4j.LoggerFactory;
 class ScheduledDistributionQueueProcessorTask implements Runnable {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final SimpleDistributionQueueProvider queueProvider;
+    private final DistributionQueue queue;
     private final DistributionQueueProcessor queueProcessor;
 
-    public ScheduledDistributionQueueProcessorTask(SimpleDistributionQueueProvider queueProvider,
+    public ScheduledDistributionQueueProcessorTask(DistributionQueue queue,
                                                    DistributionQueueProcessor queueProcessor) {
-        this.queueProvider = queueProvider;
+        this.queue = queue;
         this.queueProcessor = queueProcessor;
     }
 
     public void run() {
         try {
-            for (DistributionQueue queue : queueProvider.getQueues()) {
-                DistributionQueueEntry entry;
-                while ((entry = queue.getHead()) != null) {
-
-                    if (queueProcessor.process(queue.getName(), entry)) {
-                        if (queue.remove(entry.getId()) != null) {
-                            log.debug("item {} processed and removed from the queue", entry.getItem());
-                        }
-                    } else {
-                        log.warn("processing of item {} failed", entry.getId());
+            DistributionQueueEntry entry;
+            while ((entry = queue.getHead()) != null) {
+                if (queueProcessor.process(queue.getName(), entry)) {
+                    if (queue.remove(entry.getId()) != null) {
+                        log.debug("item {} processed and removed from the queue", entry.getItem());
                     }
+                } else {
+                    log.warn("processing of item {} failed", entry.getId());
                 }
             }
         } catch (Exception e) {

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java Wed May 18 08:12:16 2016
@@ -24,10 +24,9 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.WeakHashMap;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.sling.distribution.queue.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -45,6 +44,9 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * Note that, at the moment, this is a transient in memory queue not persisted on the repository and
  * therefore not usable for production.
+ *
+ * Note: potentially the Queue could contain the ordered package ids, with a sidecar map id->item;
+ * that way removal could be faster.
  */
 public class SimpleDistributionQueue implements DistributionQueue {
 
@@ -52,7 +54,7 @@ public class SimpleDistributionQueue imp
 
     private final String name;
 
-    private final BlockingQueue<DistributionQueueItem> queue;
+    private final Queue<DistributionQueueItem> queue;
 
     private final Map<DistributionQueueItem, DistributionQueueItemStatus> statusMap;
 
@@ -72,9 +74,9 @@ public class SimpleDistributionQueue imp
         DistributionQueueItemState itemState = DistributionQueueItemState.ERROR;
         boolean result = false;
         try {
-            result = queue.offer(item, 10, TimeUnit.SECONDS);
+            result = queue.offer(item);
             itemState = DistributionQueueItemState.QUEUED;
-        } catch (InterruptedException e) {
+        } catch (Exception e) {
             log.error("cannot add an item to the queue", e);
         } finally {
             statusMap.put(item, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
@@ -154,4 +156,11 @@ public class SimpleDistributionQueue imp
         }
     }
 
+    @Override
+    public String toString() {
+        return "SimpleDistributionQueue{" +
+                "name='" + name + '\'' +
+                '}';
+    }
+
 }

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java Wed May 18 08:12:16 2016
@@ -42,7 +42,7 @@ public class SimpleDistributionQueueProv
     private final String name;
     private final Scheduler scheduler;
 
-    private final Map<String, DistributionQueue> queueMap = new ConcurrentHashMap<String, DistributionQueue>();
+    private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
 
     public SimpleDistributionQueueProvider(Scheduler scheduler, String name) {
         if (name == null || scheduler == null) {
@@ -58,7 +58,7 @@ public class SimpleDistributionQueueProv
     public DistributionQueue getQueue(@Nonnull String queueName) {
         String key = name + queueName;
 
-        DistributionQueue queue = queueMap.get(key);
+        SimpleDistributionQueue queue = queueMap.get(key);
         if (queue == null) {
             log.debug("creating a queue with key {}", key);
             queue = new SimpleDistributionQueue(name, queueName);
@@ -69,22 +69,31 @@ public class SimpleDistributionQueueProv
     }
 
 
-    Collection<DistributionQueue> getQueues() {
+    Collection<SimpleDistributionQueue> getQueues() {
         return queueMap.values();
     }
 
     public void enableQueueProcessing(@Nonnull DistributionQueueProcessor queueProcessor, String... queueNames) {
-        ScheduleOptions options = scheduler.NOW(-1, 10)
-                .canRunConcurrently(false)
-                .name(getJobName());
-        scheduler.schedule(new ScheduledDistributionQueueProcessorTask(this, queueProcessor), options);
+        for (String queueName : queueNames) {
+            ScheduleOptions options = scheduler.NOW(-1, 1)
+                    .canRunConcurrently(false)
+                    .name(getJobName(queueName));
+            scheduler.schedule(new ScheduledDistributionQueueProcessorTask(getQueue(queueName), queueProcessor), options);
+        }
     }
 
     public void disableQueueProcessing() {
-        scheduler.unschedule(getJobName());
+        for (DistributionQueue queue : getQueues()) {
+                String queueName = queue.getName();
+                if (scheduler.unschedule(getJobName(queueName))) {
+                    log.debug("queue processing on {} stopped", queue);
+                } else {
+                    log.warn("could not disable queue processing on {}", queue);
+                }
+        }
     }
 
-    private String getJobName() {
-        return "simple-queueProcessor-" + name;
+    private String getJobName(String queueName) {
+        return "simple-queueProcessor-" + name + "-" + queueName;
     }
 }

Modified: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTaskTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTaskTest.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTaskTest.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTaskTest.java Wed May 18 08:12:16 2016
@@ -37,10 +37,10 @@ public class ScheduledDistributionQueueP
 
     @Test
     public void testRunWithNoQueue() throws Exception {
-        SimpleDistributionQueueProvider queueProvider = mock(SimpleDistributionQueueProvider.class);
+        DistributionQueue queue = mock(DistributionQueue.class);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         ScheduledDistributionQueueProcessorTask scheduledDistributionQueueProcessorTask = new ScheduledDistributionQueueProcessorTask(
-                queueProvider, queueProcessor);
+                queue, queueProcessor);
         scheduledDistributionQueueProcessorTask.run();
     }
 
@@ -54,7 +54,7 @@ public class ScheduledDistributionQueueP
         when(queueProvider.getQueues()).thenReturn(queues);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         ScheduledDistributionQueueProcessorTask scheduledDistributionQueueProcessorTask = new ScheduledDistributionQueueProcessorTask(
-                queueProvider, queueProcessor);
+                queue, queueProcessor);
         scheduledDistributionQueueProcessorTask.run();
     }
 
@@ -70,7 +70,7 @@ public class ScheduledDistributionQueueP
         when(queueProvider.getQueues()).thenReturn(queues);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         ScheduledDistributionQueueProcessorTask scheduledDistributionQueueProcessorTask = new ScheduledDistributionQueueProcessorTask(
-                queueProvider, queueProcessor);
+                queue, queueProcessor);
         scheduledDistributionQueueProcessorTask.run();
     }
 }
\ No newline at end of file

Modified: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java Wed May 18 08:12:16 2016
@@ -41,13 +41,11 @@ public class SimpleDistributionQueueProv
         assertNotNull(queue);
     }
 
-
-
     @Test
     public void testEnableQueueProcessing() throws Exception {
         Scheduler scheduler = mock(Scheduler.class);
         ScheduleOptions options = mock(ScheduleOptions.class);
-        when(scheduler.NOW(-1, 10)).thenReturn(options);
+        when(scheduler.NOW(-1, 1)).thenReturn(options);
         when(options.canRunConcurrently(false)).thenReturn(options);
         when(options.name(any(String.class))).thenReturn(options);
         SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent");