You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2017/06/30 09:05:00 UTC

svn commit: r1800364 - in /sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl: AsyncDeliveryDispatchingStrategy.java jobhandling/JobHandlingDistributionQueueProvider.java

Author: tommaso
Date: Fri Jun 30 09:05:00 2017
New Revision: 1800364

URL: http://svn.apache.org/viewvc?rev=1800364&view=rev
Log:
SLING-6988 - async delivery should use an unordered queue

Modified:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java?rev=1800364&r1=1800363&r2=1800364&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java Fri Jun 30 09:05:00 2017
@@ -28,12 +28,7 @@ import org.apache.sling.distribution.pac
 import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
 import org.apache.sling.distribution.packaging.impl.ReferencePackage;
 import org.apache.sling.distribution.packaging.impl.SharedDistributionPackage;
-import org.apache.sling.distribution.queue.DistributionQueue;
-import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
-import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
-import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,7 +87,7 @@ public class AsyncDeliveryDispatchingStr
                 DistributionPackageUtils.acquire(distributionPackage, deliveryQueueName);
 
                 // add the actual package to the delivery queue
-                DistributionQueue deliveryQueue = queueProvider.getQueue(deliveryQueueName);
+                DistributionQueue deliveryQueue = queueProvider.getQueue(deliveryQueueName, DistributionQueueType.PARALLEL);
                 DistributionQueueEntry deliveryQueueEntry = deliveryQueue.add(item);
                 if (deliveryQueueEntry != null) {
                     DistributionQueueItemStatus status = deliveryQueueEntry.getStatus();

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java?rev=1800364&r1=1800363&r2=1800364&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java Fri Jun 30 09:05:00 2017
@@ -101,12 +101,14 @@ public class JobHandlingDistributionQueu
                         QueueConfiguration.class.getName(), null);
                 Dictionary<String, Object> props = new Hashtable<String, Object>();
                 props.put(ConfigurationConstants.PROP_NAME, queueName);
-                props.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.UNORDERED.name());
+                props.put(ConfigurationConstants.PROP_TYPE, DistributionQueueType.PARALLEL.equals(type) ?
+                        QueueConfiguration.Type.UNORDERED.name() : QueueConfiguration.Type.ORDERED.name());
                 props.put(ConfigurationConstants.PROP_TOPICS, new String[]{topic});
                 props.put(ConfigurationConstants.PROP_RETRIES, -1);
                 props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
                 props.put(ConfigurationConstants.PROP_KEEP_JOBS, true);
                 props.put(ConfigurationConstants.PROP_PRIORITY, "MAX");
+                props.put(ConfigurationConstants.PROP_MAX_PARALLEL, ConfigurationConstants.DEFAULT_MAX_PARALLEL);
                 config.update(props);
             }
         } catch (IOException e) {