You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2016/05/18 08:12:16 UTC
svn commit: r1744357 - in
/sling/trunk/contrib/extensions/distribution/core/src:
main/java/org/apache/sling/distribution/queue/
main/java/org/apache/sling/distribution/queue/impl/
main/java/org/apache/sling/distribution/queue/impl/simple/ test/java/org...
Author: tommaso
Date: Wed May 18 08:12:16 2016
New Revision: 1744357
URL: http://svn.apache.org/viewvc?rev=1744357&view=rev
Log:
SLING-5733 - one processor per in memory queue, triggered every second
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/DistributionQueue.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTask.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTaskTest.java
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/DistributionQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/DistributionQueue.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/DistributionQueue.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/DistributionQueue.java Wed May 18 08:12:16 2016
@@ -77,7 +77,7 @@ public interface DistributionQueue {
/**
* gets an item from the queue by specifying its id
*
- * @param itemId the id of the item
+ * @param itemId the id of the item as returned by {@link DistributionQueueItem#getPackageId()}
* @return the item, or {@code null} if the item with the given id
* doesn't exist
*/
@@ -87,7 +87,7 @@ public interface DistributionQueue {
/**
* remove an item from the queue by specifying its id
*
- * @param itemId the id the item
+ * @param itemId the id of the item as returned by {@link DistributionQueueItem#getPackageId()}
* @return the removed item, or {@code null} if the item with the given id
* doesn't exist
*/
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java Wed May 18 08:12:16 2016
@@ -43,6 +43,7 @@ public class CachingDistributionQueue ex
}
+ @Nonnull
@Override
public DistributionQueueStatus getStatus() {
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTask.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTask.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTask.java Wed May 18 08:12:16 2016
@@ -30,28 +30,25 @@ import org.slf4j.LoggerFactory;
class ScheduledDistributionQueueProcessorTask implements Runnable {
private final Logger log = LoggerFactory.getLogger(getClass());
- private final SimpleDistributionQueueProvider queueProvider;
+ private final DistributionQueue queue;
private final DistributionQueueProcessor queueProcessor;
- public ScheduledDistributionQueueProcessorTask(SimpleDistributionQueueProvider queueProvider,
+ public ScheduledDistributionQueueProcessorTask(DistributionQueue queue,
DistributionQueueProcessor queueProcessor) {
- this.queueProvider = queueProvider;
+ this.queue = queue;
this.queueProcessor = queueProcessor;
}
public void run() {
try {
- for (DistributionQueue queue : queueProvider.getQueues()) {
- DistributionQueueEntry entry;
- while ((entry = queue.getHead()) != null) {
-
- if (queueProcessor.process(queue.getName(), entry)) {
- if (queue.remove(entry.getId()) != null) {
- log.debug("item {} processed and removed from the queue", entry.getItem());
- }
- } else {
- log.warn("processing of item {} failed", entry.getId());
+ DistributionQueueEntry entry;
+ while ((entry = queue.getHead()) != null) {
+ if (queueProcessor.process(queue.getName(), entry)) {
+ if (queue.remove(entry.getId()) != null) {
+ log.debug("item {} processed and removed from the queue", entry.getItem());
}
+ } else {
+ log.warn("processing of item {} failed", entry.getId());
}
}
} catch (Exception e) {
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java Wed May 18 08:12:16 2016
@@ -24,10 +24,9 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.WeakHashMap;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import org.apache.sling.distribution.queue.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -45,6 +44,9 @@ import org.slf4j.LoggerFactory;
* <p/>
* Note that, at the moment, this is a transient in memory queue not persisted on the repository and
* therefore not usable for production.
+ *
+ * Note: potentially the Queue could contain the ordered package ids, with a sidecar map id->item;
+ * that way removal could be faster.
*/
public class SimpleDistributionQueue implements DistributionQueue {
@@ -52,7 +54,7 @@ public class SimpleDistributionQueue imp
private final String name;
- private final BlockingQueue<DistributionQueueItem> queue;
+ private final Queue<DistributionQueueItem> queue;
private final Map<DistributionQueueItem, DistributionQueueItemStatus> statusMap;
@@ -72,9 +74,9 @@ public class SimpleDistributionQueue imp
DistributionQueueItemState itemState = DistributionQueueItemState.ERROR;
boolean result = false;
try {
- result = queue.offer(item, 10, TimeUnit.SECONDS);
+ result = queue.offer(item);
itemState = DistributionQueueItemState.QUEUED;
- } catch (InterruptedException e) {
+ } catch (Exception e) {
log.error("cannot add an item to the queue", e);
} finally {
statusMap.put(item, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
@@ -154,4 +156,11 @@ public class SimpleDistributionQueue imp
}
}
+ @Override
+ public String toString() {
+ return "SimpleDistributionQueue{" +
+ "name='" + name + '\'' +
+ '}';
+ }
+
}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java Wed May 18 08:12:16 2016
@@ -42,7 +42,7 @@ public class SimpleDistributionQueueProv
private final String name;
private final Scheduler scheduler;
- private final Map<String, DistributionQueue> queueMap = new ConcurrentHashMap<String, DistributionQueue>();
+ private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
public SimpleDistributionQueueProvider(Scheduler scheduler, String name) {
if (name == null || scheduler == null) {
@@ -58,7 +58,7 @@ public class SimpleDistributionQueueProv
public DistributionQueue getQueue(@Nonnull String queueName) {
String key = name + queueName;
- DistributionQueue queue = queueMap.get(key);
+ SimpleDistributionQueue queue = queueMap.get(key);
if (queue == null) {
log.debug("creating a queue with key {}", key);
queue = new SimpleDistributionQueue(name, queueName);
@@ -69,22 +69,31 @@ public class SimpleDistributionQueueProv
}
- Collection<DistributionQueue> getQueues() {
+ Collection<SimpleDistributionQueue> getQueues() {
return queueMap.values();
}
public void enableQueueProcessing(@Nonnull DistributionQueueProcessor queueProcessor, String... queueNames) {
- ScheduleOptions options = scheduler.NOW(-1, 10)
- .canRunConcurrently(false)
- .name(getJobName());
- scheduler.schedule(new ScheduledDistributionQueueProcessorTask(this, queueProcessor), options);
+ for (String queueName : queueNames) {
+ ScheduleOptions options = scheduler.NOW(-1, 1)
+ .canRunConcurrently(false)
+ .name(getJobName(queueName));
+ scheduler.schedule(new ScheduledDistributionQueueProcessorTask(getQueue(queueName), queueProcessor), options);
+ }
}
public void disableQueueProcessing() {
- scheduler.unschedule(getJobName());
+ for (DistributionQueue queue : getQueues()) {
+ String queueName = queue.getName();
+ if (scheduler.unschedule(getJobName(queueName))) {
+ log.debug("queue processing on {} stopped", queue);
+ } else {
+ log.warn("could not disable queue processing on {}", queue);
+ }
+ }
}
- private String getJobName() {
- return "simple-queueProcessor-" + name;
+ private String getJobName(String queueName) {
+ return "simple-queueProcessor-" + name + "-" + queueName;
}
}
Modified: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTaskTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTaskTest.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTaskTest.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/ScheduledDistributionQueueProcessorTaskTest.java Wed May 18 08:12:16 2016
@@ -37,10 +37,10 @@ public class ScheduledDistributionQueueP
@Test
public void testRunWithNoQueue() throws Exception {
- SimpleDistributionQueueProvider queueProvider = mock(SimpleDistributionQueueProvider.class);
+ DistributionQueue queue = mock(DistributionQueue.class);
DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
ScheduledDistributionQueueProcessorTask scheduledDistributionQueueProcessorTask = new ScheduledDistributionQueueProcessorTask(
- queueProvider, queueProcessor);
+ queue, queueProcessor);
scheduledDistributionQueueProcessorTask.run();
}
@@ -54,7 +54,7 @@ public class ScheduledDistributionQueueP
when(queueProvider.getQueues()).thenReturn(queues);
DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
ScheduledDistributionQueueProcessorTask scheduledDistributionQueueProcessorTask = new ScheduledDistributionQueueProcessorTask(
- queueProvider, queueProcessor);
+ queue, queueProcessor);
scheduledDistributionQueueProcessorTask.run();
}
@@ -70,7 +70,7 @@ public class ScheduledDistributionQueueP
when(queueProvider.getQueues()).thenReturn(queues);
DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
ScheduledDistributionQueueProcessorTask scheduledDistributionQueueProcessorTask = new ScheduledDistributionQueueProcessorTask(
- queueProvider, queueProcessor);
+ queue, queueProcessor);
scheduledDistributionQueueProcessorTask.run();
}
}
\ No newline at end of file
Modified: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java?rev=1744357&r1=1744356&r2=1744357&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java Wed May 18 08:12:16 2016
@@ -41,13 +41,11 @@ public class SimpleDistributionQueueProv
assertNotNull(queue);
}
-
-
@Test
public void testEnableQueueProcessing() throws Exception {
Scheduler scheduler = mock(Scheduler.class);
ScheduleOptions options = mock(ScheduleOptions.class);
- when(scheduler.NOW(-1, 10)).thenReturn(options);
+ when(scheduler.NOW(-1, 1)).thenReturn(options);
when(options.canRunConcurrently(false)).thenReturn(options);
when(options.name(any(String.class))).thenReturn(options);
SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent");