You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2018/01/22 10:27:04 UTC
[sling-org-apache-sling-commons-threads] 01/02: SLING-7407 : set
min pool size to max pool size if queue size is -1 thus unbounded
This is an automated email from the ASF dual-hosted git repository.
stefanegli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-commons-threads.git
commit 2c637f96d1c0db99c3bbd1a4bbf56eac7621665a
Author: Stefan Egli <st...@apache.org>
AuthorDate: Fri Jan 19 16:23:58 2018 +0100
SLING-7407 : set min pool size to max pool size if queue size is -1 thus unbounded
---
.../commons/threads/impl/DefaultThreadPool.java | 12 ++++
.../threads/impl/DefaultThreadPoolTest.java | 81 ++++++++++++++++++++++
2 files changed, 93 insertions(+)
diff --git a/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java b/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java
index c3c16d8..30938d1 100644
--- a/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java
+++ b/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java
@@ -111,6 +111,18 @@ public class DefaultThreadPool
if (this.configuration.getQueueSize() > 0) {
queue = new java.util.concurrent.ArrayBlockingQueue<Runnable>(this.configuration.getQueueSize());
} else {
+ // SLING-7407 : queue size is -1 (or negative) == unbounded
+ // in this case the max pool size wouldn't have any effect, since the
+ // pool is only increased (ie threads only created) when the queue is blocked
+ // but with an unbounded queue that never happens, thus you'd always get only
+ // maximum min queue size threads.
+ // To fix this somewhat odd behaviour, we now automatically set the min to max for this case:
+ if (this.configuration.getMinPoolSize() < this.configuration.getMaxPoolSize()) {
+ this.logger.warn("min-pool-size (" + configuration.getMinPoolSize() +
+ ") < max-pool-size (" + configuration.getMaxPoolSize() + ") for pool \"" + this.name +
+ "\" which has unbounded queue (queue size -1). Set to " + configuration.getMaxPoolSize());
+ this.configuration.setMinPoolSize(configuration.getMaxPoolSize());
+ }
queue = new LinkedBlockingQueue<Runnable>();
}
} else {
diff --git a/src/test/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolTest.java b/src/test/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolTest.java
new file mode 100644
index 0000000..1d723ab
--- /dev/null
+++ b/src/test/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.commons.threads.impl;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+
+public class DefaultThreadPoolTest {
+
+ // SLING-7407
+ @Test
+ public void unboundedQueueMinSizeCorrection() {
+ final BundleContext bc = Mockito.mock(BundleContext.class);
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(Constants.SERVICE_DESCRIPTION, "Apache Sling Thread Pool Manager");
+ props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
+ props.put(Constants.SERVICE_PID, DefaultThreadPool.class.getName() + ".factory");
+ DefaultThreadPoolManager dtpm = new DefaultThreadPoolManager(bc, props);
+ ModifiableThreadPoolConfig config = new ModifiableThreadPoolConfig();
+ config.setMinPoolSize(1);
+ config.setMaxPoolSize(5);
+ config.setQueueSize(-1);
+ ThreadPool tp = dtpm.create(config);
+
+ final Semaphore blocker = new Semaphore(0);
+ final Semaphore counter = new Semaphore(0);
+ final Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ counter.release();
+ try {
+ blocker.tryAcquire(1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
+ };
+ tp.execute(r);
+ try {
+ assertTrue(counter.tryAcquire(5, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ fail("got interrupted");
+ }
+ tp.execute(r);
+ try {
+ assertTrue(counter.tryAcquire(5, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ fail("got interrupted");
+ }
+ blocker.release(2);
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
stefanegli@apache.org.