You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2016/05/18 07:11:12 UTC

svn commit: r1744355 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java

Author: cziegeler
Date: Wed May 18 07:11:12 2016
New Revision: 1744355

URL: http://svn.apache.org/viewvc?rev=1744355&view=rev
Log:
SLING-5255 : Jobs: allow to configure queue.maxparallel in terms of % of number of CPU cores. Apply patch from Zygmunt Wiercioch 

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1744355&r1=1744354&r2=1744355&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Wed May 18 07:11:12 2016
@@ -63,10 +63,13 @@ import org.slf4j.LoggerFactory;
               label="Type",
               description="The queue type."),
     @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
-              intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL,
+              doubleValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL,
               label="Maximum Parallel Jobs",
               description="The maximum number of parallel jobs started for this queue. "
-                        + "A value of -1 is substituted with the number of available processors."),
+                        + "A value of -1 is substituted with the number of available processors. "
+                        + "Positive integer values specify number of processors to use.  Can be greater than number of processors. "
+                        + "A decimal number between 0.0 and 1.0 is treated as a fraction of available processors. "
+                        + "For example 0.5 means half of the available processors."),
     @Property(name=ConfigurationConstants.PROP_RETRIES,
               intValue=ConfigurationConstants.DEFAULT_RETRIES,
               label="Maximum Retries",
@@ -193,8 +196,30 @@ public class InternalQueueConfiguration
         }
         this.retries = PropertiesUtil.toInteger(params.get(ConfigurationConstants.PROP_RETRIES), ConfigurationConstants.DEFAULT_RETRIES);
         this.retryDelay = PropertiesUtil.toLong(params.get(ConfigurationConstants.PROP_RETRY_DELAY), ConfigurationConstants.DEFAULT_RETRY_DELAY);
-        final int maxParallel = PropertiesUtil.toInteger(params.get(ConfigurationConstants.PROP_MAX_PARALLEL), ConfigurationConstants.DEFAULT_MAX_PARALLEL);
-        this.maxParallelProcesses = (maxParallel == -1 ? ConfigurationConstants.NUMBER_OF_PROCESSORS : maxParallel);
+
+        // Float values are treated as percentage.  int values are treated as number of cores, -1 == all available
+        // Note: the value is based on the core count at startup.  It will not change dynamically if core count changes.
+        int cores = ConfigurationConstants.NUMBER_OF_PROCESSORS;
+        final double inMaxParallel = PropertiesUtil.toDouble(params.get(ConfigurationConstants.PROP_MAX_PARALLEL),
+                ConfigurationConstants.DEFAULT_MAX_PARALLEL);
+        logger.debug("Max parallel for queue {} is {}", this.name, inMaxParallel);
+        if ((inMaxParallel == Math.floor(inMaxParallel)) && !Double.isInfinite(inMaxParallel)) {
+            // integral type
+            if ((int) inMaxParallel == 0) {
+                logger.warn("Max threads property for {} set to zero.", this.name);
+            }
+            this.maxParallelProcesses = (inMaxParallel <= -1 ? cores : (int) inMaxParallel);
+        } else {
+            // percentage (rounded)
+            if ((inMaxParallel > 0.0) && (inMaxParallel < 1.0)) {
+                this.maxParallelProcesses = (int) Math.round(cores * inMaxParallel);
+            } else {
+                logger.warn("Invalid queue max parallel value for queue {}. Using {}", this.name, cores);
+                this.maxParallelProcesses =  cores;
+            }
+        }
+        logger.debug("Thread pool size for {} was set to {}", this.name, this.maxParallelProcesses);
+
         // ignore parallel setting for ordered queues
         if ( this.type == Type.ORDERED ) {
             this.maxParallelProcesses = 1;

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java?rev=1744355&r1=1744354&r2=1744355&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java Wed May 18 07:11:12 2016
@@ -31,10 +31,53 @@ public class InternalQueueConfigurationT
 
     @org.junit.Test public void testMaxParallel() {
         final Map<String, Object> p = new HashMap<String, Object>();
+        p.put(ConfigurationConstants.PROP_NAME, "QueueConfigurationTest");
         p.put(ConfigurationConstants.PROP_MAX_PARALLEL, -1);
 
         InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
         assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel());
+
+        // Edge cases 0.0 and 1.0 (treated as int numbers)
+        p.put(ConfigurationConstants.PROP_MAX_PARALLEL, 0.0);
+        c = InternalQueueConfiguration.fromConfiguration(p);
+        assertEquals(0, c.getMaxParallel());
+
+        p.put(ConfigurationConstants.PROP_MAX_PARALLEL, 1.0);
+        c = InternalQueueConfiguration.fromConfiguration(p);
+        assertEquals(1, c.getMaxParallel());
+
+        // percentage (50%)
+        p.put(ConfigurationConstants.PROP_MAX_PARALLEL, 0.5);
+        c = InternalQueueConfiguration.fromConfiguration(p);
+        assertEquals((int) Math.round(Runtime.getRuntime().availableProcessors() * 0.5), c.getMaxParallel());
+
+        // rounding
+        p.put(ConfigurationConstants.PROP_MAX_PARALLEL, 0.90);
+        c = InternalQueueConfiguration.fromConfiguration(p);
+        assertEquals((int) Math.round(Runtime.getRuntime().availableProcessors() * 0.9), c.getMaxParallel());
+
+        p.put(ConfigurationConstants.PROP_MAX_PARALLEL, 0.99);
+        c = InternalQueueConfiguration.fromConfiguration(p);
+        assertEquals((int) Math.round(Runtime.getRuntime().availableProcessors() * 0.99), c.getMaxParallel());
+
+        // Percentages can't go over 99% (0.99)
+        p.put(ConfigurationConstants.PROP_MAX_PARALLEL, 1.01);
+        c = InternalQueueConfiguration.fromConfiguration(p);
+        assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel());
+
+        // Treat negative values same a -1 (all cores)
+        p.put(ConfigurationConstants.PROP_MAX_PARALLEL, -0.5);
+        c = InternalQueueConfiguration.fromConfiguration(p);
+        assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel());
+
+        p.put(ConfigurationConstants.PROP_MAX_PARALLEL, -2);
+        c = InternalQueueConfiguration.fromConfiguration(p);
+        assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel());
+
+        // Invalid number results in ConfigurationConstants.DEFAULT_MAX_PARALLEL
+        p.put(ConfigurationConstants.PROP_MAX_PARALLEL, "a string");
+        c = InternalQueueConfiguration.fromConfiguration(p);
+        assertEquals(ConfigurationConstants.DEFAULT_MAX_PARALLEL, c.getMaxParallel());
     }
 
     @org.junit.Test public void testTopicMatchersDot() {