You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2016/05/18 07:11:12 UTC
svn commit: r1744355 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
Author: cziegeler
Date: Wed May 18 07:11:12 2016
New Revision: 1744355
URL: http://svn.apache.org/viewvc?rev=1744355&view=rev
Log:
SLING-5255 : Jobs: allow to configure queue.maxparallel in terms of % of number of CPU cores. Apply patch from Zygmunt Wiercioch
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1744355&r1=1744354&r2=1744355&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Wed May 18 07:11:12 2016
@@ -63,10 +63,13 @@ import org.slf4j.LoggerFactory;
label="Type",
description="The queue type."),
@Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
- intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL,
+ doubleValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL,
label="Maximum Parallel Jobs",
description="The maximum number of parallel jobs started for this queue. "
- + "A value of -1 is substituted with the number of available processors."),
+ + "A value of -1 is substituted with the number of available processors. "
+ + "Positive integer values specify number of processors to use. Can be greater than number of processors. "
+ + "A decimal number between 0.0 and 1.0 is treated as a fraction of available processors. "
+ + "For example 0.5 means half of the available processors."),
@Property(name=ConfigurationConstants.PROP_RETRIES,
intValue=ConfigurationConstants.DEFAULT_RETRIES,
label="Maximum Retries",
@@ -193,8 +196,30 @@ public class InternalQueueConfiguration
}
this.retries = PropertiesUtil.toInteger(params.get(ConfigurationConstants.PROP_RETRIES), ConfigurationConstants.DEFAULT_RETRIES);
this.retryDelay = PropertiesUtil.toLong(params.get(ConfigurationConstants.PROP_RETRY_DELAY), ConfigurationConstants.DEFAULT_RETRY_DELAY);
- final int maxParallel = PropertiesUtil.toInteger(params.get(ConfigurationConstants.PROP_MAX_PARALLEL), ConfigurationConstants.DEFAULT_MAX_PARALLEL);
- this.maxParallelProcesses = (maxParallel == -1 ? ConfigurationConstants.NUMBER_OF_PROCESSORS : maxParallel);
+
+ // Float values are treated as percentage. int values are treated as number of cores, -1 == all available
+ // Note: the value is based on the core count at startup. It will not change dynamically if core count changes.
+ int cores = ConfigurationConstants.NUMBER_OF_PROCESSORS;
+ final double inMaxParallel = PropertiesUtil.toDouble(params.get(ConfigurationConstants.PROP_MAX_PARALLEL),
+ ConfigurationConstants.DEFAULT_MAX_PARALLEL);
+ logger.debug("Max parallel for queue {} is {}", this.name, inMaxParallel);
+ if ((inMaxParallel == Math.floor(inMaxParallel)) && !Double.isInfinite(inMaxParallel)) {
+ // integral type
+ if ((int) inMaxParallel == 0) {
+ logger.warn("Max threads property for {} set to zero.", this.name);
+ }
+ this.maxParallelProcesses = (inMaxParallel <= -1 ? cores : (int) inMaxParallel);
+ } else {
+ // percentage (rounded)
+ if ((inMaxParallel > 0.0) && (inMaxParallel < 1.0)) {
+ this.maxParallelProcesses = (int) Math.round(cores * inMaxParallel);
+ } else {
+ logger.warn("Invalid queue max parallel value for queue {}. Using {}", this.name, cores);
+ this.maxParallelProcesses = cores;
+ }
+ }
+ logger.debug("Thread pool size for {} was set to {}", this.name, this.maxParallelProcesses);
+
// ignore parallel setting for ordered queues
if ( this.type == Type.ORDERED ) {
this.maxParallelProcesses = 1;
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java?rev=1744355&r1=1744354&r2=1744355&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java Wed May 18 07:11:12 2016
@@ -31,10 +31,53 @@ public class InternalQueueConfigurationT
@org.junit.Test public void testMaxParallel() {
final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_NAME, "QueueConfigurationTest");
p.put(ConfigurationConstants.PROP_MAX_PARALLEL, -1);
InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel());
+
+ // Edge cases 0.0 and 1.0 (treated as int numbers)
+ p.put(ConfigurationConstants.PROP_MAX_PARALLEL, 0.0);
+ c = InternalQueueConfiguration.fromConfiguration(p);
+ assertEquals(0, c.getMaxParallel());
+
+ p.put(ConfigurationConstants.PROP_MAX_PARALLEL, 1.0);
+ c = InternalQueueConfiguration.fromConfiguration(p);
+ assertEquals(1, c.getMaxParallel());
+
+ // percentage (50%)
+ p.put(ConfigurationConstants.PROP_MAX_PARALLEL, 0.5);
+ c = InternalQueueConfiguration.fromConfiguration(p);
+ assertEquals((int) Math.round(Runtime.getRuntime().availableProcessors() * 0.5), c.getMaxParallel());
+
+ // rounding
+ p.put(ConfigurationConstants.PROP_MAX_PARALLEL, 0.90);
+ c = InternalQueueConfiguration.fromConfiguration(p);
+ assertEquals((int) Math.round(Runtime.getRuntime().availableProcessors() * 0.9), c.getMaxParallel());
+
+ p.put(ConfigurationConstants.PROP_MAX_PARALLEL, 0.99);
+ c = InternalQueueConfiguration.fromConfiguration(p);
+ assertEquals((int) Math.round(Runtime.getRuntime().availableProcessors() * 0.99), c.getMaxParallel());
+
+ // Percentages can't go over 99% (0.99)
+ p.put(ConfigurationConstants.PROP_MAX_PARALLEL, 1.01);
+ c = InternalQueueConfiguration.fromConfiguration(p);
+ assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel());
+
+ // Treat negative values the same as -1 (all cores)
+ p.put(ConfigurationConstants.PROP_MAX_PARALLEL, -0.5);
+ c = InternalQueueConfiguration.fromConfiguration(p);
+ assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel());
+
+ p.put(ConfigurationConstants.PROP_MAX_PARALLEL, -2);
+ c = InternalQueueConfiguration.fromConfiguration(p);
+ assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel());
+
+ // Invalid number results in ConfigurationConstants.DEFAULT_MAX_PARALLEL
+ p.put(ConfigurationConstants.PROP_MAX_PARALLEL, "a string");
+ c = InternalQueueConfiguration.fromConfiguration(p);
+ assertEquals(ConfigurationConstants.DEFAULT_MAX_PARALLEL, c.getMaxParallel());
}
@org.junit.Test public void testTopicMatchersDot() {